网站建设服务版权归谁,个人兴趣图片集网站建设,wordpress信息收集表单制作,甘肃省网站建设咨询rxjava 循环发送事件Spring Framework 4.2 GA即将发布#xff0c;让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter #xff0c;它是对Spring MVC控制器中容易使用的发送事件的抽象。 SSE是一项技术#xff0c;可让您在一个HTTP连接内沿一… rxjava 循环发送事件 Spring Framework 4.2 GA即将发布让我们看一下它提供的一些新功能。 引起我注意的一个事件是一个简单的新类SseEmitter 它是对Spring MVC控制器中容易使用的发送事件的抽象。 SSE是一项技术可让您在一个HTTP连接内沿一个方向将数据从服务器流式传输到浏览器。 听起来像是websocket可以做什么的子集。 但是由于它是一个简单得多的协议因此可以在不需要全双工的情况下使用例如实时推动股价变化或显示长时间运行的进程。 这将是我们的例子。 假设我们有一个具有以下API的虚拟硬币矿工 public interface CoinMiner {BigDecimal mine() {//...}
} 每次调用mine()我们都必须等待几秒钟才能获得大约1个硬币的回报平均。 如果要挖掘多个硬币则必须多次调用此方法 RestController
public class MiningController {//...RequestMapping(/mine/{count})void mine(PathVariable int count) {IntStream.range(0, count).forEach(x - coinMiner.mine());}} 这项工作我们可以请求/mine/10和mine()方法将执行10次。 到目前为止一切都很好。 但是挖掘是一项占用大量CPU的任务将计算分散到多个内核将是有益的。 此外即使使用并行化我们的API端点也相当慢我们必须耐心等待直到所有工作完成而没有任何进度通知。 让我们首先修复并行性–但是由于并行流无法控制底层线程池因此我们来使用显式的ExecutorService Component
class CoinMiner {CompletableFutureBigDecimal mineAsync(ExecutorService executorService) {return CompletableFuture.supplyAsync(this::mine, executorService);}//...} 客户端代码必须显式提供ExecutorService 只是设计选择 RequestMapping(/mine/{count})
void mine(PathVariable int count) {final ListCompletableFutureBigDecimal futures IntStream.range(0, count).mapToObj(x - coinMiner.mineAsync(executorService)).collect(toList());futures.forEach(CompletableFuture::join);
} 首先多次调用mineAsync 然后作为第二阶段等待所有期货完成并join 这mineAsync重要。 很容易写 IntStream.range(0, count).mapToObj(x - coinMiner.mineAsync(executorService)).forEach(CompletableFuture::join); 但是由于Java 8中流的惰性该任务将按顺序执行 如果您还不习惯流的懒惰请始终从下至上阅读它们我们要求join一些将来的内容以便流上升并只调用一次mineAsync() 惰性并将其传递给join() 。 当join()完成时它再次上升并要求另一个Future 。 通过使用collect()我们强制所有mineAsync()执行开始所有异步计算。 稍后我们等待每一个。 介绍 现在该变得更具React性了我说过了。 控制器可以返回SseEmitter的实例。 从处理程序方法return后容器线程将被释放并可以处理更多即将到来的请求。 但是连接没有关闭客户端一直在等待 我们应该做的是保留对SseEmitter实例的引用并在以后从另一个线程调用其send()和complete方法。 例如我们可以启动一个长时间运行的进程并保持send()从任意线程进行进度。 完成该过程后我们complete() SseEmitter 最后关闭HTTP连接至少从逻辑SseEmitter 请记住Keep-alive 。 在下面的示例中我们有一堆CompletableFuture 当每个CompletableFuture完成时我们只需将1发送给客户端 notifyProgress() 。 当所有期货都完成后我们完成流 thenRun(sseEmitter::complete) 关闭连接 RequestMapping(/mine/{count})
SseEmitter mine(PathVariable int count) {final SseEmitter sseEmitter new SseEmitter();final ListCompletableFutureBigDecimal futures mineAsync(count);futures.forEach(future -future.thenRun(() - notifyProgress(sseEmitter)));final CompletableFuture[] futuresArr futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter) {try {sseEmitter.send(1);} catch (IOException e) {throw new RuntimeException(e);}
}private ListCompletableFutureBigDecimal mineAsync(PathVariable int count) {return IntStream.range(0, count).mapToObj(x - coinMiner.mineAsync(executorService)).collect(toList());
} 调用此方法将产生以下响应注意Content-Type HTTP/1.1 200 OKContent-Type: text/event-stream;charsetUTF-8Transfer-Encoding: chunkeddata:1data:1data:1data:1* Connection #0 to host localhost left intact 稍后我们将学习如何在客户端解释这种响应。 现在暂时让我们整理一下设计。 与引进RxJava 上面的代码有效但是看起来很混乱。 我们实际上有一系列事件每个事件都代表计算的进度。 计算最终完成因此流也应发出信号结束。 听起来就像是Observable 我们从重构CoinMiner开始以返回ObservableBigDecimal ObservableBigDecimal mineMany(int count, ExecutorService executorService) {final ReplaySubjectBigDecimal subject ReplaySubject.create();final ListCompletableFutureBigDecimal futures IntStream.range(0, count).mapToObj(x - mineAsync(executorService)).collect(toList());futures.forEach(future -future.thenRun(() - subject.onNext(BigDecimal.ONE)));final CompletableFuture[] futuresArr futures.toArray(new CompletableFuture[futures.size()]);CompletableFuture.allOf(futuresArr).thenRun(subject::onCompleted);return subject;
} 每当mineMany()返回的事件出现在Observable 我们就mineMany()那么多硬币。 当所有期货都完成后我们也完成了交易。 在实现方面这看起来还没有改善但是从控制器的角度来看它有多干净 RequestMapping(/mine/{count})
SseEmitter mine(PathVariable int count) {final SseEmitter sseEmitter new SseEmitter();coinMiner.mineMany(count, executorService).subscribe(value - notifyProgress(sseEmitter),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
} 调用coinMiner.mineMany()我们只需订阅事件。 事实证明Observable和SseEmitter方法匹配11。 这里发生的事情是不言自明的启动异步计算每当后台计算发出任何进度信号时将其转发给客户端。 好的让我们回到实现上。 由于我们将CompletableFuture和Observable混合使用因此看起来很混乱。 我已经描述了如何仅使用一个元素将CompletableFuture转换为Observable 。 这是一个概述包括rx.Single从RxJava 1.0.13开始发现的rx.Single抽象此处未使用 public class Futures {public static T ObservableT toObservable(CompletableFutureT future) {return Observable.create(subscriber -future.whenComplete((result, error) - {if (error ! null) {subscriber.onError(error);} else {subscriber.onNext(result);subscriber.onCompleted();}}));}public static T SingleT toSingle(CompletableFutureT future) {return Single.create(subscriber -future.whenComplete((result, error) - {if (error ! null) {subscriber.onError(error);} else {subscriber.onSuccess(result);}}));}} 将这些实用程序运算符放在某个地方我们可以改善实现并避免混合使用两个API ObservableBigDecimal mineMany(int count, ExecutorService executorService) {final ListObservableBigDecimal observables IntStream.range(0, count).mapToObj(x - mineAsync(executorService)).collect(toList());return Observable.merge(observables);
}ObservableBigDecimal mineAsync(ExecutorService executorService) {final CompletableFutureBigDecimal future CompletableFuture.supplyAsync(this::mine, executorService);return Futures.toObservable(future);
} RxJava有一个内置的运算符用于将多个Observable合并为一个我们的每个基础Observable发出一个事件。 深入研究RxJava运算符 让我们使用RxJava的功能来稍微改善流式传输。 scan 当前每次我们开采一枚硬币时我们都会send(1)客户端send(1)事件。 这意味着每个客户都必须跟踪其已经收到的硬币数量以便计算总的计算数量。 如果服务器总是发送总金额而不是增量那就太好了。 但是我们不想更改实现。 事实证明使用Observable.scan()运算符非常简单 RequestMapping(/mine/{count})
SseEmitter mine(PathVariable int count) {final SseEmitter sseEmitter new SseEmitter();coinMiner.mineMany(count, executorService).scan(BigDecimal::add).subscribe(value - notifyProgress(sseEmitter, value),sseEmitter::completeWithError,sseEmitter::complete);return sseEmitter;
}private void notifyProgress(SseEmitter sseEmitter, BigDecimal value) {try {sseEmitter.send(value);} catch (IOException e) {e.printStackTrace();}
} scan()运算符接收上一个事件和当前事件并将它们组合在一起。 通过应用BigDecimal::add我们只需将所有数字相加即可。 例如1、1 11 1 1依此类推。 scan()类似于flatMap() 但保留中间值。 用sample()采样 可能是因为我们的后端服务产生了太多的进度更新我们无法使用。 我们不想给客户端增加不相关的更新并饱和带宽。 每秒最多发送两次更新听起来很合理。 幸运的是RxJava也有一个内置的运算符 ObservableBigDecimal obs coinMiner.mineMany(count, executorService);
obs.scan(BigDecimal::add).sample(500, TimeUnit.MILLISECONDS).subscribe(//...); sample()将定期查看底层流并仅发出最新的项并丢弃中间项。 幸运的是我们使用scan()即时聚合了项目因此我们不会丢失任何更新。 window() –恒定的发射间隔 不过有一个陷阱。 如果在选定的500毫秒内没有新内容出现 sample()将不会两次发出相同的项目。 很好但是请记住我们正在通过TCP / IP连接推送这些更新。 最好定期将更新发送给客户端即使在此期间什么也没发生–只是为了保持连接的正常运行就像ping 。 可能有多种方法可以实现此要求例如涉及timeout()运算符。 我选择使用window()运算符每500毫秒对所有事件进行分组 ObservableBigDecimal obs coinMiner.mineMany(count, executorService);
obs.window(500, TimeUnit.MILLISECONDS).flatMap(window - window.reduce(BigDecimal.ZERO, BigDecimal::add)).scan(BigDecimal::add).subscribe(//...); 这是一个棘手的问题。 首先我们将所有进度更新分组在500毫秒的窗口中。 然后我们使用reduce来计算在此时间段内开采的硬币的总数类似于scan() 。 如果在此期间未开采任何硬币我们只需返回ZERO 。 最后我们使用scan()汇总每个窗口的小计。 我们不再需要sample()因为window()确保每500毫秒发出一个事件。 客户端 JavaScript中有很多SSE用法示例因此为您提供一种调用我们的控制器的快速解决方案 var source new EventSource(/mine/10);
source.onmessage function (event) {console.info(event);
}; 我相信SseEmitter是Spring MVC的一项重大改进它将使我们能够编写更健壮和更快的Web应用程序需要即时的单向更新。 翻译自: https://www.javacodegeeks.com/2015/08/server-sent-events-with-rxjava-and-sseemitter.htmlrxjava 循环发送事件