眉山建设中等职业技术学校 网站,sns有哪些著名的网站,永久域名查询,工业互联网企业排名在我以前的文章中#xff0c;我研究了一个虚拟的交易引擎#xff0c;并将基于Java的阻止解决方案与基于Node.js的非阻止解决方案进行了比较。 在文章的结尾#xff0c;我写道#xff1a; 我怀疑在Node.js近期取得成功之后#xff0c;越来越多的异步Java库将开始出现。 这… 在我以前的文章中我研究了一个虚拟的交易引擎并将基于Java的阻止解决方案与基于Node.js的非阻止解决方案进行了比较。 在文章的结尾我写道 我怀疑在Node.js近期取得成功之后越来越多的异步Java库将开始出现。 这样的库已经存在例如 Akka Spray和此Mysql异步驱动程序 。 我给自己设定了一个挑战即要确切地使用这些库来创建基于Java的非阻塞解决方案以便将其性能与上一篇文章中创建的Node.js解决方案进行比较。 您可能注意到的第一件事是这些都是基于Scala的库但是我用Java编写了该解决方案尽管它在语法上不太优雅。 在上一篇文章中我介绍了一种基于Akka的解决方案其中交易引擎封装在actor中。 在这里我放弃了Tomcat作为HTTP服务器并用Spray代替了它后者将HTTP服务器直接集成到Akka中。 从理论上讲这应该不会对性能造成任何影响因为Spray是NIO就像Tomcat 8一样。 但是吸引我到此解决方案的是总体而言线程的数量大大减少了因为SprayAkka和异步Mysql库都使用相同的执行上下文 。 Tomcat在我的Windows开发计算机上运行有30多个线程而此处构建的解决方案只有10个以上或者与Websphere或JBoss相比有数百个线程。 执行上下文基本上是一个线程池这些线程运行分配给它的任务。 由于此处介绍的解决方案中使用的所有库都是非阻塞的因此线程数可以保持较低并接近理论最佳值从而尽可能少地进行上下文切换 从而使过程高效运行。 本文编写的代码在GitHub上 。 该程序的第一部分是启动Spray和Akka的main方法 public static final ActorSystem system ActorSystem.create(system);public static void main(String[] args) {...ActorRef listener system.actorOf(Props.create(HttpActor.class), httpActor); InetSocketAddress endpoint new InetSocketAddress(3000);int backlog 100;ListInet.SocketOption options JavaConversions.asScalaBuffer(new ArrayListInet.SocketOption()).toList();OptionServerSettings settings scala.Option.empty();ServerSSLEngineProvider sslEngineProvider null;Bind bind new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());system.scheduler().schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), ()-{System.out.println(new Date() - numSales numSales.get());}, system.dispatcher());
} 第1行创建了一个公共的actor系统因此我可以从其他地方访问它因为它用于访问我想在整个程序中使用的单个执行上下文。 在存在可维护性问题的代码中我会写一些东西以便将该对象注入程序的相关部分。然后第5行使用该系统实例化一个actor该actor用于处理所有HTTP买卖请求。命令。 第7-11行仅设置了服务器的配置数据。 第12和13行是我们进行配置和actor的地方并告诉Akka IO使用它们和HTTP模块将所有HTTP请求作为消息从第5行发送给我们的actor。15-17行是我有效地设置计时器任务的地方每5秒触发一次以输出一些统计信息。 这里的重要部分是要注意我没有使用Java的Timer来调度任务因为这只会给进程添加更多不必要的线程。 相反我使用与Akka相同的执行上下文因此创建了尽可能少的线程。 接下来是处理HTTP请求的参与者 private static class HttpActor extends AbstractActor {private static final HttpProtocol HTTP_1_1 HttpProtocols.HTTP$div1$u002E1();public HttpActor() {final Router router partitionAndCreateRouter();receive(ReceiveBuilder.match(HttpRequest.class, r - {int id Constants.ID.getAndIncrement();String path String.valueOf(r.uri().path());if(/sell.equals(path)){String productId r.uri().query().get(productId).get();...SalesOrder so new SalesOrder(price, productId, quantity, id);so.setSeller(new Seller(who));router.route(so, self());replyOK(id);}else if(/buy.equals(path)){...}else{handleUnexpected(r);}}).match(Tcp.Connected.class, r -{sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!}).build());}第3行显示了一个示例该示例显示如何将Scala集成到Java程序中很丑陋但是有时您如何通过添加自己的抽象来隐藏那些丑陋的部分。 响应HTTP请求的HTTP actor具有3个作业。 第6行上的第一个工作是在其中创建一个路由器我将在下面对其进行描述并将其用于委派工作。 第二项工作是处理24-24行上的所有新连接这告诉Spray这个参与者也将处理实际的请求而不仅仅是连接。 该参与者具有的第三项工作在第9-18行中显示该参与者接受HTTP请求并将一些工作委托路由到系统中的另一个参与者。 这个参与者知道HTTP模型但是HTTP抽象不会泄漏到系统的下一层。 相反参与者将域对象或值对象或案例类或类似对象传递给封装了交易引擎的参与者。 使用从HTTP请求中提取的数据例如在第13行或者使用请求主体中的JSON对象可以在第15和16行看到此类域对象的构造。 Spray包含有用的指令 可以帮助您从请求中提取数据如果需要的话可以从HTTP提取一些内容。 构造哪个域对象取决于我构建并在第9、12和19行处理的类似REST的接口。如果我使用了Scala则可以在HttpRequest对象上使用模式匹配来编写更精美的代码。 通过从第6行获得路由器以将域对象路由到合适的参与者将域对象传递到交易引擎在第17行。最后但并非最不重要的是第18行是在HTTP响应中确认销售订单请求的位置它将JSON对象以及分配给订单的唯一ID传递回消费者以便以后可以查询其状态将其持久化到销售对象中。 下一个代码片段显示了我们如何划分市场并创建多个参与者来并行处理请求。 private Router partitionAndCreateRouter() {MapString, ActorRef kids new HashMap();java.util.ListRoutee routees new ArrayListRoutee();int chunk Constants.PRODUCT_IDS.length / NUM_KIDS;for (int i 0, j Constants.PRODUCT_IDS.length; i j; i chunk) {String[] temparray Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i chunk);LOGGER.info(created engine for products temparray);ActorRef actor getContext().actorOf(Props.create(EngineActor.class));getContext().watch(actor);routees.add(new ActorRefRoutee(actor));for (int k 0; k temparray.length; k) {LOGGER.debug(mapping productId temparray[k] to engine i);kids.put(temparray[k], actor);}LOGGER.info(---started trading);actor.tell(EngineActor.RUN, ActorRef.noSender());} Router router new Router(new PartitioningRoutingLogic(kids), routees);return router;
}此代码与上一篇文章中的代码相似。 为了横向扩展并同时使用多个核心按产品ID对市场进行了划分并且每个交易引擎针对不同的市场划分同时运行。 在此处提供的解决方案中在每个分区上创建一个EngineActor并将其包装在第10行的Routee中。第14行还填充了一个由产品ID键控的actor映射。在第19行和第19行使用路由和映射构建了路由器。委派工作时 HttpActor在上一片段中使用的就是这个。 还要注意第17行它启动了包含在EngineActor的交易引擎以便启动并运行该引擎准备在将购买和销售订单传递给这些EngineActor进行交易。 这里没有明确显示EngineActor类因为它与上一篇文章中使用的actor几乎相同并且仅封装了一个交易引擎该引擎处理特定市场分区中的所有产品。 上面的第19行使用RoutingLogic构建路由器如下所示 public static class PartitioningRoutingLogic implements RoutingLogic {private MapString, ActorRef kids;public PartitioningRoutingLogic(MapString, ActorRef kids) {this.kids kids;}Overridepublic Routee select(Object message, IndexedSeqRoutee routees) {//find which product ID is relevant hereString productId null;if(message instanceof PurchaseOrder){productId ((PurchaseOrder) message).getProductId();}else if(message instanceof SalesOrder){productId ((SalesOrder) message).getProductId();}ActorRef actorHandlingProduct kids.get(productId);//no go find the routee for the relevant actorfor(Routee r : JavaConversions.asJavaIterable(routees)){ActorRef a ((ActorRefRoutee) r).ref(); //cast ok, since the are by definition in this program all routees to ActorRefsif(a.equals(actorHandlingProduct)){return r;}}return akka.routing.NoRoutee$.MODULE$; //none found, return NoRoutee}
} 路由器在接收到必须路由到正确角色的对象时会调用第10行的select(...)方法。 使用在上一个清单中创建的地图以及从请求中获得的产品ID很容易找到包含负责相关市场划分的交易引擎的参与者。 通过返回包裹该参与者的路由Akka会将订单对象传递给正确的EngineActor 然后在交易引擎处于交易周期之间且EngineActor下次检查时处理该消息时将数据放入模型中它的收件箱。 好的这就是要处理的前端。 上一篇文章的解决方案所需要的第二个主要更改是方法的设计该方法可以在交易后保持销售。 在基于Java的解决方案中我同步遍历每笔交易并将insert语句发送到数据库并且仅在数据库回复后才处理下一次交易。 使用此处提供的解决方案我选择通过向数据库发出insert请求并立即移至下一个销售并执行相同操作来并行处理销售。 使用我提供的回调在执行上下文中异步处理了响应。 我编写了程序以等待最后一次插入被确认然后再继续进行新创建的购买和销售订单的交易该订单自上次交易时段开始以来就已经到来。 在下面的清单中显示 private void persistSales(ListSale sales, final PersistenceComplete f) {if (!sales.isEmpty()) {LOGGER.info(preparing to persist sales);final AtomicInteger count new AtomicInteger(sales.size());sales.forEach(sale - {List values Arrays.asList(sale.getBuyer().getName(), sale.getSeller().getName(),sale.getProductId(),sale.getPrice(),sale.getQuantity(),sale.getPurchaseOrder().getId(),sale.getSalesOrder().getId());FutureQueryResult sendQuery POOL.sendPreparedStatement(SQL, JavaConversions.asScalaBuffer(values));sendQuery.onComplete(new JFunction1TryQueryResult, Void() {Overridepublic Void apply(TryQueryResult t) {if(t.isSuccess()){QueryResult qr t.get();//the query result doesnt contain auto generated IDs! library seems immature...//sale.setId(???);}if(count.decrementAndGet() 0){if(t.isSuccess()){f.apply(null);}else{f.apply(t.failed().get());}}return null; //coz of Void}}, Main.system.dispatcher());});}else{f.apply(null); //nothing to do, so continue immediately}
} 交易引擎在每个交易周期后调用persistSales(...)方法并向该方法传递在该交易周期内完成的销售清单并在所有持久性完成后调用一个回调函数。 如果未售出任何东西则第38行立即调用回调。 否则在第5行上创建一个计数器并使用要保留的销售数量进行初始化。 每次销售都在第7-15行异步保存。 请注意如何在第15行返回一个Future以及如何在第16-35行使用另一个回调来处理future的完成–这里没有阻塞等待future完成 上面提到的计数器在第25行递减一旦销售被持久化并且所有销售都被持久化则调用传递给persistSales(...)方法的回调。 请注意在第16行使用的类JFunction1是一个垫片可以更轻松地集成JFunction1代码在GitHub上的上面给出的链接上。 第21和22行表明我使用的异步Mysql库存在一些问题。 它仍然是一个测试版似乎没有办法掌握销售产生的自动递增主键。 还要注意第35行在这里我传入了Akka使用的执行上下文以便处理插入语句完成的Future在一个现有线程上进行处理而不是在某些新线程上进行处理–再次保持该线程的总数线程越低越好。 该清单还显示了一个有趣的问题即调用数据库以插入数据的线程不一定是可能需要关闭连接的线程[1]。 在普通的Java EE和Spring中经常使用线程本地存储另请参见此处 。 如果您从处理将来完成的函数中调用Bean则注入到其中的资源可能不起作用因为容器无法确定上下文是什么。 Scala使用隐式参数解决了这个问题这些参数在后台传递给方法。 上面的清单使用PersistenceComplete回调如下面第14-16行所示。 它还使用使用以下代码创建的连接池。 再一次Akka使用的执行上下文将传递到下面第10行的异步Mysql库。 下面的第10行还显示了一个非默认的池配置其中允许的最大队列大小最大为一千。 在负载测试期间我收到许多错误消息表明池已饱和增加该值可以解决问题。 private static final String SQL INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) VALUES (?, ?, ?, ?, ?, ?, ?);private static final ConnectionPoolMySQLConnection POOL;
static {Duration connectTimeout Duration.apply(5.0, TimeUnit.SECONDS);Duration testTimeout Duration.apply(5.0, TimeUnit.SECONDS);Configuration configuration new Configuration(root, Main.DB_HOST, 3306, Option.apply(password), Option.apply(TRADER), io.netty.util.CharsetUtil.UTF_8, 16777216, PooledByteBufAllocator.DEFAULT, connectTimeout, testTimeout);MySQLConnectionFactory factory new MySQLConnectionFactory(configuration);POOL new ConnectionPoolMySQLConnection(factory, new PoolConfiguration(1000, 4, 1000, 4000), Main.system.dispatcher());
}private static interface PersistenceComplete {void apply(Throwable failure);
} 传递给persistSales(...)的回调在下一个清单中显示。 以下代码与上一篇文章中显示的原始代码几乎没有什么不同不同之处在于以下代码现在是异步的。 一旦所有销售都持续存在就会调用该回调然后回调才会在下面的第14行上通过其事件侦听器向参与者发送一条消息。 在加载大量新的购买和销售订单后该消息通常位于收件箱的后面。 这些消息中的每一个都会被处理从而导致在重新开始交易之前使用新订单更新交易引擎模型。 persistSales(sales, t - {if(t ! null){LOGGER.error(failed to persist sales: sales, t);}else{LOGGER.info(persisting completed, notifying involved parties...);sales.stream().forEach(sale - {if (sale.getBuyer().listener ! null)sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);if (sale.getSeller().listener ! null)sale.getSeller().listener.onEvent(EventType.SALE, sale);});...}listener.onEvent(EventType.STOPPED, null);
}); 最终的代码清单是对Node.js解决方案的修改该修改使它也可以并行地保持销售而不是像上一篇文章中那样一个接一个地销售。 function persistSales(sales, callback){if(sales.length 0 || process.env.skipPersistence) {callback(); //nothing to do, so continue immediately}else{resources.dbConnection(function(err, connection) {if(err) callback(err); else {logger.info(preparing to persist sales.length sales);var count sales.length;_.each(sales, function(sale){ //save them in parallelconnection.query(INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) values (?, ?, ?, ?, ?, ?, ?),[sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],function(err, rows, fields) {if(err) callback(err); else {sale.id rows.insertId;count--;if(count 0){logger.info(persisted all sales);connection.release();callback();}}});});}});}
} 第5行从池中获取一个连接并且相同的连接“并行”用于所有销售并且在最后一次销售持续后仅在第19行中释放即返回到池中。 因此再次通过一些负载测试比较解决方案的时间到了。 这次我选择查看以下三个解决方案中的每一个可以达到的最大销售率 情况1 –此处介绍的解决方案即Spray Akka 异步Mysql驱动程序 情况2 –修改后的Node.js解决方案使用并行持久性 情况3 –原始的Tomcat非阻塞连接器但具有同步持久性。 这些案例是使用上一篇文章中的硬件运行的交易引擎运行在快速硬件上而数据库运行在慢速硬件上因为这是显示阻塞I / O如何导致性能问题的最佳设置。 对于每种情况我可以在调整时调整三个变量。 这些曾经是 交易引擎作为参与者或作为子流程的数量 客户端调用服务器之间等待的时间 并发客户端数。 后两个基本上调整了每秒的请求数量因为连接没有保持打开状态以等待交易结果请参阅上一篇文章。 结果如下最佳性能以粗体显示。 情况1 – Spray Akka 异步Mysql驱动程序 交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU 8 100毫秒 60 42,810 25-35 8 80毫秒 70 62,392 25-35 8 60毫秒 80 75,600 30-40 8 40毫秒 90 59,217 30-50 10 60毫秒 80 太多的数据库连接问题 5 60毫秒 60 67,398 25-35 6 60毫秒 80 79,536 25-35 案例2 –具有并行持久性的Node.js 交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU 8 200毫秒 30 6,684 40-50 8 100毫秒 60 开始落后 8 100毫秒 40 17,058 25-35 8 100毫秒 50 开始落后 12 100毫秒 50 20,808 45-60 16 100毫秒 60 24,960 45-65 20 100毫秒 80 32,718 45-70 25 60毫秒 80 51,234 75-85 30 50毫秒 80 22,026 75-85 25 10毫秒 70 17,604 75-90 情况3 – Tomcat 8 NIO具有同步阻止持久性 交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU 4 200毫秒 30 9,586 5 4 150毫秒 30 10,221 5 8 200毫秒 30 9,510 5 结果表明将NIO连接器用螺栓固定在Tomcat上并认为您没有阻塞并且性能很危险因为与Akka解决方案相比该解决方案的表现差了近8倍。 结果还表明通过使用非阻塞库并用Java编写非阻塞解决方案与Node.js相比可以创建性能卓越的解决方案。 Java解决方案不仅具有大约50的吞吐量而且使用的CPU不到一半。 非常重要请注意这是特定于此处使用的算法以及我的体系结构设计和实现的结果。 它还依赖于使用“非标准” Java库实际上我使用的Mysql库缺少功能例如从insert结果中读取生成的主键。 在得出JavaScala和Node.js的相对性能结论之前请针对您的用例做自己的实验 比较交易引擎数量变化时的一个值得注意的点在Node.js中它直接控制子进程的数量类似于线程数 在Akka解决方案中它对系统中的线程数量没有任何影响–该数量保持不变 在Akka解决方案中更改参与者的数量会影响其收件箱中消息的数量。 有关此视频的详细信息请参见有关使用Akka和Spray的更多信息。 请花时间也快速阅读有关反应式宣言 。 此处介绍的Akka解决方案是反应性的因为它具有响应能力这三种情况中的吞吐量都最高有弹性Akka提供了处理故障的简便方法尽管这里没有必要有弹性它是自动扩展的因为Akka管理线程池它在执行上下文中的大小并且由于Akka提供了actor的透明位置而扩大了规模并且它是消息驱动的由于使用actor模型。 [1]这里使用的Mysql库不需要关闭连接并返回到池例如Apache数据库池 。 这样做实际上会引起问题 我进行的负载测试证明将其保持打开状态不会造成任何问题。 翻译自: https://www.javacodegeeks.com/2015/01/a-reactive-and-performant-spray-akka-solution-to-playing-with-concurrency-and-performance-in-java-and-node-js.html