当前位置: 首页 > news >正文

关于做教育新闻的网站国网公司网站

关于做教育新闻的网站,国网公司网站,淄博周村学校网站建设公司,wordpress 云盘kafka分布式因此#xff0c;您已经有了使用actor的精美设计#xff0c;选择了JVM和Quasar在该主题上的强大而忠实的观点。 所有明智的决定#xff0c;但是在集群上进行分配时您有什么选择呢#xff1f; 星系 Galaxy是一个非常酷的选择#xff1a;快速的内存中数据网格您已经有了使用actor的精美设计选择了JVM和Quasar在该主题上的强大而忠实的观点。 所有明智的决定但是在集群上进行分配时您有什么选择呢 星系 Galaxy是一个非常酷的选择快速的内存中数据网格针对数据局部性进行了优化具有复制可选的持久性分布式参与者注册表甚至参与者之间的参与者迁移 唯一需要注意的是要发布正式版的生产质量的银河版还需要几个月的时间。 不建议将当前版本的Galaxy用于生产。 如果我们需要在那之前上线怎么办 幸运的是Quasar Actors的阻塞编程模型非常简单以至于将其与大多数消息传递解决方案集成起来都是轻而易举的并且为了证明让我们用两种快速流行且截然不同的模型来做到这一点 Apache Kafka和ØMQ 。 代码和计划 可以在GitHub上找到以下所有示例只需快速阅读简短的README 您就可以立即运行它们。 Kafka和ØMQ分别有两个示例 快速而肮脏的人直接进行发布/投票或发送/接收演员的呼叫。 详细介绍了代理角色这些代理角色将您的代码与消息传递API隔离开。 为了证明我没有在撒谎该程序对两种技术使用了相同的生产者和消费者参与者类 并且几乎使用了相同的引导程序。 卡夫卡 Apache Kafka的采用率急剧上升这是由于其基于持久性提交日志和用于并行消息使用的使用者组的独特设计这种结合形成了快速可靠灵活和可扩展的代理。 该API包括两种类型的生产者sync和async一种消费者仅sync Comsat包括社区贡献的对光纤友好的Kafka生产商集成 。 Kafka生产者句柄是线程安全的在共享时表现最佳可以像这样在actor主体或其他任何地方中轻松获得和使用 final Properties producerConfig new Properties(); producerConfig.put(bootstrap.servers, localhost:9092); producerConfig.put(client.id, DemoProducer); producerConfig.put(key.serializer, org.apache.kafka.common.serialization.IntegerSerializer); producerConfig.put(value.serializer, org.apache.kafka.common.serialization.ByteArraySerializer);try (final FiberKafkaProducerInteger, byte[] producer new FiberKafkaProducer(new KafkaProducer(producerConfig))) {final byte[] myBytes getMyBytes(); // ...final FutureRecordMetaData res producer.send(new ProducerRecord(MyTopic, i, myBytes));res.get(); // Optional, blocks the fiber until the record is persisted; thres also producer.flush() } 我们用Comsat的FiberKafkaProducer包装KafkaProducer对象以便找回光纤阻塞的未来。 但是使用者句柄不是线程安全的1而仅是线程阻塞的 final Properties producerConfig new Properties(); consumerConfig new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, DemoConsumer); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.IntegerDeserializer); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArrayDeserializer);try (final ConsumerInteger, byte[] consumer new KafkaConsumer(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecordsInteger, byte[] records consumer.poll(1000L);for (final ConsumerRecordInteger, byte[] record : records) {final byte[] v record.value();useMyBytes(v); // ...} } 由于我们不想阻塞光纤的基础线程池除了那些Kafka屏蔽的线程池之外-我们无法对它们做太多事情因此在我们的actor的doRun我们将使用FiberAsync.runBlocking代替以喂入固定的具有异步任务的size执行程序该任务将阻塞光纤直到poll 将在给定的池中执行返回之前 final ExecutorService e Executors.newFixedThreadPool(2);try (final ConsumerInteger, byte[] consumer new KafkaConsumer(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecordsInteger, byte[] records call(e, () - consumer.poll(1000L));for (final ConsumerRecordInteger, byte[] record : records) {final byte[] v record.value();useMyBytes(v); // ...} } 其中call是一个定义如下的实用程序方法如果不是此Java编译器bug则没有必要 Suspendable public static V V call(ExecutorService es, CallableV c) throws InterruptedException, SuspendExecution {try {return runBlocking(es, (CheckedCallableV, Exception) c::call);} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} } 在第一个完整的示例中我们将从生产者角色向消费者发送一千个序列化消息。 ØMQ ØMQ或ZeroMQ不是集中的代理解决方案而更多地是针对各种通信模式请求/答复发布/订阅等的套接字的一般化。 在我们的示例中我们将使用最简单的请求-答复模式。 这是我们的新生产者代码 try (final ZMQ.Context zmq ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt zmq.socket(ZMQ.REQ)) {trgt.connect(tcp://localhost:8000);final byte[] myBytes getMyBytes(); // ...trgt.send(baos.toByteArray(), 0 /* flags */)trgt.recv(); // Reply, e.g. ACK } 如您所见上下文充当套接字工厂并传递了要使用的I / O线程数这是因为ØMQ套接字不是连接绑定的OS句柄而是用于处理的机器的简单前端重试连接多个连接高效的并发I / O甚至为您排队。 这就是为什么send调用几乎从不阻塞而recv调用不是连接上的I / O调用而是线程与专门的I / O任务之间的同步的原因该任务将处理来自一个或多个连接的传入字节。 但是我们将在角色中阻塞光纤而不是线程因此让我们在read调用上使用FiberAsync.runBlocking 以防万一它阻塞甚至在send时阻塞 final ExecutorService ep Executors.newFixedThreadPool(2);try (final ZMQ.Context zmq ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt zmq.socket(ZMQ.REQ)) {exec(e, () - trgt.connect(tcp://localhost:8000));final byte[] myBytes getMyBytes(); // ...call(e, trgt.send(myBytes, 0 /* flags */));call(e, trgt::recv); // Reply, e.g. ACK } 这是消费者 try (final ZMQ.Context zmq ZMQ.context(1 /* IO threads */);final ZMQ.Socket src zmq.socket(ZMQ.REP)) {exec(e, () - src.bind(tcp://*:8000));final byte[] v call(e, src::recv);exec(e, () - src.send(ACK));useMyBytes(v); // ... } exec是另一个实用程序函数类似于call Suspendable public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {try {runBlocking(es, (CheckedCallableVoid, Exception) () - { r.run(); return null; });} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} } 这是完整的第一个示例 。 在不改变逻辑的情况下进行分发与救援人员的松散耦合 很简单不是吗 但是有一个令人烦恼的事情我们在网络另一端与演员打交道的方式与本地角色截然不同。 无论他们位于何处或如何连接这些都是我们愿意写的演员 public final class ProducerActor extends BasicActorVoid, Void {private final ActorRefMsg target;public ProducerActor(ActorRefMsg target) {this.target target;}Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (int i 0; i MSGS; i) {final Msg m new Msg(i);System.err.println(USER PRODUCER: m);target.send(m);}System.err.println(USER PRODUCER: EXIT);target.send(EXIT);return null;} }public final class ConsumerActor extends BasicActorMsg, Void {Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (;;) {final Msg m receive();System.err.println(USER CONSUMER: m);if (EXIT.equals(m))return null;}} } 幸运的是每个演员无论做什么都具有相同的非常基本的接口称为信箱的传入消息队列。 这意味着我们可以在两个通信参与者之间插入任意数量的中间参与者或代理 尤其是我们希望有一个发送代理它将通过中间件将消息获取到目的地主机并在其中接收接收代理以捕获传入的消息。并将它们放入目标目的地的邮箱中。 因此在我们的主程序中我们将简单地为我们的ProducerActor提供合适的发送代理并让我们的ConsumerActor从合适的接收代理接收 final ProducerActor pa Actor.newActor(ProducerActor.class, getSendingProxy()); // ... final ConsumerActor ca Actor.newActor(ConsumerActor.class); pa.spawn(); System.err.println(USER PRODUCER started); subscribeToReceivingProxy(ca.spawn()); // ... System.err.println(USER CONSUMER started); pa.join(); System.err.println(USER PRODUCER finished); ca.join(); System.err.println(USER CONSUMER finished); 让我们看看如何首先使用Kafka然后使用ØMQ来实现这些代理。 卡夫卡男演员代理 代理参与者的工厂将与特定的Kafka主题相关联这是因为可以对主题进行分区 以使多个使用者可以同时读取该主题。 我们希望能够最佳地利用每个主题的最大级别或并发性 /* ... */ KafkaProxies implements AutoCloseable {/* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ }// ... } 当然我们希望对多个参与者使用一个主题因此发送代理将指定接收者参与者ID接收代理将仅将消息转发给绑定到该ID的用户参与者 /* ... */ M ActorRefM create(String actorID) { /* ... */ } /* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ } /* ... */ M void subscribe(ActorRef? super M consumer, String actorID) { /* ... */ } /* ... */ void unsubscribe(ActorRef? consumer, String actorID) { /* ... */ } 关闭AutoClosable工厂将通知所有代理终止并清理簿记参考 /* ... */ void close() throws Exception { /* ... */ } 生产者实现是非常简单且无趣的同时给消费者带来了更多的乐趣因为它将使用Quasar Actors的选择性接收将传入消息保留在其邮箱中直到至少有一个订阅的用户actor可以使用它们为止 Override protected Void doRun() throws InterruptedException, SuspendExecution {//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object msg tryReceive((Object m) - {if (EXIT.equals(m))return EXIT;if (m ! null) {//noinspection uncheckedfinal ProxiedMsg rmsg (ProxiedMsg) m;final ListActorRef l subscribers.get(rmsg.actorID);if (l ! null) {boolean sent false;for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);sent true;}if (sent) // Someone was listening, remove from queuereturn m;}}return null; // No subscribers (leave in queue) or no messages});// Something from queueif (msg ! null) {if (EXIT.equals(msg)) {return null;}continue; // Go to next cycle - precedence to queue}// Try receiving//noinspection Convert2Lambdafinal ConsumerRecordsVoid, byte[] records call(e, () - consumer.get().poll(100L));for (final ConsumerRecordVoid, byte[] record : records) {final byte[] v record.value();try (final ByteArrayInputStream bis new ByteArrayInputStream(v);final ObjectInputStream ois new ObjectInputStream(bis)) {//noinspection uncheckedfinal ProxiedMsg rmsg (ProxiedMsg) ois.readObject();final ListActorRef l subscribers.get(rmsg.actorID);if (l ! null l.size() 0) {for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);}} else {ref().send(rmsg); // Enqueue}} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}} } 由于我们还需要处理邮箱因此我们以足够小的超时来轮询Kafka。 还要注意许多参与者可以订阅相同的ID传入消息将广播给所有参与者。 每个主题创建的接收actor代理即光纤的数量以及池线程和Kafka使用者句柄 consumer是本地线程因为Kafka使用者不是线程安全的的数量将等于每个主题划分分区这使接收吞吐量达到最大。 目前此实现使用Java序列化在字节之间来回转换消息但是当然可以使用其他框架例如Kryo 。 ØMQ演员代理 ØMQ模型是完全去中心化的既没有经纪人也没有话题因此我们可以简单地将ØMQ地址/端点与一组参与者等同而无需使用额外的参与者ID /* ... */ ZeroMQProxies implements AutoCloseable {/* ... */ ZeroMQProxies(int ioThreads) { /* ... */ }/* ... */ M ActorRefM to(String trgtZMQAddress) { /* ... */ }/* ... */ void drop(String trgtZMQAddress)/* ... */ void subscribe(ActorRef? super M consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void unsubscribe(ActorRef? consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void close() throws Exception { /* ... */ } } 同样在这种情况下并且由于与以前相同的原因使用方有点有趣但幸运的是线程安全性方面的任何问题都因为ØMQ套接字在多个线程中可以正常工作 Override protected Void doRun() throws InterruptedException, SuspendExecution {try(final ZMQ.Socket src zmq.socket(ZMQ.REP)) {System.err.printf(PROXY CONSUMER: binding %s\n, srcZMQEndpoint);Util.exec(e, () - src.bind(srcZMQEndpoint));src.setReceiveTimeOut(100);//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object m tryReceive((Object o) - {if (EXIT.equals(o))return EXIT;if (o ! null) {//noinspection uncheckedfinal ListActorRef l subscribers.get(srcZMQEndpoint);if (l ! null) {boolean sent false;for (final ActorRef r : l) {//noinspection uncheckedr.send(o);sent true;}if (sent) // Someone was listening, remove from queuereturn o;}}return null; // No subscribers (leave in queue) or no messages});// Something processable is thereif (m ! null) {if (EXIT.equals(m)) {return null;}continue; // Go to next cycle - precedence to queue}System.err.println(PROXY CONSUMER: receiving);final byte[] msg Util.call(e, src::recv);if (msg ! null) {System.err.println(PROXY CONSUMER: ACKing);Util.exec(e, () - src.send(ACK));final Object o;try (final ByteArrayInputStream bis new ByteArrayInputStream(msg);final ObjectInputStream ois new ObjectInputStream(bis)) {o ois.readObject();} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}System.err.printf(PROXY CONSUMER: distributing %s to %d subscribers\n, o, subscribers.size());//noinspection uncheckedfor (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (ListActorRef) Collections.EMPTY_LIST))//noinspection uncheckeds.send(o);} else {System.err.println(PROXY CONSUMER: receive timeout);}}} }更多功能 这篇简短的文章有望使人们一眼就可以看出由于Quasar的Actor具有顺畅的顺序流程的特性因此可以无缝地将Quasar的Actor与消息传递解决方案进行交互。 当然可以更进一步例如 演员查找和发现 我们如何提供全球演员命名/发现服务 例如Kafka使用ZooKeeper因此可能值得利用但是ØMQ大量押注去中心化并且故意不提供预先打包的基础。 Actor故障管理 我们如何支持在不同节点中运行的actor之间的故障管理链接和监视 消息路由 如何在不更改参与者内部逻辑的情况下动态调整节点与参与者之间的消息流 角色移动性 我们如何将角色移动到其他节点例如使其更靠近消息源以提高性能或移动到具有不同安全性的位置 可伸缩性和容错性 如何管理参与者节点的添加删除死亡和分区 像Galaxy这样的分布式IMDG和像Kafka这样的基于代理的解决方案通常已经做到了但是像ØMQ这样的结构级解决方案通常不这样做。 安全性 我们如何支持相关的信息安全性属性 测试记录监视 我们如何方便地整体测试跟踪和监视分布式参与者集合 这些主题尤其是分布式系统设计尤其是分布式参与者的“硬核”因此要有效地解决它们可能需要大量的精力。 Galaxy解决了所有这些问题但是Quasar参与者提供了一个SPI 涵盖了上述一些主题并允许与发行技术更紧密地集成。 您可能也对Akka和Quasar Galaxy之间的比较感兴趣该比较涵盖了许多此类方面。 就是这样请与您分布的Quasar演员一起玩乐并在Quasar-Pulsar用户组中留下有关您的旅程的注释 实际上它也禁止除第一个线程外的任何线程使用。 翻译自: https://www.javacodegeeks.com/2016/05/distributed-quasar-actors-kafka-zeromq.htmlkafka分布式
http://www.zqtcl.cn/news/931341/

