当前位置: 首页 > news >正文

建设企业网站企业网上银行官网官方wordpress判断浏览器

建设企业网站企业网上银行官网官方,wordpress判断浏览器,河间做网站 申梦网络,怎么形容网站风格目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors…目录 StreamExecutionEnvironment Watermark watermark策略简介 使用 Watermark 策略 内置水印生成器 处理空闲数据源 算子处理 Watermark 的方式 创建DataStream的方式 通过list对象创建 ​​​​​​使用DataStream connectors创建 使用Table SQL connectors创建 StreamExecutionEnvironment 编写一个 Flink Python DataStream API 程序首先需要声明一个执行环境StreamExecutionEnvironment这是流式程序执行的上下文。 你将通过它来设置作业的属性例如默认并发度、重启策略等、创建源、并最终触发作业的执行。 env StreamExecutionEnvironment.get_execution_environment() env.set_runtime_mode(RuntimeExecutionMode.BATCH) env.set_parallelism(1) 创建了 StreamExecutionEnvironment 之后你可以使用它来声明数据源。数据源从外部系统如 Apache Kafka、Rabbit MQ 或 Apache Pulsar拉取数据到 Flink 作业里。 为了简单起见本教程读取文件作为数据源。 ds env.from_source(sourceFileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategyWatermarkStrategy.for_monotonous_timestamps(),source_namefile_source ) Watermark 大部分情况下流到operator的数据都是按照事件产生的时间顺序来的但是也不排除由于网络、分布式等原因导致乱序的产生所谓乱序就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。 为了解决乱序数据flink引入watermark。引入watermark机制则会等待晚到的数据一段时间等待时间到则触发计算如果数据延迟很大通常也会被丢弃或者另外处理。 为了使用事件时间语义Flink 应用程序需要知道事件时间戳对应的字段意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。 watermark策略简介 时间戳的分配与 watermark 的生成是齐头并进的其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。 使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略并且用户也可以在某些必要场景下构建自己的 watermark 策略。 使用 Watermark 策略 WatermarkStrategy 可以在 Flink 应用程序中的两处使用第一种是直接在数据源上使用第二种是直接在非数据源的操作之后使用。 第一种方式相比会更好因为数据源可以利用 watermark 生成逻辑中有关分片/分区shards/partitions/splits的信息。使用这种方式数据源通常可以更精准地跟踪 watermark整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。 仅当无法直接在数据源上设置策略时才应该使用第二种方式在任意转换操作之后设置 WatermarkStrategy 内置水印生成器 水印策略定义了如何在流源中生成水印。WatermarkStrategy是生成水印的WatermarkGenerator和分配记录内部时间戳的TimestampAssigner的生成器/工厂。 BoundedOutOfOrdernessDuration为创建WatermarkStrategy常见的内置策略。 for_bound_out_of_ordernness(max_out_of_ordernesspyflink.common.time.Duration)为记录无序的情况创建水印策略但可以设置事件无序程度的上限。 无序绑定B意味着一旦遇到时间戳为T的事件就不会再出现早于T-B的事件。 for_bound_out_of_ordernness(5) for_mononous_timestamps()为时间戳单调递增的情况创建水印策略。 水印是定期生成的并严格遵循数据中的最新时间戳。该策略引入的延迟主要是生成水印的周期间隔。 WatermarkStrategy.for_monotonous_timestamps() with_timestamp_assigner(timestamp_assigner:pyflink.common.watermark_strategy.TimestampAssigner) 创建一个新的WatermarkStrategy该策略通过实现TimestampAssigner接口使用给定的TimestampAssigner。 参数: timestamp_assigner 给定的TimestampAssigner。 Return: 包装TimestampAssigner的WaterMarkStrategy。 watermark_strategy WatermarkStrategy.for_monotonous_timestamps() with_timestamp_assigner(MyTimestampAssigner()) 处理空闲数据源 如果数据源中的某一个分区/分片在一段时间内未发送事件数据则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入或空闲源。在这种情况下当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值则其 watermark 将不会发生变化。 为了解决这个问题你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口withIdleness(Duration.ofMinutes(1)) with_idleness(idle_timeout:pyfrink.common.time.Duration) 创建一个新的丰富的WatermarkStrategy它也在创建的WatermarkGenerator中执行空闲检测。 参数idle_timeout–空闲超时。 Return配置了空闲检测的新水印策略。 算子处理 Watermark 的方式 一般情况下在将 watermark 转发到下游之前需要算子对其进行触发的事件完全进行处理。例如WindowOperator 将首先计算该 watermark 触发的所有窗口数据当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后其才会被发送到下游。换句话说由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。 相同的规则也适用于 TwoInputstreamOperator。但是在这种情况下算子当前的 watermark 会取其两个输入的最小值。 创建DataStream的方式 通过list对象创建 from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironmentenv StreamExecutionEnvironment.get_execution_environment() ds env.from_collection(collection[(1, aaa|bb), (2, bb|a), (3, aaa|a)],type_infoTypes.ROW([Types.INT(), Types.STRING()])) ​​​​​​使用DataStream connectors创建 使用add_source函数此函数仅支持FlinkKafkaConsumer仅在streaming执行模式下使用 from pyflink.common.serialization import JsonRowDeserializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import FlinkKafkaConsumerenv StreamExecutionEnvironment.get_execution_environment() # the sql connector for kafka is used here as its a fat jar and could avoid dependency issues env.add_jars(file:///path/to/flink-sql-connector-kafka.jar) deserialization_schema JsonRowDeserializationSchema.builder() \.type_info(type_infoTypes.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer FlinkKafkaConsumer(topicstest_source_topic,deserialization_schemadeserialization_schema,properties{bootstrap.servers: localhost:9092, group.id: test_group})ds env.add_source(kafka_consumer) 使用from_source函数此函数仅支持NumberSequenceSource和FileSource自定义数据源仅在streaming执行模式下使用 from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import WatermarkStrategy from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import NumberSequenceSourceenv StreamExecutionEnvironment.get_execution_environment() seq_num_source NumberSequenceSource(1, 1000) ds env.from_source(sourceseq_num_source,watermark_strategyWatermarkStrategy.for_monotonous_timestamps(),source_nameseq_num_source,type_infoTypes.LONG()) ​​​​​​​使用Table SQL connectors创建 首先用Table SQL connectors创建表再转换为DataStream. from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironmentenv StreamExecutionEnvironment.get_execution_environment() t_env StreamTableEnvironment.create(stream_execution_environmentenv)t_env.execute_sql(CREATE TABLE my_source (a INT,b VARCHAR) WITH (connector datagen,number-of-rows 10))ds t_env.to_append_stream(t_env.from_path(my_source),Types.ROW([Types.INT(), Types.STRING()]))
http://www.zqtcl.cn/news/383882/

