当前位置: 首页 > news >正文

腾云网建设网站关于设计的网站

腾云网建设网站,关于设计的网站,建筑网官网查询,湖南平台网站建设方案Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala Shell、SQL Client、Restful API和 Web五个方面进行整理。 在Flink安装目录的bin目录下可以看到flink#xff0c;start-scala-shell.sh和sql-client.sh等文件#xff0c;这些都是客户…Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、Scala Shell、SQL Client、Restful API和 Web五个方面进行整理。 在Flink安装目录的bin目录下可以看到flinkstart-scala-shell.sh和sql-client.sh等文件这些都是客户端操作的入口。 flink 常见操作可以通过 -help 查看帮助 run 运行任务 -d以分离模式运行作业 -c如果没有在jar包中指定入口类则需要在这里通过这个参数指定 -m指定需要连接的jobmanager(主节点)地址使用这个参数可以指定一个不同于配置文件中的jobmanager可以说是yarn集群名称 -p指定程序的并行度。可以覆盖配置文件中的默认值 -s保存点savepoint的路径以还原作业来自例如hdfs:///flink/savepoint-1537) [roothadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID dce7b69ad15e8756766967c46122736f就可以看到我们提交的JobManager默认是一个并发。 点进去就可以看到详细的信息 点击左侧TaskManager —Stdout能看到具体输出的日志信息。 或者查看TaskManager节点的log目录下的*.out文件也能看到具体的输出信息。 list 查看任务列表 -mjobmanagerarg作业管理器主的地址连接。 [roothadoop1 flink-1.10.1]# bin/flink list -m 127.0.0.1:8081 Waiting for response... ------------------ Running/Restarting Jobs ------------------- 09.07.2020 16:44:09 : dce7b69ad15e8756766967c46122736f : CarTopSpeedWindowingExample (RUNNING) -------------------------------------------------------------- No scheduled jobs.Stop 停止任务 需要指定jobmanager的ip:prot和jobId。如下报错可知一个job能够被stop要求所有的source都是可以stoppable的即实现了 StoppableFunction接口。 [roothadoop1 flink-1.10.1]# bin/flink stop -m 127.0.0.1:8081 dce7b69ad15e8756766967c46122736f Suspending job dce7b69ad15e8756766967c46122736f with a savepoint.------------------------------------------------------------The program finished with the following exception:org.apache.flink.util.FlinkException: Could not stop with a savepoint job dce7b69ad15e8756766967c46122736f.at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:458)StoppableFunction接口如下属于优雅停止任务。 /*** Description 需要 stoppabel 的函数必须实现此接口例如流式任务 source** stop() 方法在任务收到 stop信号的时候调用* source 在接收到这个信号后必须停止发送新的数据优雅的停止。* Date 2020/7/9 17:26*/PublicEvolvingpublic interface StoppableFunction {/*** 停止 source与 cancel() 不同的是这是一个让 source优雅停止的请求。* 等待中的数据可以继续发送出去不需要立即停止*/void stop(); }Cancel 取消任务 如果在conf/flink-conf.yaml里面配置state.savepoints.dir会保存savepoint否则不会保存savepoint。重启 state.savepoints.dir: file:///tmp/savepoint执行 Cancel命令 取消任务 [roothadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s e8ce0d111262c52bf8228d5722742d47 DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use stop instead. Cancelling job e8ce0d111262c52bf8228d5722742d47 with savepoint to default savepoint directory. Cancelled job e8ce0d111262c52bf8228d5722742d47. Savepoint stored in file:/tmp/savepoint/savepoint-e8ce0d-f7fa96a085d8.也可以在停止的时候显示指定savepoint目录 1 [roothadoop1 flink-1.10.1]# bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint f58bb4c49ee5580ab5f27fdb24083353 DEPRECATION WARNING: Cancelling a job with savepoint is deprecated. Use stop instead. Cancelling job f58bb4c49ee5580ab5f27fdb24083353 with savepoint to /tmp/savepoint. Cancelled job f58bb4c49ee5580ab5f27fdb24083353. Savepoint stored in file:/tmp/savepoint/savepoint-f58bb4-127b7e84910e.取消和停止流作业的区别如下 ● cancel()调用 立即调用作业算子的cancel()方法以尽快取消它们。如果算子在接到cancel()调用后没有停止Flink将开始定期中断算子线程的执行直到所有算子停止为止。 ● stop()调用 是更优雅的停止正在运行流作业的方式。stop()仅适用于source实现了StoppableFunction接口的作业。当用户请求停止作业时作业的所有source都将接收stop()方法调用。直到所有source正常关闭时作业才会正常结束。这种方式使 作业正常处理完所有作业。 触发 savepoint 当需要生成savepoint文件时需要手动触发savepoint。如下需要指定正在运行的 JobID 和生成文件的存放目录。同时我们也可以看到它会返回给用户存放的savepoint的文件名称等信息。 [roothadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set.Use --input to specify file input.Printing result to stdout. Use --output to specify output path.Job has been submitted with JobID 216c427d63e3754eb757d2cc268a448d[roothadoop1 flink-1.10.1]# bin/flink savepoint -m 127.0.0.1:8081 216c427d63e3754eb757d2cc268a448d /tmp/savepoint/Triggering savepoint for job 216c427d63e3754eb757d2cc268a448d.Waiting for response...Savepoint completed. Path: file:/tmp/savepoint/savepoint-216c42-154a34cf6bfdYou can resume your program from this savepoint with the run command.savepoint和checkpoint的区别 ● checkpoint是增量做的每次的时间较短数据量较小只要在程序里面启用后会自动触发用户无须感知savepoint是全量做的每次的时间较长数据量较大需要用户主动去触发。 ● checkpoint是作业failover的时候自动使用不需要用户指定。savepoint一般用于程序的版本更新bug修复A/B Test等场景需要用户指定。 从指定 savepoint 中启动 [roothadoop1 flink-1.10.1]# bin/flink run -d -s /tmp/savepoint/savepoint-f58bb4-127b7e84910e/ examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 1a5c5ce279e0e4bd8609f541b37652e2查看JobManager的日志能够看到Reset the checkpoint ID为我们指定的savepoint文件中的ID modify 修改任务并行度 这里修改master的conf/flink-conf.yaml将task slot数修改为4。并通过xsync分发到 两个slave节点上。 taskmanager.numberOfTaskSlots: 4修改参数后需要重启集群生效关闭/启动集群 [roothadoop1 flink-1.10.1]# bin/stop-cluster.sh bin/start-cluster.sh Stopping taskexecutor daemon (pid: 8236) on host hadoop2. Stopping taskexecutor daemon (pid: 8141) on host hadoop3. Stopping standalonesession daemon (pid: 22633) on host hadoop1. Starting cluster. Starting standalonesession daemon on host hadoop1. Starting taskexecutor daemon on host hadoop2. Starting taskexecutor daemon on host hadoop3.启动任务 [roothadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/TopSpeedWindowing.jar Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 2e833a438da7d8052f14d5433910515a从页面上能看到Task Slots总计变为了8运行的Slot为1剩余Slot数量为7。 这时候默认的并行度是1 Flink1.0版本命令行flink modify已经没有这个行为了被移除了。。。Flink1.7上是可以运行的。 [roothadoop1 flink-1.10.1]# bin/flink modify -p 4 cc22cc3d09f5d65651d637be6fb0a1c3 modify is not a valid action.Info 显示程序的执行计划 [roothadoop1 flink-1.10.1]# bin/flink info examples/streaming/TopSpeedWindowing.jar ----------------------- Execution Plan ----------------------- {nodes:[{id:1,type:Source: Custom Source,pact:Data Source,contents:Source: Custom Source,parallelism:1},{id:2,type:Timestamps/Watermarks,pact:Operator,contents:Timestamps/Watermarks,parallelism:1,predecessors:[{id:1,ship_strategy:FORWARD,side:second}]},{id:4,type:Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction),pact:Operator,contents:Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction),parallelism:1,predecessors:[{id:2,ship_strategy:HASH,side:second}]},{id:5,type:Sink: Print to Std. Out,pact:Data Sink,contents:Sink: Print to Std. Out,parallelism:1,predecessors:[{id:4,ship_strategy:FORWARD,side:second}]}]} --------------------------------------------------------------拷贝输出的json内容粘贴到这个网站http://flink.apache.org/visualizer/可以生成类似如下的执行图。 可以与实际运行的物理执行计划进行对比。 SQL Client Beta 进入 Flink SQL [roothadoop1 flink-1.10.1]# bin/sql-client.sh embeddedSelect查询按Q退出如下界面 Flink SQL select hello word;SQL Query Result (Table)Table program finished. Page: Last of 1 Updated: 16:37:04.649EXPR$0hello wordQ Quit Inc Refresh G Goto Page N Next Page O Open Row R Refresh - Dec Refresh L Last Page P Prev Page打开http://hadoop1:8081能看到这条select语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的Custom Source输出用的是Stream Collect Sink且只输出一条结果。 explain 查看 SQL 的执行计划。 Flink SQL explain SELECT name, COUNT(*) AS cnt FROM (VALUES (Bob), (Alice), (Greg), (Bob)) AS NameTable(name) GROUP BY name;Abstract Syntax Tree //抽象语法树 LogicalAggregate(group[{0}], cnt[COUNT()]) - LogicalValues(type[RecordType(VARCHAR(5) name)], tuples[[{ _UTF-16LEBob }, { _UTF-16LEAlice }, { _UTF-16LEGreg }, { _UTF-16LEBob }]]) Optimized Logical Plan //优化后的逻辑执行计划 GroupAggregate(groupBy[name], select[name, COUNT(*) AS cnt]) - Exchange(distribution[hash[name]])- Values(type[RecordType(VARCHAR(5) name)], tuples[[{ _UTF-16LEBob }, { _UTF-16LEAlice }, { _UTF-16LEGreg }, { _UTF-16LEBob }]]) Physical Execution Plan //物理执行计划 Stage 13 : Data Sourcecontent : Source: Values(tuples[[{ _UTF-16LEBob }, { _UTF-16LEAlice }, { _UTF-16LEGreg }, { _UTF-16LEBob }]])Stage 15 : Operatorcontent : GroupAggregate(groupBy[name], select[name, COUNT(*) AS cnt])ship_strategy : HASH结果展示 SQL Client支持两种模式来维护并展示查询结果 table mode 在内存中物化查询结果并以分页table形式展示。用户可以通过以下命令启用table mode例如如下案例 Flink SQL SET execution.result-modetable; [INFO] Session property has been set.Flink SQL SELECT name, COUNT(*) AS cnt FROM (VALUES (Bob), (Alice), (Greg), (Bob)) AS NameTable(name) GROUP BY name;SQL Query Result (Table)Table program finished. Page: Last of 1 Updated: 16:55:08.589name cntAlice 1Greg 1Bob 2Q Quit Inc Refresh G Goto Page N Next Page O Open Row R Refresh - Dec Refresh L Last Page P Prev Pagechangelog mode 不会物化查询结果而是直接对continuous query产生的添加和撤回retractions结果进行展示如下案例中的-表示撤回消息 Flink SQL SET execution.result-modechangelog; [INFO] Session property has been set.Flink SQL SELECT name, COUNT(*) AS cnt FROM (VALUES (Bob), (Alice), (Greg), (Bob)) AS NameTable(name) GROUP BY name;SQL Query Result (Changelog)Table program finished. Updated: 16:58:05.777/- name cnt Bob 1 Alice 1 Greg 1- Bob 1 Bob 2Q Quit Inc Refresh O Open Row R Refresh - Dec RefreshEnvironment Files CREATE TABLE 创建表DDL语句 Flink SQL CREATE TABLE pvuv_sink (dt VARCHAR,pv BIGINT,uv BIGINT) ; [INFO] Table has been created.SHOW TABLES 查看所有表名 Flink SQL show tables; pvuv_sinkDESCRIBE 表名 查看表的详细信息 Flink SQL describe pvuv_sink; root|-- dt: STRING|-- pv: BIGINT|-- uv: BIGINT插入等操作均与关系型数据库操作语句一样省略N个操作 Restful API 接下来我们演示如何通过rest api来提交jar包和执行任务。 通过Show Plan可以看到执行图 提交之后的操作取消的话点击页面的Cancel Job
http://www.zqtcl.cn/news/166989/

