当前位置: 首页 > news >正文

网站做多久能盈利网站建设公司工作流程

网站做多久能盈利,网站建设公司工作流程,卢氏住房和城乡建设厅网站,推广网站的软文1 概述 Flink在做流数据计算时#xff0c;经常要外部系统进行交互#xff0c;如Redis、Hive、HBase等等存储系统。系统间通信延迟是否会拖慢整个Flink作业#xff0c;影响整体吞吐量和实时性。 如需要查询外部数据库以关联上用户的额外信息#xff0c;通常的实现方式是向数…1 概述 Flink在做流数据计算时经常要外部系统进行交互如Redis、Hive、HBase等等存储系统。系统间通信延迟是否会拖慢整个Flink作业影响整体吞吐量和实时性。 如需要查询外部数据库以关联上用户的额外信息通常的实现方式是向数据库发送用户a的查询请求如在MapFunction中然后等待结果返回返回之后才能进行下一次查询请求这是一种同步访问的模式如下图左边所示网络等待时间极大的阻碍了吞吐和延迟。 Flink从1.2版本开始就引入了Async I/Ohttps://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html。异步模式可以并发的处理多个请求和回复也就是说你可以连续的向数据库发送用户a、b、c、d等的请求与此同时哪个请求的回复先返回了就处理哪个回复从而连续的请求之间不需要阻塞等待如上图右边所示这也是Async I/O的实现原理。 2 Future和CompletableFuture 先了解一下Future和CompletableFuture 2.1 Future 从JDK1.5开始提供了Future来表示异步计算的结果一般需要结合ExecutorService执行者和Callable任务来使用。Future的get方法是阻塞的 package com.quinto.flink;import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心线程池大小5 最大线程池大小10 线程最大空闲时间60 时间单位s 线程等待队列ThreadPoolExecutor executor new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));FutureLong future executor.submit(() - {// 故意耗时Thread.sleep(3000);return System.currentTimeMillis();});System.out.println(future.get());System.out.println(因为get是阻塞的所以这个消息在数据之后输出);executor.shutdown();} } 结果为 1612337847685 因为get是阻塞的所以这个消息在数据之后输出Future只是个接口实际上返回的类是FutureTask public T FutureT submit(CallableT task) {if (task null) throw new NullPointerException();RunnableFutureT ftask newTaskFor(task);execute(ftask);return ftask;}protected T RunnableFutureT newTaskFor(CallableT callable) {return new FutureTaskT(callable);} FutureTask的get方法如下 private volatile int state;private static final int NEW 0;private static final int COMPLETING 1;private static final int NORMAL 2;private static final int EXCEPTIONAL 3;private static final int CANCELLED 4;private static final int INTERRUPTING 5;private static final int INTERRUPTED 6;public V get() throws InterruptedException, ExecutionException {int s state;// 首先判断FutureTask的状态是否为完成状态如果是完成状态说明已经执行过set或setException方法返回report(s)。任务的运行状态。最初是NEW 0。运行状态仅在set、setException和cancel方法中转换为终端状态。if (s COMPLETING)//如果get时,FutureTask的状态为未完成状态则调用awaitDone方法进行阻塞s awaitDone(false, 0L);return report(s);}/*** awaitDone方法可以看成是不断轮询查看FutureTask的状态。在get阻塞期间①如果执行get的线程被中断则移除FutureTask的所有阻塞队列中的线程waiters,并抛出中断异常②如果FutureTask的状态转换为完成状态正常完成或取消则返回完成状态③如果FutureTask的状态变为COMPLETING, 则说明正在set结果此时让线程等一等④如果FutureTask的状态为初始态NEW则将当前线程加入到FutureTask的阻塞线程中去⑤如果get方法没有设置超时时间则阻塞当前调用get线程如果设置了超时时间则判断是否达到超时时间如果到达则移除FutureTask的所有阻塞列队中的线程并返回此时FutureTask的状态如果未到达时间则在剩下的时间内继续阻塞当前线程。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline timed ? System.nanoTime() nanos : 0L;WaitNode q null;boolean queued false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s state;if (s COMPLETING) {if (q ! null)q.thread null;return s;}else if (s COMPLETING) // cannot time out yetThread.yield();else if (q null)q new WaitNode();else if (!queued)queued UNSAFE.compareAndSwapObject(this, waitersOffset,q.next waiters, q);else if (timed) {nanos deadline - System.nanoTime();if (nanos 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}Future的局限性 ①可以发现虽然 Future接口可以构建异步应用但是对于结果的获取却是很不方便只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背轮询的方式又会耗费无谓的 CPU 资源而且也不能及时地得到计算结果。 ②它很难直接表述多个Future 结果之间的依赖性。实际开发中经常需要将多个异步计算的结果合并成一个或者等待Future集合中的所有任务都完成或者任务完成以后触发执行动作 2.2 CompletableFuture JDk1.8引入了CompletableFuture它实际上也是Future的实现类。这里可以得出 CompletableFuture有一些新特性能完成Future不能完成的工作。 public class CompletableFutureT implements FutureT, CompletionStageT {首先看类定义实现了CompletionStage接口这个接口是所有的新特性了。 对于CompletableFuture有四个执行异步任务的方法 public static U CompletableFutureU supplyAsync(SupplierU supplier) public static U CompletableFutureU supplyAsync(SupplierU supplier, Executor executor) public static CompletableFutureVoid runAsync(Runnable runnable) public static CompletableFutureVoid runAsync(Runnable runnable, Executor executor)supply开头的带有返回值run开头的无返回值。如果我们指定线程池则会使用我么指定的线程池如果没有指定线程池默认使用ForkJoinPool.commonPool()作为线程池。 public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心线程池大小5 最大线程池大小10 线程最大空闲时间60 时间单位s 线程等待队列ThreadPoolExecutor executor new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));CompletableFutureString future CompletableFuture.supplyAsync(() - {return hello;}, executor);System.out.println(future.get());executor.shutdown();}上面只是对执行异步任务如果要利用计算结果进一步处理使用进行结果转换有如下方法①thenApply 同步②thenApplyAsync异步 // 同步转换 public U CompletableFutureU thenApply(Function? super T,? extends U fn) // 异步转换使用默认线程池 public U CompletableFutureU thenApplyAsync(Function? super T,? extends U fn) // 异步转换使用指定线程池 public U CompletableFutureU thenApplyAsync(Function? super T,? extends U fn, Executor executor)package com.quinto.flink;import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心线程池大小5 最大线程池大小10 线程最大空闲时间60 时间单位s 线程等待队列ThreadPoolExecutor executor new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));CompletableFutureLong future CompletableFuture// 执行异步任务.supplyAsync(() - {return System.currentTimeMillis();}, executor)// 对前面的结果进行处理.thenApply(n - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}Long time System.currentTimeMillis();System.out.println(如果是同步的这条消息应该先输出);return time-n;});System.out.println(等待2秒);System.out.println(future.get());executor.shutdown();} }结果为 如果是同步的这条消息应该先输出 等待2秒 2017如果把thenApply换成thenApplyAsync结果如下 等待2秒 如果是同步的这条消息应该先输出 2008处理完任务以及结果该去消费了有如下方法①thenAccept能够拿到并利用执行结果 ② thenRun不能够拿到并利用执行结果只是单纯的执行其它任务③thenAcceptBoth能传入另一个stage然后把另一个stage的结果和当前stage的结果作为参数去消费。 public CompletableFutureVoid thenAccept(Consumer? super T action) public CompletableFutureVoid thenAcceptAsync(Consumer? super T action) public CompletableFutureVoid thenAcceptAsync(Consumer? super T action, Executor executor)public CompletableFutureVoid thenRun(Runnable action) public CompletableFutureVoid thenRunAsync(Runnable action) public CompletableFutureVoid thenRunAsync(Runnable action, Executor executor)public U CompletableFutureVoid thenAcceptBoth(CompletionStage? extends U other, BiConsumer? super T, ? super U action) public U CompletableFutureVoid thenAcceptBothAsync(CompletionStage? extends U other, BiConsumer? super T, ? super U action) public U CompletableFutureVoid thenAcceptBothAsync(CompletionStage? extends U other, BiConsumer? super T, ? super U action, Executor executor)如果要组合两个任务有如下方法①thenCombine(至少两个方法参数一个为其它stage一个为用户自定义的处理函数函数返回值为结果类型) ② thenCompose(至少一个方法参数即处理函数函数返回值为stage类型) public U,V CompletionStageV thenCombine(CompletionStage? extends U other, BiFunction? super T,? super U,? extends V fn) public U,V CompletionStageV thenCombineAsync(CompletionStage? extends U other, BiFunction? super T,? super U,? extends V fn) public U,V CompletionStageV thenCombineAsync(CompletionStage? extends U other, BiFunction? super T,? super U,? extends V fn, Executor executor)public U CompletionStageU thenCompose(Function? super T, ? extends CompletionStageU fn) public U CompletionStageU thenComposeAsync(Function? super T, ? extends CompletionStageU fn) public U CompletionStageU thenComposeAsync(Function? super T, ? extends CompletionStageU fn, Executor executor)如果有多条渠道去完成同一种任务选择最快的那个有如下方法①applyToEither (有返回值)②acceptEither(没有返回值) public U CompletionStageU applyToEither(CompletionStage? extends T other, Function? super T, U fn) public U CompletionStageU applyToEitherAsync(CompletionStage? extends T other, Function? super T, U fn) public U CompletionStageU applyToEitherAsync(CompletionStage? extends T other, Function? super T, U fn, Executor executor)public CompletionStageVoid acceptEither(CompletionStage? extends T other, Consumer? super T action) public CompletionStageVoid acceptEitherAsync(CompletionStage? extends T other, Consumer? super T action) public CompletionStageVoid acceptEitherAsync(CompletionStage? extends T other, Consumer? super T action, Executor executor)Future和CompletableFuture对比 Future只能通过get方法或者死循环判断isDone来获取。异常情况不好处理。 CompletableFuture只要设置好回调函数即可实现①只要任务完成就执行设置的函数不用考虑什么时候任务完成②如果发生异常会执行处理异常的函数③能应付复杂任务的处理如果有复杂任务比如依赖问题组合问题等同样可以写好处理函数来处理 3 使用Aysnc I/O的条件 1具有对外部系统进行异步IO访问的客户端API如使用vertx但是目前只支持scala 2.12的版本可以使用java类库来做 2没有这样的客户端可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转变为有限的并发客户端如可以写ExecutorService来实现。但是这种方法通常比适当的异步客户端效率低。 4 Aysnc I/O的案例 4.1 有外部系统进行异步IO访问的客户端API的方式 // 这个例子实现了异步请求和回调的Futures具有Java8的Futures接口(与Flink的Futures相同)class AsyncDatabaseRequest extends RichAsyncFunctionString, Tuple2String, String {// 定义连接客户端并且不参与序列化private transient DatabaseClient client;Overridepublic void open(Configuration parameters) throws Exception {// 创建连接client new DatabaseClient(host, post, credentials);}Overridepublic void close() throws Exception {client.close();}Overridepublic void asyncInvoke(String key, final ResultFutureTuple2String, String resultFuture) throws Exception {// 用连接进行查询查询之后返回的是future有可能有有可能没有final FutureString result client.query(key);// 如果有结果返回的话会通知你(有个回调方法)这里可以设置超时时间如果超过了一定的时间还没有返回相当于从这里取一取就会抛异常结果就会返回nullCompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// Normally handled explicitly.return null;}}//如果它已经执行完了就会把结果放到Collections里面}).thenAccept( (String dbResult) - {resultFuture.complete(Collections.singleton(new Tuple2(key, dbResult)));});} }// create the original stream DataStreamString stream ...;// unorderedWait这个是不在乎请求返回的顺序的,里面用到的是阻塞队列队列满了会阻塞队列里面一次最多可以有100个异步请求超时时间是1000毫秒 DataStreamTuple2String, String resultStream AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);4.2 没有外部系统进行异步IO访问的客户端API的方式 package com.quinto.flink;import com.alibaba.druid.pool.DruidDataSource; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Collections; import java.util.concurrent.*; import java.util.function.Supplier;class AsyncDatabaseRequest extends RichAsyncFunctionString,String {// 这里用到了连接池以前查询是阻塞的查询完这个下一个还是同个连接// 现在要发送多个请求不能用同个连接每个请求都会返回一个结果。这里不但要用到连接池还要用到线程池。private transient DruidDataSource druidDataSource;private transient ExecutorService executorService;Overridepublic void open(Configuration parameters) throws Exception {executorService Executors.newFixedThreadPool(20);druidDataSource new DruidDataSource();druidDataSource.setDriverClassName(com.mysql.jdbc.Driver);druidDataSource.setUsername(root);druidDataSource.setPassword(root);druidDataSource.setUrl(jdbc:mysql:..localhost:3306/bigdata?characterEncodingUTF-8);druidDataSource.setInitialSize(5);druidDataSource.setMinIdle(10);druidDataSource.setMaxActive(20);}Overridepublic void close() throws Exception {druidDataSource.close();executorService.shutdown();}Overridepublic void asyncInvoke(String input,final ResultFutureString resultFuture) {// 向线程池丢入一个线程FutureString future executorService.submit(() - {String sql SELECT id,name FROM table WHERE id ?;String result null;Connection connection null;PreparedStatement stmt null;ResultSet rs null;try {connection druidDataSource.getConnection();stmt connection.prepareStatement(sql);rs stmt.executeQuery();while (rs.next()){result rs.getString(name);}}finally {if (rs!null){rs.close();}if (stmt!null){stmt.close();}if (connection!null){connection.close();}}return result;});// 接收任务的处理结果并消费处理无返回结果。CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {// 从future里面把结果取出来如果有就返回没有的话出异常就返回nullreturn future.get();} catch (Exception e) {return null;}}// 拿到上一步的执行结果进行处理}).thenAccept((String result)-{// 从future里面取出数据会有一个回调然后会把他放到resultFuture,complete中要求放的是一个集合所以需要进行转换resultFuture.complete(Collections.singleton(result));});} }这样mysql的API还是用他原来的只不过把mysql的查询使用把要查询的功能丢线程池。以前查询要好久才返回现在来一个查询就丢到线程池里面不需要等待结果返回的结果放在future里面。原来查询是阻塞的现在开启一个线程查把查询结果丢到future里面。相当于新开一个线程让他帮我查原来是单线程的现在开多个线程同时查然后把结果放future以后有结果了从这里面取。
http://www.zqtcl.cn/news/481168/