相关文章:

  • 扬中网站建设 优帮云望野古诗带拼音
  • 网站和discuz同步登录建设产品网站课程
  • 常州做网站多少钱图片链接在线生成器
  • 服务器网站打不开可以做哪些网站
  • 建设银行网站信息补充网站如何备份
  • 网站建设 杭州市萧山区网页此站点不安全
  • 微网站免费开发平台钟表珠宝商城网站建设
  • 帮建网站的人wordpress广告栏
  • 怎么学建网站教做甜品网站
  • 建网站 服务器需要安装 tomcat安徽城乡建设 厅网站
  • 建筑公司企业简介模板关于网站优化的文章
  • 绥化网站建设兼职互联网大厂设计哪家口碑好
  • 成交型网站建设公司六安亿联网络科技有限公司
  • 优秀行业网站广州网站建设怎么样
  • 南宁建设信息网seo推广公司排名
  • 凯发网站国外网站博客网站也可以做引流
  • 网站设计要学什么vestacp wordpress
  • 模板建站代理3免费做网站
  • 酒店官方网站的功能建设百度网盟推广案例
  • 屯昌网站建设wap网站搭建
  • 毕设做音乐网站重庆正云环境网页制作
  • 免费网站建站w深圳罗湖建网站
  • 创建一个网站一般步骤有哪些互动网站策划
  • 文化传媒 网站设计宿迁网站建设价格
  • 网站开发五人分工是网站推广的案例
  • 海外网站制作seo技术
  • 包头网站建设熊掌号免费行情100个软件
  • 江门网站制作维护电子商务网站运营与管理
  • 动画网页制作网站常用的网络推广方法有
  • 一个设计网站多少钱sku电商是什么意思