当前位置: 首页 > news >正文

知名企业门户网站建设联系电话营销型和展示型网站的区别

知名企业门户网站建设联系电话,营销型和展示型网站的区别,建筑效果图素材网站,网站备案 英文队列这种数据结构都不陌生#xff0c;特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能#xff0c;这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。 这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点消费者watcher监听节点新增事件来消费消息。 生产者 CuratorFramework client ... client.start(); String path /testqueue; client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,11.getBytes()) 消费者 CuratorFramework client ... client.start(); String path /testqueue; PathChildrenCache pathCache new PathChildrenCache(client,path,true); pathCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data event.getData();//handle msgclient.delete().forPath(data.getPath());}} }); pathCache.start();使用curator queue 先来使用基本的队列类DistributedQueue。 DistributedQueue的初始化需要提交准备几个参数 client连接就不多说了: CuratorFramework client ...QueueSerializer这个主要是用来指定对消息data进行序列化和反序列化 这里就搞一个简单的字符串类型 QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);} };QueueConsumer消息consumer当有新消息来的时候会调用consumer.consumeMessage()来处理消息 这里也搞个简单的string类型的处理consumer QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String s) throws Exception {System.out.println(receive msg:s);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO} };队列消息发布 //队列节点路径 String queuePath /queue; //使用上面准备的几个参数构造DistributedQueue对象 DistributedQueueString queue QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue(); queue.start(); //调用put方法生产消息 queue.put(hello); queue.put(msg); Thread.sleep(2000); queue.put(3);这样在启动测试程序在consumer的consumeMessage方法就会收到queue.put的消息。 这里有个问题有没有发现在初始化queue的时候需要指定consumer那岂不是只能同一个程序中生产消费何来的分布式 其实这里在queue对象创建的时候consumer可以为null这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。 在DistributedQueue类的构造函数有一步设置isProducerOnly属性 isProducerOnly (consumer null);然后在start()方法会根据isProducerOnly来判断启动方式 if ( !isProducerOnly || (maxItems ! QueueBuilder.NOT_SET) ) {childrenCache.start(); }if ( !isProducerOnly ) {service.submit(new CallableObject(){Overridepublic Object call(){runLoop();return null;}}); }这里看到consumer为空两个if不成立不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。 源码分析 先从消息的发布也就是put方法 首先调用makeItemPath()获取创建节点路径 ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);这里QUEUE_ITEM_NAME“queue-”。 然后调用internalPut()方法来创建节点路径 //先累加消息数量putCount putCount.incrementAndGet(); //使用serializer序列化消息数据 byte[] bytes ItemSerializer.serialize(multiItem, serializer); //根据background来创建节点 if ( putInBackground ) {doPutInBackground(item, path, givenMultiItem, bytes); } else {doPutInForeground(item, path, givenMultiItem, bytes); }看doPutInForeground里就是具体的创建节点了 //创建节点 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes); //哦错了这里putCount不是总消息数是正在创建消息数创建完再回减 synchronized(putCount) {putCount.decrementAndGet();putCount.notifyAll(); }//如果有对应的lisener依次调用 putListenerContainer.forEach(listener - {if ( item ! null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);} });消息的发布就完成了。 然后是消息的consumer这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作 1、childrenCache.start(); childrenCache初始化是在queue的构造函数里 childrenCache new ChildrenCache(client, queuePath)其start方法会调用 private final CuratorWatcher watcher new CuratorWatcher() {Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}} };private final BackgroundCallback callback new BackgroundCallback(){Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}} 这里先把代码都贴上看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理最后是调用到setNewChildren方法 private synchronized void setNewChildren(ListString newChildren) {if ( newChildren ! null ){Data currentData children.get();//将数据设置到children变量里消息版本1children.set(new Data(newChildren, currentData.version 1));//notifyAll() 等待线程获取消息notifyFromCallback();} }这里有引入了一个children变量然后将数据设置到了该变量里。 private final AtomicReferenceData children new AtomicReferenceData(new Data(Lists.StringnewArrayList(), 0));children其实是线程间通信一个共享数据容器变量。这里设置了数据然后具体的数据消费在下一步。 2、线程池里丢了个任务去执行runLoop();方法。 回到DistributedQueue.start的第二步执行runLoop()方法看名字就应该知道了一直轮询获取消息。 还是来看代码吧 private void runLoop() {long currentVersion -1;long maxWaitMs -1;//while一直轮询while ( state.get() State.STARTED ){try{//从childrenCache里获取数据ChildrenCache.Data data (maxWaitMs 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion data.version;ListString children Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() 0 ){maxWaitMs getDelay(children.get(0));if ( maxWaitMs 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点然后使用serializer反序列化节点数据调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}} }这里获取数据使用了childrenCache.blockingNextGetData synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException {long startMs System.currentTimeMillis();boolean hasMaxWait (unit ! null);long maxWaitMs hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion children.get().version ){if ( hasMaxWait ){long elapsedMs System.currentTimeMillis() - startMs;long thisWaitMs maxWaitMs - elapsedMs;if ( thisWaitMs 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get(); }这里就有wait阻塞等消息当消息来时候会被唤醒。 其它类型队列 curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现有兴趣的自己看吧。
http://www.zqtcl.cn/news/193934/

相关文章:

  • 网站建设与管理视频网站推广的方法枫子
  • 苏州市住房和城乡建设局官方网站宠物之家网站开发
  • 建个人网站活字格能开发企业网站吗
  • php网站后台密码忘记做电子商务网站 语言
  • 网站建设策划师怎样进入国外网站
  • 建设银行商城网站浙江建站管理系统价格
  • 我想做个网站怎么做的常用的网络营销方法及效果
  • 南通专业做网站南宁网站建设mxfsem
  • 阿里巴巴电子商务网站建设目的网站专题素材
  • 浙江虎霸建设机械有限公司网站哪个网站做简历好
  • 网站做电商资质吗网站开发作品
  • 大型彩灯制作公司临清聊城网站优化
  • 网站建设灬金手指下拉十五网络运维工程师简历怎么写
  • 黄岛建设局网站动漫采集WordPress
  • 做网站现在挣钱吗wordpress 网址导航主题
  • 外贸网站什么采集wordpress主题更换logo
  • 唐山开发网站的公司长沙营销型网站设计
  • 数据库策略网站推广的有效方法有美辰网站建设
  • c 网站开发构想做网站的点子
  • 个人网站模板下载提供网站建设备案公司
  • 做网站需要会写代码6山东东营
  • 兼职刷客在哪个网站做网站搬家数据库配置
  • 做搬运的话哪个网站好网站模板建站
  • 建设个人信息网站wordpress 用户权限
  • 网站不显示域名解析错误怎么办公益网站设计
  • 怎么上传网站图片的链接手表网站排行榜
  • 网站推广方法100种百度排名规则
  • 上海专业网站建设公司站霸网络萝岗区网站建设推广
  • 做微商网站的公司永久免费crm管理系统
  • 网站开发的环境专业的建设网站