相关文章:

  • html免费网站模板下载有什么网站学做标书的
  • 哪里做网站seo深圳专业做网站专业
  • 网站建设名词解析自己制作免费网页
  • 网站开发深圳公司企业自助建站的网站
  • 珠海网站建设平台中国软文网官网
  • 绵阳学校网站建设wordpress 采集站
  • 免费设计软件下载网站大全贵州seo技术培训
  • wordpress网站+搬家自做购物网站多少钱
  • 用自己网站做淘宝客深圳上市公司一览表
  • 如何用图片文字做网站建设部网站安全事故
  • 订制网站网易企业邮箱怎么修改密码
  • 一小时做网站网上免费设计效果图
  • 网站如何注册域名公司主页填什么
  • 南宁国贸网站建设网站跟网页有什么区别
  • 兰州企业 网站建设短链接在线转换
  • 长沙网上商城网站建设方案导航网站系统
  • 网站更换目录名如何做301跳转网站活泼
  • 化妆品网站网页设计怎样在淘宝网做网站
  • 邢台建站湛江海田网站建设招聘
  • 免费个人网站建站能上传视频吗中国舆情在线网
  • 网站开发项目的心得体会惠州建设厅网站
  • 网站小程序怎么做北京单位网站建设培训
  • 北京市专业网站建设广州安全教育平台登录账号登录入口
  • 广州做网站的价格三个关键词介绍自己
  • 基于工作过程的商务网站建设:网页制作扬州网站建设公元国际
  • wordpress著名网站微信公众号怎么做网站链接
  • 长沙网站建设大概多少钱深圳做网站网络营销公司
  • 融资平台排行榜企业网站seo运营
  • 英文手表网站南昌装修网站建设
  • 网站建设要懂哪些技术甘肃园区网络搭建