当前位置: 首页 > news >正文

双鸭山住房和城乡建设局网站广告视频拍摄制作

双鸭山住房和城乡建设局网站,广告视频拍摄制作,广州关于进一步,企业名称上节成功实现了FlinkKafkaConsumer消费Kafka数据#xff0c;并将数据写入到控制台#xff0c;接下来将继续将计算的结果输入到redis中。 pom.xml 引入redis到pom包 ?xml version1.0 encodingUTF-8? project xmlnshttp://mave…上节成功实现了FlinkKafkaConsumer消费Kafka数据并将数据写入到控制台接下来将继续将计算的结果输入到redis中。 pom.xml 引入redis到pom包 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdflink-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependencies!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.17.1/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion1.17.1/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion1.17.1/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion1.17.1/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.maven/groupIdartifactIdmaven-plugin-api/artifactIdversion2.0/version/dependencydependencygroupIdorg.apache.maven.plugin-tools/groupIdartifactIdmaven-plugin-annotations/artifactIdversion3.2/version/dependencydependencygroupIdorg.codehaus.plexus/groupIdartifactIdplexus-utils/artifactIdversion3.0.8/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.8.2/versionscopetest/scope/dependency!--mybatis坐标--dependencygroupIdorg.mybatis/groupIdartifactIdmybatis/artifactIdversion3.4.5/version/dependency!--mysql驱动坐标--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.6/versionscoperuntime/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion1.17.1/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.7.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-redis_2.11/artifactIdversion1.1.0/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-plugin-plugin/artifactIdversion3.2/versionexecutionsexecutionphasepackage/phaseconfigurationfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filters/configuration/execution/executions/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins /build /project KafkaProducer.java 生产数据存入Kafka 同上一节具体代码 package org.example.snow.demo5;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** author snowsong*/ public class KafkaTestProducer {public static void main(String[] args) throws InterruptedException {Properties props new Properties();// Kafka 集群的初始连接地址props.put(bootstrap.servers, 172.16.1.173:9092);// 序列化器 将 Java 对象序列化为字节数组props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);// kafka生产者KafkaProducerString, String producer new KafkaProducer(props);// 消息循环for (int i 0; i 50; i) {String key key- i;String value value- i;ProducerRecordString, String record new ProducerRecord(xue, key, value);producer.send(record);System.out.println(send: key);Thread.sleep(200);}// 关闭生产者producer.close();} } 启动服务类 Flink消费Kafka并将结果存入redis。 设置FlinkRedisConfig // 配置 Redis 连接池设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象用于将数据写入 RedisRedisSinkTuple2String, String redisSink new RedisSink(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中作为数据的接收端wordData.addSink(redisSink);MyRedisMapper 它实现了 RedisMapper 接口用于自定义 Redis 数据的映射规则。MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。 public static class MyRedisMapper implements RedisMapperTuple2String,String {/*** 获取当前命令的描述信息。** return 返回Redis命令的描述信息对象其中包含了命令的类型为LPUSH。*/Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** param data 一个包含两个字符串元素的Tuple2对象* return 返回Tuple2对象的第一个元素即键*/Overridepublic String getKeyFromData(Tuple2String,String data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** param data 一个包含两个字符串元素的元组* return 元组中的第二个元素的值*/Overridepublic String getValueFromData(Tuple2String,String data) {return data.f1;}starApp的完整代码如下 package org.example.snow.demo5;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import java.util.Properties;/*** author snowsong*/ public class StartApp {private static final String REDIS_SERVER 0.0.0.0;private static final Integer REDIS_PORT 6379;public static void main(String[] args) throws Exception {// 初始化StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 客户端的连接参数Properties properties new Properties();properties.setProperty(bootstrap.servers, 172.16.1.173:9092);FlinkKafkaConsumer flinkKafkaConsumer new FlinkKafkaConsumer(xue,new SimpleStringSchema(), properties);DataStreamSource dataStreamSource env.addSource(flinkKafkaConsumer);// 将接收的数据映射为二元组SingleOutputStreamOperatorTuple2String, String wordData dataStreamSource.map(new MapFunctionString, Tuple2String, String() {/*** 将输入的字符串映射为 Tuple2 对象。** param value 输入的字符串* return 一个包含两个元素的 Tuple2 对象第一个元素为 l_words第二个元素为输入的字符串* throws Exception 如果发生异常则抛出该异常*/Overridepublic Tuple2String, String map(String value) throws Exception {return new Tuple2(l_words, value);}});// 配置 Redis 连接池设置 Redis 服务器地址和端口并构建对象FlinkJedisPoolConfig conf new FlinkJedisPoolConfig.Builder().setHost(REDIS_SERVER).setPort(REDIS_PORT).build();// 创建 RedisSink 对象用于将数据写入 RedisRedisSinkTuple2String, String redisSink new RedisSink(conf, new MyRedisMapper());// 将 RedisSink 添加到数据流中作为数据的接收端wordData.addSink(redisSink);env.execute();}/*** MyRedisMapper 类用于将 Flink 数据流中的 Tuple2 对象映射到 Redis 命令中。* 它实现了 RedisMapper 接口用于自定义 Redis 数据的映射规则。*/public static class MyRedisMapper implements RedisMapperTuple2String,String {/*** 获取当前命令的描述信息。** return 返回Redis命令的描述信息对象其中包含了命令的类型为LPUSH。*/Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}/*** 从给定的Tuple2数据中获取键。** param data 一个包含两个字符串元素的Tuple2对象* return 返回Tuple2对象的第一个元素即键*/Overridepublic String getKeyFromData(Tuple2String,String data) {return data.f0;}/*** 从给定的元组中获取第二个元素的值。** param data 一个包含两个字符串元素的元组* return 元组中的第二个元素的值*/Overridepublic String getValueFromData(Tuple2String,String data) {return data.f1;}}}运行结果 存入redis结果
http://www.zqtcl.cn/news/711160/

