当前位置: 首页 > news >正文

长沙市师德师风建设网站做电子相册的网站

长沙市师德师风建设网站,做电子相册的网站,店面设计有哪些,开小程序要多少钱前言#xff1a; 这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API#xff0c;以kafka为数据源#xff0c;构建一个基础测试环境#xff1b;包含一个kafka生产者线程工具#xff0c;一个自定义FilterFunction算子#xff0c;一个自定义MapFunctio…前言 这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API以kafka为数据源构建一个基础测试环境包含一个kafka生产者线程工具一个自定义FilterFunction算子一个自定义MapFunction算子用一个flink任务的代码逻辑将实时读kafka并多层处理串起来让读者体会通过Flink构建自定义函数的技巧。 一、Flink的开发模块分析 Flink提供四个基础模块核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet基于这两个SDK在其上包装了TableAPI开发模块的SDK在Table API之上定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下还有基础API的接口可用于对时间状态算子等最细粒度的特性对象做操作如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig FLINK开发API关系结构如下 二、定制化开发Demo演示 2.1 场景介绍 Flink实时任务的的通用技术架构是消息队列中间件Flink任务 将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource监控Topic的数据情况做实时处理。 这里提供一个kafka的生产者线程可以自定义构建需要的数据和上传时间用于控制写入kafka的数据源重写两个DataStream的基础算子FilterFunction和MapFunction,用于让读者体会如何对FLINK函数的重新包装后续更基础的函数原理一样我这里用String数据对象做处理减少对象转换的SDK引入通常要基于业务做数据polo的加工这个自己处理将对象换成业务对象然后使用Flink将整个业务串起来从kafka读数据经过两层处理最终输出需要的结果 2.2 本地demo演示 2.2.1 pom文件 这里以flink1.14.6scala1.12版本为例 2.2.2 kafka生产者线程方法 package org.example.util;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** 向kafka生产数据** author i7杨* date 2024/01/12 13:02:29*/public class KafkaProducerUtil extends Thread {private String topic;public KafkaProducerUtil(String topic) {super();this.topic topic;}private static ProducerString, String createProducer() {// 通过Properties类设置Producer的属性Properties properties new Properties(); // 测试环境 kafka 配置properties.put(bootstrap.servers, ip2:9092,ip:9092,ip3:9092);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);return new KafkaProducerString, String(properties);}Overridepublic void run() {ProducerString, String producer createProducer();Random random new Random();Random random2 new Random();while (true) {int nums random.nextInt(10);int nums2 random.nextInt(50); // double nums2 random2.nextDouble();String time new Date().getTime() / 1000 5 ;String type pv;try {if (nums2 % 2 0) {type pv;} else {type uv;} // String info {\user\: nums ,\item\: nums * 10 ,\category\: nums2 ,\pv\: nums2 * 5 ,\ts\:\ time \};String info nums nums2;System.out.println(message : info);producer.send(new ProducerRecordString, String(this.topic, info));} catch (Exception e) {e.printStackTrace();}System.out.println(数据已经写入);try {sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new KafkaProducerUtil(test01).run();}public static void sendMessage(String topic, String message) {ProducerString, String producer createProducer();producer.send(new ProducerRecordString, String(topic, message));}}2.2.3 自定义基础函数 这里自定义了filter和map两个算子函数测试逻辑按照数据结构变化 自定义FilterFunction函数算子阈值小于40的过滤掉 package org.example.funtion;import org.apache.flink.api.common.functions.FilterFunction;/*** FilterFunction重构** author i7杨* date 2024/01/12 13:02:29*/public class InfoFilterFunction implements FilterFunctionString {private double threshold;public InfoFilterFunction(double threshold) {this.threshold threshold;}Overridepublic boolean filter(String value) throws Exception {if (value.split().length 2)// 阈值过滤return Double.valueOf(value.split()[1]) threshold;else return false;} }自定义MapFunction函数后缀为2的添加上特殊信息 package org.example.funtion;import org.apache.flink.api.common.functions.MapFunction;public class ActionMapFunction implements MapFunctionString, String {Overridepublic String map(String value) throws Exception {System.out.println(value: value);if (value.endsWith(2))return value.concat(:Special processing information);else return value;} } 2.2.4 flink任务代码 任务逻辑使用kafka工具产生数据然后监控kafka的topic讲几个函数串起来输出结果 package org.example.service;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.example.funtion.ActionMapFunction; import org.example.funtion.InfoFilterFunction;import java.util.*;public class FlinkTestDemo {public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties kafkaProps new Properties();kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip1:9092,ip2:9092,ip3:9092);kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, flink-consumer-group);kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 创建 Kafka 消费者FlinkKafkaConsumerString kafkaConsumer new FlinkKafkaConsumer(test01,// Kafka 主题名称new SimpleStringSchema(),kafkaProps);// 从 Kafka 中读取数据流DataStreamString kafkaStream env.addSource(kafkaConsumer);env.disableOperatorChaining();kafkaStream.filter(new InfoFilterFunction(40)).map(new ActionMapFunction()).print(阈值大于40以上的message);// 执行任务env.execute(This is a testing task);}}运行结果
http://www.zqtcl.cn/news/613516/

相关文章:

  • 做的成功的地方网站办公室工装设计公司
  • 怎样添加网站上百度商桥代码网站建设实验报告手写
  • 江阴做网站优化辽宁世纪兴电子商务服务中心
  • 最新创建的网站搭建网站的平台有哪些
  • 全国房地产网站企管宝app下载
  • 无线网络网站dns解析失败南通模板建站多少钱
  • h5手机网站建设哪家好北京海淀房管局网站
  • 制作一个简单的网站冬奥会网页设计代码
  • 如何做网站 百度西充建设部门投诉网站
  • 怎么创建自己的博客网站网站优化主要内容
  • 太原网站建设推广建设网站观澜
  • 网站开发员名称是什么网站制作教程及流程
  • 建设财经资讯网站的目的移动端网站模板怎么做的
  • 受欢迎的赣州网站建设青岛建站
  • 青海网站制作哪家好烟台龙口网站建设
  • 婚恋网站排名前十名网站建设的论坛
  • 进行网站建设有哪些重要意义上海浦东建筑建设网站污水处理工程
  • 自己做qq代刷网站要钱吗瑞安网站建设优化推广
  • 建设网站招标定制高端网站建设报价
  • 商城网站建设code521广州安全教育平台登录入囗
  • 如何做网站系统安庆网站建设公司简
  • 北京做网站电话的公司网站怎么做外链
  • 手工艺品外贸公司网站建设方案复古风格网站
  • 企业网站后端模板如何编写手机程序
  • 泰州网站建设服务好wordpress 子分类
  • 做个企业网站要多少钱php mysql怎么编写视频网站
  • 精仿手表网站做网站为什么要做备案接入
  • 哈什么网一个网站做ppt清新区城乡建设局网站
  • 重庆专业网站建设首页排名网站模板广告去除
  • 河南省建设行业证书查询网站怎么用ps做网站首页背景图片