当前位置: 首页 > news >正文

网站建设项目可行性万户网站建设公司

网站建设项目可行性,万户网站建设公司,wordpress主题安装慢,seo文章目录 举个例子 连接器 下载连接器#xff08;connector#xff09;和格式#xff08;format#xff09;jar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API#xff0c;通过额外的函数扩展了TableEnvironment。 下面代码演示两…目录 举个例子 连接器 下载连接器connector和格式formatjar 包 依赖管理 如何使用连接器 举个例子 StreamExecutionEnvironment集成了DataStream API通过额外的函数扩展了TableEnvironment。 下面代码演示两种API如何互转 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.common.typeinfo import Typesenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(env) # create a DataStream ds env.from_collection([Alice, Bob, John], Types.STRING())# interpret the insert-only DataStream as a Table t t_env.from_data_stream(ds)# register the Table object as a view and query it t_env.create_temporary_view(InputTable, t) res_table t_env.sql_query(SELECT UPPER(f0) FROM InputTable)# interpret the insert-only Table as a DataStream again res_ds t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API res_ds.print()env.execute() TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。 建议在转换为Table API之前设置DataStream API的所有配置选项如下代码。 from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API env StreamExecutionEnvironment.get_execution_environment()# set various configuration early env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer(type_class_name, serializer_class_name) env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API t_env StreamTableEnvironment.create(env) # set configuration early t_env.get_config().set_local_timezone(Europe/Berlin)# start defining your pipelines in both APIs... 连接器 下载连接器connector和格式formatjar 包 由于Flink是一个基于 Java/Scala 的项目连接器connector和格式format的实现是作为 jar 包存在的 要在 PyFlink 作业中使用首先需要将其指定为作业的依赖。 如果使用第三方JAR可以在Python Table API中指定JAR如下所示 table_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar) or table_env.get_config().get_configuration().set_string(pipeline.classpaths, file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar) 依赖管理 需要在Python API程序中使用依赖项。例如Python用户自定义函数中使用第三方Python库。此外在用机器学习模型预测等场景中用户可能希望在Python自定义函数中加载机器学习模型。 当PyFlink作业在本地执行时可以将第三方Python库安装到本地Python环境中将机器学习模型下载到本地等等。 然而当用户想要将PyFlink任务提交到远程集群时这种方法并不奏效。 除了Table API 在Python DataStream API中则如下配置 stream_execution_environment.add_jars(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) stream_execution_environment.add_jars(file:///E:/my/jar/path/connector1.jar, file:///E:/my/jar/path/connector2.jar) # NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the # URLs are accessible on both the client and the cluster. stream_execution_environment.add_classpaths(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) 如何使用连接器 在 PyFlink Table API 中DDL 是定义 source 和 sink 比较推荐的方式这可以通过 TableEnvironment 中的 execute_sql() 方法来完成然后就可以在作业中使用这张表了。 --下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。 from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings Environmentsettings.in_streaming_mode()t_env TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)source_ddl CREATE TABLE source_table(a VARCHAR,b INT) WITH (connector kafka,topic source_topic,properties.bootstrap.servers kafka:9092,properties.group.id test_3,scan.startup.mode latest-offset,format json)sink_ddl CREATE TABLE sink_table(a VARCHAR) WITH (connector kafka,topic sink_topic,properties.bootstrap.servers kafka:9092,format json)t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table) \.execute_insert(sink_table).wait()if __name__ __main__:log_processing()
http://www.zqtcl.cn/news/996140/

相关文章:

  • 足球梦网站建设的基本思路网站介绍词
  • 森马网站建设情况网站推广中应注意哪些事项
  • 简单网站vs2008不能新建网站
  • 牌具做网站可以吗海外广告投放公司
  • 响应式单页网站模板宁波企业自助建站
  • 网站广告收费标准装饰设计公司起名
  • 网站开发人员构成中国兰州网官网
  • 网站设计的提案旅游网站建设风格
  • 成都网站建设的公司做高大上分析的网站
  • 专业企业网站建设公司成都的网站
  • 广东省建设教育协会官方网站首页怎么设置wordpress头像
  • 图书网站建设论文页游中心
  • 建网站的流程及注意事项任务网站建设
  • 河北邯郸做网站的公司哪家好辽源市住房和城乡建设局网站
  • 网站系统建设技术服务费安康市网站建设
  • 网络运行管理系统seo关键词优化方法
  • 西安学校网站建设价格徐州网页关键词优化
  • 上海哪个网站能应聘做家教的营销网站中最重要的部分是
  • 一个设计网站多少钱WordPress的简约博客主题
  • 普通的宣传网站用什么做济南市工程建设技术监督局网站
  • 合肥网站建设公司还有不dw如何制作表格网页
  • 讯美智能网站建设自己域名做网站
  • 自己做网站优化韩国外贸平台
  • 齐河建设局网站长沙市住房和建设局官方网站
  • 萧山区住房和城乡建设局网站wordpress网站合并
  • 做背景网站网站建设与维护制作网页
  • 网站建设公司知名营销型企业网站项目策划表
  • 写作网站哪个最好企业培训机构有哪些
  • 江苏省水利工程建设局网站域名不备案可以正常使用吗
  • 对网站开发语言的统计网站内容建设包括什么