相关文章:

  • wordpress 中文网店杭州排名优化公司
  • wordpress建站安全吗wordpress企业主题教程
  • 网站构建的开发费用信息管理系统网站开发教程
  • 自己做网站怎么维护wordpress素材模板
  • 如何选择一个好的优质网站建设公司wordpress 主题小工具
  • mysql数据库做网站广州网站seo地址
  • 福建省住房和城乡建设厅网站电话网站开发项目步骤
  • 网站注册域名多少钱淘宝网商城
  • 做架构图的网站网站和网店的区别
  • 做红包网站简单个人网站设计
  • 新手学做网站pdf手wordpress修改搜索框
  • 做湲兔费网站视颍如何通过查询网站注册时间
  • 重庆cms建站模板南通网站建设推广优化
  • 合肥网站建设的公司新闻类网站如何做量化统计
  • 好用的在线地图网站十六局集团门户网
  • 网站开发数据库连接失败广州网站建站平台
  • 鄂尔多斯北京网站建设加盟网站建设的内容
  • 网站 被 抄袭不属于营销型网站的特点
  • 浙江英文网站建设互联网公司排名2021完整版
  • 完美代码的网站python开发工具
  • 餐饮网站开发参考文献网站建设500错误代码
  • 网站开发关键技术网站自动推广软件免费
  • 前端学习网站南阳东莞网站建设公司哪家好
  • 关于做网站的了解点wordpress小程序插曲
  • PHP网站开发与管理设计心得个人可以做聊天网站备案吗
  • 开公司可以在哪些网站做推广上海画册设计
  • 成都高新区规划建设局网站网络营销方式有哪些?举例说明
  • 国家企业信用公信系统入口seo服务
  • 个人网站网页模板室内装修设计自学软件
  • 什么网站可以做告白的网页网站模板套用湖南岚鸿