当前位置: 首页 > news >正文

做网站比较好的网站开发摊销期多少年

做网站比较好的,网站开发摊销期多少年,潍坊营销网站,湖南省博物馆网站建设背景 在flink中#xff0c;我们需要对我们写的map转换函数#xff0c;process处理函数进行单元测试#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新#xff0c;本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…背景 在flink中我们需要对我们写的map转换函数process处理函数进行单元测试测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测试的是数据流的类型选择不同的测试套件如下所示 OneInputStreamOperatorTestHarness适用于 DataStreams 数据流KeyedOneInputStreamOperatorTestHarness适用于 KeyedStreams 分组后的数据流TwoInputStreamOperatorTestHarness适用于两个数据流DataStream的 ConnectedStreamKeyedTwoInputStreamOperatorTestHarness适用于两个 KeyedStream 的 ConnectedStream 其次根据是测试map函数还是process函数我们选择不同的操作符如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子 map函数测试代码: Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarnessString, String testHarness // KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarnessString, String, String(new StreamFlatMap(statefulFlatMap),x - 1, Types.STRING);testHarness.open();// test first recordtestHarness.processElement(world, 10);ValueStateString previousInput statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor(previousInput, Types.STRING));String stateValue previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals(world, stateValue);// test second recordtestHarness.processElement(parallel, 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(hello parallel world, 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals(parallel, previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunctionString, String {ValueStateString previousInput;Overridepublic void open(Configuration parameters) throws Exception {previousInput getRuntimeContext().getState(new ValueStateDescriptorString(previousInput, Types.STRING));}Overridepublic void flatMap(String in, CollectorString collector) throws Exception {String out hello in;if(previousInput.value() ! null){out out previousInput.value();}previousInput.update(in);collector.collect(out);} }process处理函数代码: Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10)),testHarness.extractOutputStreamRecords());}Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction new MyProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x - 1, Types.STRING);testHarness.open();testHarness.processElement(world, 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord(hello world, 10),new StreamRecord(Timer triggered at timestamp 50)),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunctionString, String, String {Overridepublic void processElement(String in, Context context, CollectorString collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out hello in;collector.collect(out);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {out.collect(String.format(Timer triggered at timestamp %d, timestamp));}}此外附加官方的map函数的测试代码 /** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the License); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an AS IS BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Collector;import org.junit.Assert; import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {link StreamMap}. These test that:** ul* liRichFunction methods are called correctly* liTimestamps of processed elements match the input timestamp* liWatermarks are correctly forwarded* /ul*/ public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunctionInteger, Integer {private static final long serialVersionUID 1L;Overridepublic void flatMap(Integer value, CollectorInteger out) throws Exception {if (value % 2 0) {out.collect(value);out.collect(value * value);}}}Testpublic void testFlatMap() throws Exception {StreamFlatMapInteger, Integer operator new StreamFlatMapInteger, Integer(new MyFlatMap());OneInputStreamOperatorTestHarnessInteger, Integer testHarness new OneInputStreamOperatorTestHarnessInteger, Integer(operator);long initialTime 0L;ConcurrentLinkedQueueObject expectedOutput new ConcurrentLinkedQueueObject();testHarness.open();testHarness.processElement(new StreamRecordInteger(1, initialTime 1));testHarness.processElement(new StreamRecordInteger(2, initialTime 2));testHarness.processWatermark(new Watermark(initialTime 2));testHarness.processElement(new StreamRecordInteger(3, initialTime 3));testHarness.processElement(new StreamRecordInteger(4, initialTime 4));testHarness.processElement(new StreamRecordInteger(5, initialTime 5));testHarness.processElement(new StreamRecordInteger(6, initialTime 6));testHarness.processElement(new StreamRecordInteger(7, initialTime 7));testHarness.processElement(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(2, initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 2));expectedOutput.add(new Watermark(initialTime 2));expectedOutput.add(new StreamRecordInteger(4, initialTime 4));expectedOutput.add(new StreamRecordInteger(16, initialTime 4));expectedOutput.add(new StreamRecordInteger(6, initialTime 6));expectedOutput.add(new StreamRecordInteger(36, initialTime 6));expectedOutput.add(new StreamRecordInteger(8, initialTime 8));expectedOutput.add(new StreamRecordInteger(64, initialTime 8));TestHarnessUtil.assertOutputEquals(Output was not correct., expectedOutput, testHarness.getOutput());}Testpublic void testOpenClose() throws Exception {StreamFlatMapString, String operator new StreamFlatMapString, String(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarnessString, String testHarness new OneInputStreamOperatorTestHarnessString, String(operator);long initialTime 0L;testHarness.open();testHarness.processElement(new StreamRecordString(Hello, initialTime));testHarness.close();Assert.assertTrue(RichFunction methods where not called., TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue(Output contains no elements., testHarness.getOutput().size() 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunctionString, String {private static final long serialVersionUID 1L;public static boolean openCalled false;public static boolean closeCalled false;Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail(Close called before open.);}openCalled true;}Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail(Open was not called before close.);}closeCalled true;}Overridepublic void flatMap(String value, CollectorString out) throws Exception {if (!openCalled) {Assert.fail(Open was not called before run.);}out.collect(value);}} } 包含同时测试FlatMap和RichFlatMap函数但是其中没有操作状态我前面的例子包含了RichFlatMap状态的测试 参考文献 https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/
http://www.zqtcl.cn/news/836857/

相关文章:

  • 自己做的网站怎么在百度搜索到网页制作论文3000字
  • 如何网站托管中国跨境电商平台有多少
  • 手机p2p网站做平面设计兼职的网站有哪些
  • 贵金属网站建设唐山网站制作工具
  • 网站入门成都网站制作沈阳
  • 接做网站单子的网站做网站要会那些ps
  • 做盗市相关网站wordpress速度优化简书
  • 贵阳手机网站建设公司国内永久免费云服务器
  • 温州做网站定制哪家网络推广公司好
  • 招聘网站怎么做线下活动网站后台管理系统怎么开发
  • 西湖区外贸网站建设商梦建站
  • 网站首页设计注意斗蟋蟀网站建设
  • 石家庄网站建设远策科技网站建设公司人员配备
  • 手机怎么建网站链接专门做鞋子的网站吗
  • 网站建设设计作品怎么写网站建设 网站内容 采集
  • 自己做网站nas如何做网站大图片
  • 网站优化定做嘉兴模板建站代理
  • 南宁做网站比较好的公司有哪些花乡科技园区网站建设
  • 网站注册平台怎么注册申请空间 建立网站吗
  • 汕头住房与城乡建设网站做网站视频 上传到哪儿
  • 东莞网站关键词优化福建个人网站备案
  • 国外获奖flash网站泉州网站制作专业
  • 万网域名注册后如何做网站教学上海app开发和制作公司
  • 恩施网站建设公司个人网站怎么制作成图片
  • 泸州高端网站建设公司上海企业网站
  • wordpress 建站 知乎济南全包圆装修400电话
  • 织梦建设两个网站 视频影视公司宣传片
  • 北京小企业网站建设那个做网站好
  • 怎样用模块做网站深圳网站建设制作厂家
  • 网站项目中的工作流程网站建设社区