昆明免费网站制作,公司怎么找做网站,网络营销的未来趋势,瑞安微信网站测试 Flink 作业
a#xff09;JUnit 规则 MiniClusterWithClientResource
Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则#xff0c;用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.
要使用 MiniClusterWit…测试 Flink 作业
aJUnit 规则 MiniClusterWithClientResource
Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.
要使用 MiniClusterWithClientResource需要添加一个额外的依赖项测试范围。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-test-utils/artifactIdversion1.19.0/version scopetest/scope
/dependency示例MapFunction
public class IncrementMapFunction implements MapFunctionLong, Long {Overridepublic Long map(Long record) throws Exception {return record 1;}
}在本地 Flink 集群使用这个 MapFunction 的简单 pipeline如下所示。
public class ExampleIntegrationTest {ClassRulepublic static MiniClusterWithClientResource flinkCluster new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberSlotsPerTaskManager(2).setNumberTaskManagers(1).build());Testpublic void testIncrementPipeline() throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(1L, 21L, 22L).map(new IncrementMapFunction()).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(2L, 22L, 23L));}// create a testing sinkprivate static class CollectSink implements SinkFunctionLong {// must be staticpublic static final ListLong values Collections.synchronizedList(new ArrayList());Overridepublic void invoke(Long value, SinkFunction.Context context) throws Exception {values.add(value);}}
}使用 MiniClusterWithClientResource 进行集成测试的注意
为了不将整个 pipeline 代码从生产复制到测试请将 source 和 sink 在生产代码中设置成可插拔的并在测试中注入特殊的测试 source 和测试 sink。这里使用 CollectSink 中的静态变量是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者可以使用测试的 sink 将数据写入临时目录的文件中。如果作业使用事件时间定时器则可以实现自定义的 并行 源函数来发出 watermark。建议始终以 parallelism 1 的方式在本地测试 pipeline以识别只有在并行执行 pipeline 时才会出现的 bug。优先使用 ClassRule 而不是 Rule这样多个测试可以共享同一个 Flink 集群。可以节省大量的时间因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。如果 pipeline 包含自定义状态处理则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此需要在 pipeline 中仅测试抛出用户自定义函数的异常来触发失败。