In the previous section we implemented a FlinkKafkaConsumer that consumes data from Kafka and prints it to the console. In this section we continue by writing the computed results to Redis.
pom.xml
Add the Redis connector to the pom dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven</groupId>
            <artifactId>maven-plugin-api</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugin-tools</groupId>
            <artifactId>maven-plugin-annotations</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.plexus</groupId>
            <artifactId>plexus-utils</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.8.2</version>
            <scope>test</scope>
        </dependency>
        <!-- mybatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.5</version>
        </dependency>
        <!-- mysql driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <!-- the Redis connector added in this section -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-plugin-plugin</artifactId>
                <version>3.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
KafkaProducer.java: produce data into Kafka
The code is the same as in the previous section:
package org.example.snow.demo5;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author snowsong
 */
public class KafkaTestProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Initial connection address of the Kafka cluster
        props.put("bootstrap.servers", "172.16.1.173:9092");
        // Serializers that turn Java objects into byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Message loop
        for (int i = 0; i < 50; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("xue", key, value);
            producer.send(record);
            System.out.println("send: " + key);
            Thread.sleep(200);
        }
        // Close the producer
        producer.close();
    }
}
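The send() call above is fire-and-forget, so broker-side failures go unnoticed. As a variant not shown in the original post, the overload of send() that takes a Callback reports the outcome of each record; this sketch is a drop-in replacement for the producer.send(record) line in the loop above:

// Hypothetical variant of the send above: the callback surfaces broker errors
// that a fire-and-forget send() would silently drop.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The broker rejected the record or the send timed out
        exception.printStackTrace();
    } else {
        System.out.println("acked: partition=" + metadata.partition()
                + ", offset=" + metadata.offset());
    }
});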
Startup class
Flink consumes from Kafka and stores the result in Redis. First, set up the Flink Redis configuration:

// Configure the Redis connection pool: set the Redis host and port, then build the config object
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost(REDIS_SERVER)
        .setPort(REDIS_PORT)
        .build();
// Create a RedisSink that writes the data to Redis
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
// Attach the RedisSink to the data stream as its output
wordData.addSink(redisSink);

MyRedisMapper implements the RedisMapper interface to customize how data maps to Redis: it maps each Tuple2 in the Flink stream to a Redis command.
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
    /**
     * Returns the description of the Redis command to run;
     * here the command type is LPUSH.
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.LPUSH);
    }

    /**
     * Extracts the key from the given Tuple2.
     *
     * @param data a Tuple2 holding two strings
     * @return the first element of the tuple, used as the key
     */
    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    /**
     * Extracts the value from the given Tuple2.
     *
     * @param data a Tuple2 holding two strings
     * @return the second element of the tuple, used as the value
     */
    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
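The mapper above pushes every record onto a Redis list with LPUSH. As an aside not covered in the original post, the connector's RedisCommandDescription also accepts an additional key for hash-style commands; a minimal sketch for HSET, where the hash name l_words_hash is a hypothetical example:

public static class MyHashRedisMapper implements RedisMapper<Tuple2<String, String>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // For HSET the additional key names the hash; "l_words_hash" is made up for this sketch
        return new RedisCommandDescription(RedisCommand.HSET, "l_words_hash");
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0; // used as the hash field
    }

    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1; // used as the hash value
    }
}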
The complete code of StartApp is as follows:

package org.example.snow.demo5;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import java.util.Properties;

/**
 * @author snowsong
 */
public class StartApp {
    private static final String REDIS_SERVER = "0.0.0.0";
    private static final Integer REDIS_PORT = 6379;

    public static void main(String[] args) throws Exception {
        // Initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Kafka client connection
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "172.16.1.173:9092");
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("xue",
                new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer);

        // Map every received string to a two-element tuple
        SingleOutputStreamOperator<Tuple2<String, String>> wordData = dataStreamSource.map(
                new MapFunction<String, Tuple2<String, String>>() {
                    /**
                     * Maps the input string to a Tuple2 whose first element is
                     * "l_words" and whose second element is the input string.
                     */
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<>("l_words", value);
                    }
                });

        // Configure the Redis connection pool: set host and port, then build the config object
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_SERVER)
                .setPort(REDIS_PORT)
                .build();
        // Create a RedisSink and attach it to the stream as the output
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        wordData.addSink(redisSink);

        env.execute();
    }

    /**
     * MyRedisMapper maps the Tuple2 records of the Flink stream to Redis commands.
     * It implements the RedisMapper interface to customize the mapping rules.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        /**
         * Returns the description of the Redis command; the command type is LPUSH.
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        /**
         * @param data a Tuple2 holding two strings
         * @return the first element of the tuple, used as the key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        /**
         * @param data a Tuple2 holding two strings
         * @return the second element of the tuple, used as the value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
}

Running result: the records are stored in Redis.
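Because LPUSH prepends, the newest value sits at index 0 of the l_words list. One way to check the result outside of redis-cli is a small Jedis client; a minimal sketch, assuming Redis is reachable on 127.0.0.1:6379 (the Jedis library ships transitively with flink-connector-redis_2.11):

import java.util.List;

import redis.clients.jedis.Jedis;

public class RedisCheck {
    public static void main(String[] args) {
        // Host and port assumed to match the StartApp configuration
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // LPUSH prepends, so index 0 holds the most recently written value
            List<String> latest = jedis.lrange("l_words", 0, 9);
            latest.forEach(System.out::println);
            System.out.println("total entries: " + jedis.llen("l_words"));
        }
    }
}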