相关文章:

  • 广州越秀建网站济南房产网新开楼盘
  • 线上咨询预约网站建设方案保定外贸网站制作
  • 网站流量如何增加提高工作效率的措施
  • 龙湖镇华南城网站建设.net 网站开发书籍
  • 域名费用和网站服务器费用是同样的吗推广营销方案
  • 安徽网站设计方案中文外贸网站有哪些
  • 衡阳手机网站设计响应式网站做多大的尺寸
  • 海尔电子商务网站建设预算灵台县门户网
  • 四川网站建设设计公司排名开发公司与建筑公司合作协议
  • 江西智能网站建设嘉定注册公司
  • 海口网站建设联系方式十大免费软文推广平台
  • 石碣镇做网站帮别人做网站开价
  • 站长 网站ip客户都不愿意做网站
  • 网站开发和软件开发哪个难网站备案账号
  • 2昌平区网站建设安徽盛绿建设网站
  • 商务网站建设目的天津建设网站需要的费用
  • flash 网站头部wordpress支持大文件上传
  • 网站开发方式的选择凡客设计
  • 常德建设网站如何查询某个网站的设计公司
  • wordpress 仿站教程学校ui设计培训
  • 南昌模板建站定制网站合肥瑶海区网站建设价格
  • 奥尔马手表官方网站导出wordpress文章
  • 网站栏目内容和功能手机网站建设 如何获得更好的排名
  • 网站运营推广难做常德网警
  • 北滘网站建设公司在百度上做网站怎么做
  • 合肥网站建设 毅耘园林设计网站大全
  • 免费备案网站空间爱营销app
  • 郑州网站建设公网站建设需要步骤
  • 源创派网站建设做软件赚钱的网站有哪些
  • 中英文网站建设公司推广引流