知名企业门户网站建设联系电话,营销型和展示型网站的区别,建筑效果图素材网站,网站备案 英文队列这种数据结构都不陌生#xff0c;特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能#xff0c;这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。
这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点特点就是先进先出。有很多常用的消息中间件可以有现成的该部分功能这里使用zookeeper基于发布订阅模式来实现分布式队列。对应的会有一个生产者和一个消费者。
这里理论上还是使用顺序节点。生产者不断产生新的顺序子节点消费者watcher监听节点新增事件来消费消息。
生产者
CuratorFramework client ...
client.start();
String path /testqueue;
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,11.getBytes())
消费者
CuratorFramework client ...
client.start();
String path /testqueue;
PathChildrenCache pathCache new PathChildrenCache(client,path,true);
pathCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType() PathChildrenCacheEvent.Type.CHILD_ADDED){ChildData data event.getData();//handle msgclient.delete().forPath(data.getPath());}}
});
pathCache.start();使用curator queue
先来使用基本的队列类DistributedQueue。
DistributedQueue的初始化需要提交准备几个参数
client连接就不多说了:
CuratorFramework client ...QueueSerializer这个主要是用来指定对消息data进行序列化和反序列化
这里就搞一个简单的字符串类型
QueueSerializerString serializer new QueueSerializerString() {Overridepublic byte[] serialize(String item) {return item.getBytes();}Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}
};QueueConsumer消息consumer当有新消息来的时候会调用consumer.consumeMessage()来处理消息
这里也搞个简单的string类型的处理consumer
QueueConsumerString consumer new QueueConsumerString() {Overridepublic void consumeMessage(String s) throws Exception {System.out.println(receive msg:s);}Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {//TODO}
};队列消息发布
//队列节点路径
String queuePath /queue;
//使用上面准备的几个参数构造DistributedQueue对象
DistributedQueueString queue QueueBuilder.builder(client,consumer,serializer,queuePath).buildQueue();
queue.start();
//调用put方法生产消息
queue.put(hello);
queue.put(msg);
Thread.sleep(2000);
queue.put(3);这样在启动测试程序在consumer的consumeMessage方法就会收到queue.put的消息。
这里有个问题有没有发现在初始化queue的时候需要指定consumer那岂不是只能同一个程序中生产消费何来的分布式
其实这里在queue对象创建的时候consumer可以为null这个时候queue就只生产消息。具体的逻辑需要看下DistributedQueue类的源码。
在DistributedQueue类的构造函数有一步设置isProducerOnly属性
isProducerOnly (consumer null);然后在start()方法会根据isProducerOnly来判断启动方式
if ( !isProducerOnly || (maxItems ! QueueBuilder.NOT_SET) )
{childrenCache.start();
}if ( !isProducerOnly )
{service.submit(new CallableObject(){Overridepublic Object call(){runLoop();return null;}});
}这里看到consumer为空两个if不成立不会初始化对那个的消息消费逻辑wather监听。只需要在另一个程序里创建queue启动时指定consumer即可。
源码分析
先从消息的发布也就是put方法
首先调用makeItemPath()获取创建节点路径
ZKPaths.makePath(queuePath, QUEUE_ITEM_NAME);这里QUEUE_ITEM_NAME“queue-”。
然后调用internalPut()方法来创建节点路径
//先累加消息数量putCount
putCount.incrementAndGet();
//使用serializer序列化消息数据
byte[] bytes ItemSerializer.serialize(multiItem, serializer);
//根据background来创建节点
if ( putInBackground )
{doPutInBackground(item, path, givenMultiItem, bytes);
}
else
{doPutInForeground(item, path, givenMultiItem, bytes);
}看doPutInForeground里就是具体的创建节点了
//创建节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path, bytes);
//哦错了这里putCount不是总消息数是正在创建消息数创建完再回减
synchronized(putCount)
{putCount.decrementAndGet();putCount.notifyAll();
}//如果有对应的lisener依次调用
putListenerContainer.forEach(listener - {if ( item ! null ){listener.putCompleted(item);}else{listener.putMultiCompleted(givenMultiItem);}
});消息的发布就完成了。
然后是消息的consumer这里肯定是使用的watcher。这里还是回到前面start方法处根据isProducerOnly属性判断有两步操作
1、childrenCache.start();
childrenCache初始化是在queue的构造函数里
childrenCache new ChildrenCache(client, queuePath)其start方法会调用
private final CuratorWatcher watcher new CuratorWatcher()
{Overridepublic void process(WatchedEvent event) throws Exception{if ( !isClosed.get() ){sync(true);}}
};private final BackgroundCallback callback new BackgroundCallback(){Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() KeeperException.Code.OK.intValue() ){setNewChildren(event.getChildren());}}};void start() throws Exception{sync(true);}private synchronized void sync(boolean watched) throws Exception{if ( watched ){//走这里client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);}else{client.getChildren().inBackground(callback).forPath(path);}}
这里先把代码都贴上看到内部定义了一个watcher和callback。这里inBackground就是watcher到事件使用callback进行处理最后是调用到setNewChildren方法
private synchronized void setNewChildren(ListString newChildren)
{if ( newChildren ! null ){Data currentData children.get();//将数据设置到children变量里消息版本1children.set(new Data(newChildren, currentData.version 1));//notifyAll() 等待线程获取消息notifyFromCallback();}
}这里有引入了一个children变量然后将数据设置到了该变量里。
private final AtomicReferenceData children new AtomicReferenceData(new Data(Lists.StringnewArrayList(), 0));children其实是线程间通信一个共享数据容器变量。这里设置了数据然后具体的数据消费在下一步。
2、线程池里丢了个任务去执行runLoop();方法。
回到DistributedQueue.start的第二步执行runLoop()方法看名字就应该知道了一直轮询获取消息。
还是来看代码吧
private void runLoop()
{long currentVersion -1;long maxWaitMs -1;//while一直轮询while ( state.get() State.STARTED ){try{//从childrenCache里获取数据ChildrenCache.Data data (maxWaitMs 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);currentVersion data.version;ListString children Lists.newArrayList(data.children);sortChildren(children); // makes sure items are processed in the correct orderif ( children.size() 0 ){maxWaitMs getDelay(children.get(0));if ( maxWaitMs 0 ){continue;}}else{continue;}/**处理数据 这里取出消息后会删除节点然后使用serializer反序列化节点数据调用consumer.consumeMessage来处理消息**/processChildren(children, currentVersion);}}}
}这里获取数据使用了childrenCache.blockingNextGetData
synchronized Data blockingNextGetData(long startVersion, long maxWait, TimeUnit unit) throws InterruptedException
{long startMs System.currentTimeMillis();boolean hasMaxWait (unit ! null);long maxWaitMs hasMaxWait ? unit.toMillis(maxWait) : -1;//数据版本没变一直wait等待while ( startVersion children.get().version ){if ( hasMaxWait ){long elapsedMs System.currentTimeMillis() - startMs;long thisWaitMs maxWaitMs - elapsedMs;if ( thisWaitMs 0 ){break;}wait(thisWaitMs);}else{wait();}}return children.get();
}这里就有wait阻塞等消息当消息来时候会被唤醒。
其它类型队列
curator对优先队列(DistributedPriorityQueue)、延迟队列(DistributedDelayQueue)都有对应的实现有兴趣的自己看吧。