Table of Contents
01 Introduction
02 Overview
03 Reading Data from Collections
    3.1 Creating a Data Stream from a Collection
    3.2 Creating a Data Stream from an Iterator
    3.3 Creating a Data Stream from Given Objects
    3.4 Creating a Data Stream from a Parallel Iterator
    3.5 Creating a Data Stream from a Number Sequence
    3.6 Custom Data Streams
04 Source Code Demo
    4.1 pom.xml Dependencies
    4.2 Creating the Collection Data Stream Job
    4.3 Run Result Logs

01 Introduction
Source code (ready to clone and run): https://gitee.com/shawsongyue/aurora.git
Module: aurora_flink
Main class: FlinkListSourceJob

02 Overview
1. A Source is where a Flink program reads its input data. You attach a source to your program with StreamExecutionEnvironment.addSource(sourceFunction).
2. Flink ships with many pre-implemented source functions, but you can always write a custom non-parallel source by implementing the SourceFunction interface.
3. You can also write custom parallel sources by implementing the ParallelSourceFunction interface or by extending the RichParallelSourceFunction class.

03 Reading Data from Collections
3.1 Creating a Data Stream from a Collection
fromCollection(Collection)
Creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.

3.2 Creating a Data Stream from an Iterator
fromCollection(Iterator, Class)
Creates a data stream from an iterator. The Class argument specifies the data type of the elements returned by the iterator.

3.3 Creating a Data Stream from Given Objects
fromElements(T ...)
Creates a data stream from the given sequence of objects. All objects must be of the same type.

3.4 Creating a Data Stream from a Parallel Iterator
Note: when using an iterator, the iterator object must be serializable, otherwise the job fails with the error below; see my other article for details.
Error: org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable
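A quick stdlib-only check (no Flink required) shows why the iterator returned by `Arrays.asList(...)` trips this error, and what a serializable iterator looks like. The class names here are illustrative, not part of the project:

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

public class SerializableIteratorCheck {

    // A serializable iterator over an integer range; something like this is
    // safe to hand to Flink because it can be shipped to the task managers.
    public static class RangeIterator implements Iterator<Integer>, Serializable {
        private int next;
        private final int end;

        public RangeIterator(int start, int end) {
            this.next = start;
            this.end = end;
        }

        @Override
        public boolean hasNext() { return next < end; }

        @Override
        public Integer next() { return next++; }
    }

    public static void main(String[] args) {
        // Arrays$ArrayItr does not implement Serializable, which is exactly
        // what Flink's InvalidProgramException complains about.
        Iterator<String> arrayItr = Arrays.asList("a", "b").iterator();
        System.out.println(arrayItr instanceof Serializable);            // false
        System.out.println(new RangeIterator(1, 4) instanceof Serializable); // true
    }
}
```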
fromParallelCollection(SplittableIterator, Class)
Creates a data stream in parallel from an iterator. The Class argument specifies the data type of the elements returned by the iterator.

3.5 Creating a Data Stream from a Number Sequence
generateSequence(from, to)
Generates a data stream in parallel from the number sequence in the given interval. (In recent Flink versions this method is deprecated in favor of fromSequence(from, to).)

3.6 Custom Data Streams
addSource - attaches a new source function. For example, you can use addSource(new FlinkKafkaConsumer(...)) to read from Apache Kafka. See the connector documentation for more details.

04 Source Code Demo
4.1 pom.xml Dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Properties -->
    <properties>
        <!-- JDK version -->
        <java.version>11</java.version>
        <!-- Maven compiler plugin version -->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!-- Source encoding -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Report output encoding -->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- JSON processing -->
        <fastjson.version>1.2.75</fastjson.version>
        <!-- log4j version -->
        <log4j.version>2.17.1</log4j.version>
        <!-- Flink version -->
        <flink.version>1.18.0</flink.version>
        <!-- Scala version -->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <!-- Common dependencies -->
    <dependencies>
        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging framework: start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Logging framework: end -->
    </dependencies>

    <!-- Build and packaging -->
    <build>
        <finalName>${project.name}</finalName>
        <!-- Resource packaging -->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <!-- Centralized plugin management -->
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- Compiler plugin -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- Remote repositories for dependencies -->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- Remote repositories for Maven plugins -->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
```

4.2 Creating the Collection Data Stream Job
Note: depending on the cluster configuration, Flink may run the job with multiple parallel subtasks, which can lead to data being processed more than once. Call .setParallelism(1) to run with a parallelism of one.
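To see why this matters, here is a toy illustration in plain Java (not Flink code): if each of N parallel subtasks naively reads the entire collection, every element is emitted N times.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelDuplication {
    public static void main(String[] args) {
        List<String> data = Arrays.asList("测试", "开发", "运维");
        int parallelism = 3; // pretend the cluster gave us 3 subtasks
        List<String> emitted = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Each subtask reads the whole collection instead of a partition of it
            emitted.addAll(data);
        }
        System.out.println(emitted.size()); // 9 emissions for only 3 elements
    }
}
```

Setting the parallelism to one, or using a source that partitions its data across subtasks (as a SplittableIterator does), avoids this.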
```java
package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
 * @description Flink list-collection source demo
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
 */
public class FlinkListSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Set the runtime mode:
        // STREAMING - stream mode, BATCH - batch mode,
        // AUTOMATIC - chosen automatically from the boundedness of the sources
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        List<String> list = Arrays.asList("测试", "开发", "运维");

        // 01 Create a data stream from a collection
        DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);

        // 02 Create a data stream from an iterator. Using list.iterator() directly
        // fails because the iterator is not serializable; see my other article for details.
        // DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(), String.class);

        // 03 Create a data stream from a sequence of given objects
        DataStreamSource<String> dataStreamSource_03 = env.fromElements("测试", "开发", "运维");

        // 04 Create a data stream from an iterator, in parallel
        NumberSequenceIterator splittableIterator = new NumberSequenceIterator(1, 10);
        DataStreamSource<Long> dataStreamSource_04 = env.fromParallelCollection(splittableIterator, Long.TYPE);

        // 05 Generate a data stream in parallel from the number sequence in the given interval
        DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);

        // 06 Custom data stream
        DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                // Emit your own data here
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect("测试数据" + i);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Print the streams
        dataStreamSource_01.print();
        // dataStreamSource_02.print();
        dataStreamSource_03.print();
        dataStreamSource_04.print();
        dataStreamSource_05.print();
        dataStreamSource_06.print();

        // Launch the job
        env.execute();
    }
}
```
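One caveat about the custom source above: its empty cancel() body is acceptable for a finite loop, but a long-running source should check a flag that cancel() flips, so the job can actually stop it. A minimal sketch of that pattern in plain Java (Flink types omitted; names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Cancellation pattern for a custom source: run() loops while a volatile
// flag is set, and cancel() clears the flag (possibly from another thread).
public class CancellableSource {
    private volatile boolean running = true;

    public void run(Consumer<String> collector) {
        int i = 0;
        while (running && i < 10) {
            collector.accept("测试数据" + i++);
        }
    }

    public void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        CancellableSource source = new CancellableSource();
        List<String> out = new ArrayList<>();
        source.run(out::add);
        System.out.println(out.size()); // 10 elements emitted
    }
}
```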
4.3 Run Result Logs