# Debugging the Hudi Flink SQL Source Code

## Preface

With the goal of learning the hudi-flink source code, I debugged the code from my earlier article 《Hudi Flink SQL代码示例及本地调试》 and recorded the main steps of the process along with the corresponding source snippets.

## Versions

- Flink 1.15.4
- Hudi 0.13.0

## Goals

In 《Hudi Flink SQL代码示例及本地调试》 I noted that the Table API entry point looks much like the DataStream API one: the DataStream API entry is in HoodiePipeline's sink and source methods, and those two methods call HoodieTableFactory.createDynamicTableSink and HoodieTableFactory.createDynamicTableSource respectively. So how does the Table API code reach createDynamicTableSink and createDynamicTableSource step by step? And once a HoodieTableSink is returned, how does the data actually get written? The main entry of Hudi's write logic appears to be the method body of HoodieTableSink.getSinkRuntimeProvider. None of this was clear to me before, so this time the goals are:

1. Trace the main code steps from the Table API entry point to createDynamicTableSink returning a HoodieTableSink.
2. Find where the body of HoodieTableSink.getSinkRuntimeProvider is invoked to carry out the subsequent Hudi write logic.

## Related classes

- HoodiePipeline (DataStream API)
- HoodieTableFactory
- HoodieTableSink
- DataStreamSinkProviderAdapter (functional interface)
- TableEnvironmentImpl
- BatchPlanner
- PlannerBase
- FactoryUtil
- BatchExecSink
- CommonExecSink

## DataStream API

The questions above are actually easy to answer from the DataStream API code, so let's first look at how the DataStream API writes to Hudi (full code in the article 《Flink Hudi DataStream API代码示例》):

```java
DataStream<RowData> dataStream = env.fromElements(
    GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
    GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
);

HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("id int")
    .column("name string")
    .column("price double")
    .column("ts bigint")
    .column("dt string")
    .pk("id")
    .partition("dt")
    .options(options);

builder.sink(dataStream, false);
```
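The `options` map passed to `.options(options)` above is defined in the referenced article and not repeated here. As a rough idea of what it might contain, here is a minimal sketch; the class name, path, and option values are assumptions for illustration, with keys taken from hudi-flink's FlinkOptions:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.configuration.FlinkOptions;

public class OptionsSketch {
  // Hypothetical options for the example above; the real values live in the
  // referenced article. Keys come from hudi-flink's FlinkOptions class.
  public static Map<String, String> buildOptions() {
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "file:///tmp/hudi/t1"); // assumed local path
    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); // or COPY_ON_WRITE
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");      // matches the ts column
    return options;
  }
}
```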
HoodiePipeline.Builder.sink:

```java
public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
  TableDescriptor tableDescriptor = getTableDescriptor();
  return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
}
```

HoodiePipeline.sink:

```java
private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath,
    ResolvedCatalogTable catalogTable, boolean isBounded) {
  FactoryUtil.DefaultDynamicTableContext context =
      Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
  HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
  return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
      .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
      .consumeDataStream(input);
}
```

HoodiePipeline.sink already contains the answers:

1. HoodieTableFactory.createDynamicTableSink returns a HoodieTableSink.
2. HoodieTableSink.getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter.
3. DataStreamSinkProviderAdapter.consumeDataStream invokes the method body of HoodieTableSink.getSinkRuntimeProvider, which carries out the subsequent Hudi write logic. The dataStream here is the DataStream<RowData> we created at the start of the program.

HoodieTableSink.getSinkRuntimeProvider returns a DataStreamSinkProviderAdapter, and the lambda expression `dataStream -> {...}` is the concrete implementation of DataStreamSinkProviderAdapter.consumeDataStream(dataStream):

```java
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
  return (DataStreamSinkProviderAdapter) dataStream -> {
    // setup configuration
    long ckpTimeout = dataStream.getExecutionEnvironment()
        .getCheckpointConfig().getCheckpointTimeout();
    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
    // set up default parallelism
    OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());

    RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

    // bulk_insert mode
    final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
    if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
      return Pipelines.bulkInsert(conf, rowType, dataStream);
    }

    // Append mode
    if (OptionsResolver.isAppendMode(conf)) {
      DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
      if (OptionsResolver.needsAsyncClustering(conf)) {
        return Pipelines.cluster(conf, rowType, pipeline);
      } else {
        return Pipelines.dummySink(pipeline);
      }
    }

    DataStream<Object> pipeline;
    // bootstrap
    final DataStream<HoodieRecord> hoodieRecordDataStream =
        Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
    // write pipeline
    pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
    // compaction
    if (OptionsResolver.needsAsyncCompaction(conf)) {
      // use synchronous compaction for bounded source.
      if (context.isBounded()) {
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
      }
      return Pipelines.compact(conf, pipeline);
    } else {
      return Pipelines.clean(conf, pipeline);
    }
  };
}
```

DataStreamSinkProviderAdapter is in fact a functional interface: an interface that contains exactly one abstract method. A lambda expression can be assigned to a functional interface, thereby instantiating it:

```java
public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
  DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream);

  @Override
  default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
    return consumeDataStream(dataStream);
  }
}
```

For background on functional interfaces and lambda expressions, see these two articles:
https://it.sohu.com/a/682888110_100123073
https://blog.csdn.net/Speechless_/article/details/123746047
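To make the functional-interface mechanics concrete, here is a tiny runnable sketch mirroring the DataStreamSinkProviderAdapter shape: one abstract method, a default overload delegating to it, and a lambda supplying the implementation. All names here are made up for illustration:

```java
// A minimal stand-in for the adapter pattern above; every name is hypothetical.
interface SinkAdapter {
  String consume(String input); // single abstract method -> lambda target

  default String consume(String ctx, String input) {
    // default method delegates, like consumeDataStream(providerContext, dataStream)
    return consume(input);
  }
}

public class FunctionalInterfaceDemo {
  public static void main(String[] args) {
    // The lambda body becomes the implementation of consume(String).
    SinkAdapter adapter = input -> "wrote " + input;

    System.out.println(adapter.consume("records"));        // -> wrote records
    System.out.println(adapter.consume("ctx", "records")); // default method -> wrote records
  }
}
```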
## Table API

Now that we know the DataStream API call chain, let's compare it with the Table API's rough call steps. The debugging entry point:

```java
tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
```

### Overall call flow

1. `tableEnv.executeSql` -> `TableEnvironmentImpl.executeSql` -> `executeInternal(Operation operation)` -> `executeInternal(List<ModifyOperation> operations)` -> `this.translate` -> `(PlannerBase) this.planner.translate`

2.1. `PlannerBase.translate` -> `PlannerBase.translateToRel` -> `getTableSink(catalogSink.getContextResolvedTable, dynamicOptions)` -> `FactoryUtil.createDynamicTableSink` -> `HoodieTableFactory.createDynamicTableSink`

2.2. `PlannerBase.translate` -> `(BatchPlanner) translateToPlan(execGraph)` -> `(ExecNodeBase) node.translateToPlan` -> `(BatchExecSink) translateToPlanInternal` -> `(CommonExecSink) createSinkTransformation` -> `(HoodieTableSink) getSinkRuntimeProvider` -> `(CommonExecSink) applySinkProvider` -> `provider.consumeDataStream`
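For completeness, here is a minimal, hypothetical sketch of the surrounding program for this debugging session; the table name, schema, and path below are assumptions mirroring this article's examples, not the referenced article's exact code:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiSqlDemo {
  public static void main(String[] args) {
    // Batch mode is what makes the planner below a BatchPlanner.
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    String tableName = "t1";
    // 'connector'='hudi' is what later leads FactoryUtil to HoodieTableFactory;
    // the path is an assumed placeholder.
    tableEnv.executeSql(String.format(
        "create table %s (id int, name string, price double, ts bigint, dt string, "
            + "primary key (id) not enforced) partitioned by (dt) "
            + "with ('connector'='hudi', 'path'='file:///tmp/hudi/t1')",
        tableName));

    tableEnv.executeSql(String.format(
        "insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
  }
}
```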
### Detailed code

### TableEnvironmentImpl

TableEnvironmentImpl.executeSql:

```java
public TableResult executeSql(String statement) {
  List<Operation> operations = this.getParser().parse(statement);
  if (operations.size() != 1) {
    throw new TableException("Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
        + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
        + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, "
        + "USE [CATALOG.]DATABASE, SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW [USER] FUNCTIONS, "
        + "SHOW PARTITIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS, INSERT, DESCRIBE, LOAD MODULE, "
        + "UNLOAD MODULE, USE MODULES, SHOW [FULL] MODULES.");
  } else {
    // key step: executeInternal
    return this.executeInternal((Operation) operations.get(0));
  }
}
```

executeInternal(Operation operation):

```java
public TableResultInternal executeInternal(Operation operation) {
  if (operation instanceof ModifyOperation) {
    // key step: executeInternal
    return this.executeInternal(Collections.singletonList((ModifyOperation) operation));
  } else if (operation instanceof StatementSetOperation) {
    return this.executeInternal(((StatementSetOperation) operation).getOperations());
  ...
```

executeInternal(List&lt;ModifyOperation&gt; operations):

```java
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
  // key step: translate
  List<Transformation<?>> transformations = this.translate(operations);
  List<String> sinkIdentifierNames = this.extractSinkIdentifierNames(operations);
  TableResultInternal result = this.executeInternal(transformations, sinkIdentifierNames);
  if ((Boolean) this.tableConfig.get(TableConfigOptions.TABLE_DML_SYNC)) {
    try {
      result.await();
    } catch (ExecutionException | InterruptedException var6) {
      result.getJobClient().ifPresent(JobClient::cancel);
      throw new TableException("Fail to wait execution finish.", var6);
    }
  }
  return result;
}
```

translate, where the planner is a BatchPlanner because we configured batch mode with EnvironmentSettings.inBatchMode():

```java
protected List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
  // the planner here is a BatchPlanner because of EnvironmentSettings.inBatchMode()
  // key step: PlannerBase.translate
  return this.planner.translate(modifyOperations);
}
```

### BatchPlanner

PlannerBase.translate (PlannerBase is BatchPlanner's parent class):

```scala
override def translate(modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  beforeTranslation()
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // key step: translateToRel
  val relNodes = modifyOperations.map(translateToRel)
  val optimizedRelNodes = optimize(relNodes)
  val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
  // key step: translateToPlan
  val transformations = translateToPlan(execGraph)
  afterTranslation()
  transformations
}
```

PlannerBase.translateToRel:

```scala
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
  val dataTypeFactory = catalogManager.getDataTypeFactory
  modifyOperation match {
    case s: UnregisteredSinkModifyOperation[_] =>
      val input = getRelBuilder.queryOperation(s.getChild).build()
      val sinkSchema = s.getSink.getTableSchema
      // validate query schema and sink schema, and apply cast if possible
      val query = validateSchemaAndApplyImplicitCast(
        input,
        catalogManager.getSchemaResolver.resolve(sinkSchema.toSchema),
        null,
        dataTypeFactory,
        getTypeFactory)
      LogicalLegacySink.create(
        query,
        s.getSink,
        "UnregisteredSink",
        ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))

    case collectModifyOperation: CollectModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      DynamicSinkUtils.convertCollectToRel(
        getRelBuilder,
        input,
        collectModifyOperation,
        getTableConfig,
        getFlinkContext.getClassLoader)

    case catalogSink: SinkModifyOperation =>
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      val dynamicOptions = catalogSink.getDynamicOptions
      // key step: getTableSink
      getTableSink(catalogSink.getContextResolvedTable, dynamicOptions).map {
        case (table, sink: TableSink[_]) =>
          // Legacy tables can't be anonymous
          val identifier = catalogSink.getContextResolvedTable.getIdentifier
          // check the logical field type and physical field type are compatible
          val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
          // validate logical schema and physical schema are compatible
          validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
          // validate TableSink
          validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
          // validate query schema and sink schema, and apply cast if possible
          val query = validateSchemaAndApplyImplicitCast(
            input,
            table.getResolvedSchema,
            identifier.asSummaryString,
            dataTypeFactory,
            getTypeFactory)
          val hints = new util.ArrayList[RelHint]
          if (!dynamicOptions.isEmpty) {
            hints.add(RelHint.builder("OPTIONS").hintOptions(dynamicOptions).build)
          }
          LogicalLegacySink.create(
            query,
            hints,
            sink,
            identifier.toString,
            table,
            catalogSink.getStaticPartitions.toMap)

        case (table, sink: DynamicTableSink) =>
          DynamicSinkUtils.convertSinkToRel(getRelBuilder, input, catalogSink, sink)
      } match {
        case Some(sinkRel) => sinkRel
        case None =>
          throw new TableException(s"Sink ${catalogSink.getContextResolvedTable} does not exists")
      }
  ...
```

PlannerBase.getTableSink:

```scala
private def getTableSink(
    contextResolvedTable: ContextResolvedTable,
    dynamicOptions: JMap[String, String]): Option[(ResolvedCatalogTable, Any)] = {
  contextResolvedTable.getTable[CatalogBaseTable] match {
    case connectorTable: ConnectorCatalogTable[_, _] =>
      val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
      toScala(connectorTable.getTableSink) match {
        case Some(sink) => Some(resolvedTable, sink)
        case None => None
      }

    case regularTable: CatalogTable =>
      val resolvedTable = contextResolvedTable.getResolvedTable[ResolvedCatalogTable]
      ...
      if (
        !contextResolvedTable.isAnonymous &&
        TableFactoryUtil.isLegacyConnectorOptions(
          catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null),
          tableConfig,
          isStreamingMode,
          objectIdentifier,
          resolvedTable.getOrigin,
          isTemporary)
      ) {
        ...
      } else {
        ...
        // key step: FactoryUtil.createDynamicTableSink
        val tableSink = FactoryUtil.createDynamicTableSink(
          factory,
          objectIdentifier,
          tableToFind,
          Collections.emptyMap(),
          getTableConfig,
          getFlinkContext.getClassLoader,
          isTemporary)
        Option(resolvedTable, tableSink)
      }

    case _ => None
  }
}
```
### FactoryUtil.createDynamicTableSink

Based on 'connector'='hudi', the factory resolves to org.apache.hudi.table.HoodieTableFactory, and then HoodieTableFactory.createDynamicTableSink is called:

```java
public static DynamicTableSink createDynamicTableSink(
    @Nullable DynamicTableSinkFactory preferredFactory,
    ObjectIdentifier objectIdentifier,
    ResolvedCatalogTable catalogTable,
    Map<String, String> enrichmentOptions,
    ReadableConfig configuration,
    ClassLoader classLoader,
    boolean isTemporary) {
  final DefaultDynamicTableContext context =
      new DefaultDynamicTableContext(
          objectIdentifier,
          catalogTable,
          enrichmentOptions,
          configuration,
          classLoader,
          isTemporary);
  try {
    // 'connector'='hudi'
    // => org.apache.hudi.table.HoodieTableFactory
    final DynamicTableSinkFactory factory =
        preferredFactory != null
            ? preferredFactory
            : discoverTableFactory(DynamicTableSinkFactory.class, context);
    // key step: HoodieTableFactory.createDynamicTableSink
    return factory.createDynamicTableSink(context);
  } catch (Throwable t) {
    throw new ValidationException(
        String.format(
            "Unable to create a sink for writing table '%s'.\n\n"
                + "Table options are:\n\n"
                + "%s",
            objectIdentifier.asSummaryString(),
            catalogTable.getOptions().entrySet().stream()
                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                .sorted()
                .collect(Collectors.joining("\n"))),
        t);
  }
}
```

### HoodieTableFactory.createDynamicTableSink

This answers the first question:

```java
public DynamicTableSink createDynamicTableSink(Context context) {
  Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
  checkArgument(
      !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
      "Option [path] should not be empty.");
  setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
  ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
  sanityCheck(conf, schema);
  setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
  // key step: HoodieTableSink
  return new HoodieTableSink(conf, schema);
}
```
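A side note on how discoverTableFactory maps 'connector'='hudi' to HoodieTableFactory: Flink discovers Factory implementations through Java SPI (entries under META-INF/services/org.apache.flink.table.factories.Factory) and matches their factoryIdentifier() against the connector option. The following is a rough standalone sketch of that matching idea, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;

// A sketch of SPI-based factory discovery: load every Factory on the classpath
// and keep the sink factories whose factoryIdentifier() equals the 'connector' option.
public class FactoryDiscoverySketch {
  public static DynamicTableSinkFactory discover(String connector, ClassLoader classLoader) {
    List<DynamicTableSinkFactory> matches = new ArrayList<>();
    for (Factory factory : ServiceLoader.load(Factory.class, classLoader)) {
      if (factory instanceof DynamicTableSinkFactory
          && factory.factoryIdentifier().equals(connector)) {
        matches.add((DynamicTableSinkFactory) factory);
      }
    }
    if (matches.size() != 1) {
      throw new IllegalStateException(
          "Expected exactly one factory for connector '" + connector + "', found " + matches.size());
    }
    return matches.get(0); // for "hudi" this is org.apache.hudi.table.HoodieTableFactory
  }
}
```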
### BatchExecSink

Back in PlannerBase.translate: after translateToRel it later calls translateToPlan. execGraph.getRootNodes returns a BatchExecSink (to see why, look into the translateToExecNodeGraph method called in PlannerBase.translate). Since BatchExecSink is a subtype of BatchExecNode, node.translateToPlan is executed.

PlannerBase.translateToPlan:

```scala
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
  beforeTranslation()
  val planner = createDummyPlanner()
  val transformations = execGraph.getRootNodes.map {
    // BatchExecSink
    // key step: ExecNodeBase.translateToPlan
    case node: BatchExecNode[_] => node.translateToPlan(planner)
    case _ =>
      throw new TableException(
        "Cannot generate BoundedStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
  }
  afterTranslation()
  transformations
}
```

BatchExecSink:

```java
public class BatchExecSink extends CommonExecSink implements BatchExecNode<Object> {
  ...

public abstract class CommonExecSink extends ExecNodeBase<Object>
    implements MultipleTransformationTranslator<Object> {
  ...
```

ExecNodeBase.translateToPlan:

```java
public final Transformation<T> translateToPlan(Planner planner) {
  if (transformation == null) {
    transformation =
        // key step: BatchExecSink.translateToPlanInternal
        translateToPlanInternal(
            (PlannerBase) planner,
            ExecNodeConfig.of(
                ((PlannerBase) planner).getTableConfig(),
                persistedConfig,
                isCompiled));
    if (this instanceof SingleTransformationTranslator) {
      if (inputsContainSingleton()) {
        transformation.setParallelism(1);
        transformation.setMaxParallelism(1);
      }
    }
  }
  return transformation;
}
```

BatchExecSink.translateToPlanInternal:

```java
protected Transformation<Object> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
  final Transformation<RowData> inputTransform =
      (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
  // org.apache.hudi.table.HoodieTableSink
  final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
  // key step: CommonExecSink.createSinkTransformation
  return createSinkTransformation(planner.getExecEnv(), config, inputTransform, tableSink, -1, false);
}
```

CommonExecSink.createSinkTransformation, where tableSink is the HoodieTableSink: it calls HoodieTableSink.getSinkRuntimeProvider to obtain the runtimeProvider, but does not yet execute the lambda body inside it:

```java
protected Transformation<Object> createSinkTransformation(
    StreamExecutionEnvironment streamExecEnv,
    ExecNodeConfig config,
    Transformation<RowData> inputTransform,
    // tableSink here is the HoodieTableSink
    DynamicTableSink tableSink,
    int rowtimeFieldIndex,
    boolean upsertMaterialize) {
  final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
  final SinkRuntimeProvider runtimeProvider =
      // key step: HoodieTableSink.getSinkRuntimeProvider
      tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
  final RowType physicalRowType = getPhysicalRowType(schema);
  final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);
  final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
  final int inputParallelism = inputTransform.getParallelism();
  final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
  final boolean hasPk = primaryKeys.length > 0;
  ...
  return (Transformation<Object>)
      // key step: CommonExecSink.applySinkProvider
      applySinkProvider(
          sinkTransform,
          streamExecEnv,
          runtimeProvider,
          rowtimeFieldIndex,
          sinkParallelism,
          config);
}
```

CommonExecSink.applySinkProvider first builds the dataStream via new DataStream&lt;&gt;(env, sinkTransformation), then calls provider.consumeDataStream, which invokes the body of HoodieTableSink.getSinkRuntimeProvider. The provider here is the DataStreamSinkProviderAdapter returned by HoodieTableSink.getSinkRuntimeProvider:

```java
private Transformation<?> applySinkProvider(
    Transformation<RowData> inputTransform,
    StreamExecutionEnvironment env,
    SinkRuntimeProvider runtimeProvider,
    int rowtimeFieldIndex,
    int sinkParallelism,
    ExecNodeConfig config) {
  TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
  if (runtimeProvider instanceof DataStreamSinkProvider) {
    Transformation<RowData> sinkTransformation =
        applyRowtimeTransformation(inputTransform, rowtimeFieldIndex, sinkParallelism, config);
    // build the dataStream
    final DataStream<RowData> dataStream = new DataStream<>(env, sinkTransformation);
    final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
    // key step: provider.consumeDataStream
    return provider.consumeDataStream(createProviderContext(config), dataStream).getTransformation();
  } else if (runtimeProvider instanceof TransformationSinkProvider) {
  ...
```

provider.consumeDataStream (already covered above under DataStreamSinkProviderAdapter) invokes the lambda body of HoodieTableSink.getSinkRuntimeProvider, which carries out the subsequent Hudi write logic. This answers the second question:

```java
@Override
default DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
  return consumeDataStream(dataStream);
}
```
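One detail worth emphasizing: createSinkTransformation only obtains the provider; the lambda body runs later, when applySinkProvider calls consumeDataStream. A tiny self-contained sketch of that laziness (all names hypothetical, using java.util.function.Function as a stand-in for the provider interface):

```java
import java.util.function.Function;

public class LazyProviderDemo {
  public static void main(String[] args) {
    // Analogous to getSinkRuntimeProvider: we get a provider back,
    // but its body has not executed yet.
    Function<String, String> provider = dataStream -> {
      System.out.println("building write pipeline for " + dataStream);
      return dataStream + "-sink";
    };
    System.out.println("provider obtained, nothing printed yet");

    // Analogous to provider.consumeDataStream(dataStream): only now does
    // the body run and the pipeline get built.
    String sink = provider.apply("rows");
    System.out.println(sink); // -> rows-sink
  }
}
```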
## Summary

This article is a simple record of my process of debugging the Hudi Flink SQL source code, not an in-depth analysis of the source (my own level is not there yet). The main goal was to figure out the main code steps from the Table API entry point to createDynamicTableSink returning a HoodieTableSink, and where the body of HoodieTableSink.getSinkRuntimeProvider is invoked to carry out the subsequent Hudi write logic. This groundwork should make later analysis and study of the Hudi source code easier.

New concept learned along the way: functional interfaces and how lambda expressions implement them.