网站建设项目可行性,万户网站建设公司,wordpress主题安装慢,seo文章目录 举个例子
连接器
下载连接器#xff08;connector#xff09;和格式#xff08;format#xff09;jar 包
依赖管理 如何使用连接器 举个例子
StreamExecutionEnvironment集成了DataStream API#xff0c;通过额外的函数扩展了TableEnvironment。
下面代码演示两…目录 举个例子
连接器
下载连接器connector和格式formatjar 包
依赖管理 如何使用连接器 举个例子
StreamExecutionEnvironment集成了DataStream API通过额外的函数扩展了TableEnvironment。
下面代码演示两种API如何互转
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.common.typeinfo import Typesenv StreamExecutionEnvironment.get_execution_environment()
t_env StreamTableEnvironment.create(env)
# create a DataStream
ds env.from_collection([Alice, Bob, John], Types.STRING())# interpret the insert-only DataStream as a Table
t t_env.from_data_stream(ds)# register the Table object as a view and query it
t_env.create_temporary_view(InputTable, t)
res_table t_env.sql_query(SELECT UPPER(f0) FROM InputTable)# interpret the insert-only Table as a DataStream again
res_ds t_env.to_data_stream(res_table)# add a printing sink and execute in DataStream API
res_ds.print()env.execute()
TableEnvironment将采用StreamExecutionEnvironment所有的配置选项。
建议在转换为Table API之前设置DataStream API的所有配置选项如下代码。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.checkpointing_mode import CheckpointingMode# create Python DataStream API
env StreamExecutionEnvironment.get_execution_environment()# set various configuration early
env.set_max_parallelism(256)env.get_config().add_default_kryo_serializer(type_class_name, serializer_class_name)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)# then switch to Python Table API
t_env StreamTableEnvironment.create(env)
# set configuration early
t_env.get_config().set_local_timezone(Europe/Berlin)# start defining your pipelines in both APIs...
连接器
下载连接器connector和格式formatjar 包
由于Flink是一个基于 Java/Scala 的项目连接器connector和格式format的实现是作为 jar 包存在的 要在 PyFlink 作业中使用首先需要将其指定为作业的依赖。
如果使用第三方JAR可以在Python Table API中指定JAR如下所示
table_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)
or
table_env.get_config().get_configuration().set_string(pipeline.classpaths, file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar)
依赖管理
需要在Python API程序中使用依赖项。例如Python用户自定义函数中使用第三方Python库。此外在用机器学习模型预测等场景中用户可能希望在Python自定义函数中加载机器学习模型。
当PyFlink作业在本地执行时可以将第三方Python库安装到本地Python环境中将机器学习模型下载到本地等等。
然而当用户想要将PyFlink任务提交到远程集群时这种方法并不奏效。
除了Table API 在Python DataStream API中则如下配置
stream_execution_environment.add_jars(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar)
stream_execution_environment.add_jars(file:///E:/my/jar/path/connector1.jar, file:///E:/my/jar/path/connector2.jar)
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the
# URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths(file:///my/jar/path/connector1.jar, file:///my/jar/path/connector2.jar) 如何使用连接器
在 PyFlink Table API 中DDL 是定义 source 和 sink 比较推荐的方式这可以通过 TableEnvironment 中的 execute_sql() 方法来完成然后就可以在作业中使用这张表了。
--下面是如何在 PyFlink 中使用 Kafka source/sink 和 JSON 格式的完整示例。
from pyflink.table import TableEnvironment, Environmentsettingsdef log_processing():env_settings Environmentsettings.in_streaming_mode()t_env TableEnvironment.create(env_settings)# specify connector and format jarst_env.get_config().get_configuration().set_string(pipeline.jars, file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)source_ddl CREATE TABLE source_table(a VARCHAR,b INT) WITH (connector kafka,topic source_topic,properties.bootstrap.servers kafka:9092,properties.group.id test_3,scan.startup.mode latest-offset,format json)sink_ddl CREATE TABLE sink_table(a VARCHAR) WITH (connector kafka,topic sink_topic,properties.bootstrap.servers kafka:9092,format json)t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table) \.execute_insert(sink_table).wait()if __name__ __main__:log_processing()