相关文章:

  • 2014网站怎么备案怎样建置换平台网站
  • 惠州网站建设信息嘉兴做网站软件
  • 如何做发表文章的网站淮安市建设工程质量监督站网站
  • 做洁净的网站太原便宜做网站的公司
  • 网站设计评级检索标准的网站
  • 做个网站每年都要交域名费吗html静态网页首页模板
  • 网站资源整合与建设wordpress固定链接设置后404
  • 网站历史快照seo推广方法
  • 做淘宝客的的网站有什么要求北京专业网站制作公司
  • 建设网站 知乎个人可以开发app软件吗
  • 网站如何后台管理北京正规网站建设有几种
  • 临沂网站排名高质量的中山网站建设
  • 响应式网站定制开发网络教育全程托管
  • 做网站中的剪辑图片龙岗网站
  • 建设购物网站的意义免费做外贸的网站平台
  • 长沙做电商网站设计重庆观音桥旅游攻略
  • 网站建设的目标与期望动漫设计与制作工资多少
  • 做网站找网站设计公司 长沙
  • 网站维护内容网站代码下载
  • 西安建设主管部门官方网站wordpress返回件
  • 建立免费空间网站南宁seo推广外包
  • 网站初期如何推广用秀米制作h5详细步骤
  • 做网站需要执照嘛开发 网站 团队
  • 怎么提交网站关键词包头人脸检测系统
  • 哪个网站开发是按月付费的婚纱摄影建设网站的目的
  • 站长之家app简单网站制作步骤
  • 网站开发与桌面应用开发wordpress if include
  • 网站怎么做预约小程序江苏省工程建设招标网站
  • python做网站有什么弊端专业做网带
  • 浙江建设工程考试网站wordpress等模版比较