家用电脑如何做网站服务器,建立网站流程图,定制系统软件开发,店面设计多少钱一个平方1、介绍
PocoPocoPoco 中的通知#xff0c;是消息源通过中间载体将消息发送给观察者#xff0c;通知可以分为 同步通知和异步通知。 下图是同步通知#xff0c;消息发送流程#xff1a;
2.同步通知
2.1 消息
class Notification: public RefCountedObject
{
public:ty…1、介绍
PocoPocoPoco 中的通知是消息源通过中间载体将消息发送给观察者通知可以分为 同步通知和异步通知。 下图是同步通知消息发送流程
2.同步通知
2.1 消息
class Notification: public RefCountedObject
{
public:typedef AutoPtrNotification Ptr;Notification();virtual std::string name() const;
protected:virtual ~Notification();
};2.2 消息的发送者 source
类 NotificationCenter 类扮演了一个消息源的角色。下面是它的定义
class NotificationCenter
{
public:NotificationCenter();/// Creates the NotificationCenter.~NotificationCenter();/// Destroys the NotificationCenter.void addObserver(const AbstractObserver observer);/// Registers an observer with the NotificationCenter./// Usage:/// ObserverMyClass, MyNotification obs(*this, MyClass::handleNotification);/// notificationCenter.addObserver(obs);////// Alternatively, the NObserver template class can be used instead of Observer.void removeObserver(const AbstractObserver observer);/// Unregisters an observer with the NotificationCenter.bool hasObserver(const AbstractObserver observer) const;/// Returns true if the observer is registered with this NotificationCenter.void postNotification(Notification::Ptr pNotification);/// Posts a notification to the NotificationCenter./// The NotificationCenter then delivers the notification/// to all interested observers./// If an observer throws an exception, dispatching terminates/// and the exception is rethrown to the caller./// Ownership of the notification object is claimed and the/// notification is released before returning. Therefore,/// a call like/// notificationCenter.postNotification(new MyNotification);/// does not result in a memory leak.bool hasObservers() const;/// Returns true iff there is at least one registered observer.////// Can be used to improve performance if an expensive notification/// shall only be created and posted if there are any observers.std::size_t countObservers() const;/// Returns the number of registered observers.static NotificationCenter defaultCenter();/// Returns a reference to the default/// NotificationCenter.private:typedef SharedPtrAbstractObserver AbstractObserverPtr;typedef std::vectorAbstractObserverPtr ObserverList;ObserverList _observers;mutable Mutex _mutex;
};通过定义可以看出它是目标对象的集合std::vectorAbstractObserverPtr _observers。 通过调用函数 addObserver(const AbstractObserver observer)可以完成目标对象 的注册过程。调用函数 removeObserver()则可以完成注销。函数 postNotification 是一个消息传递的过程其定义如下
void NotificationCenter::postNotification(Notification::Ptr pNotification)
{poco_check_ptr (pNotification);ScopedLockWithUnlockMutex lock(_mutex);ObserverList observersToNotify(_observers);lock.unlock();for (ObserverList::iterator it observersToNotify.begin(); it ! observersToNotify.end(); it){(*it)-notify(pNotification);}
}
可以看到它是向所有观察者发送消息。这里为了避免长期占用_observers在发送时复制了一份。
2.3 消息的接收者 target
class AbstractObserver
{
public:AbstractObserver();AbstractObserver(const AbstractObserver observer);virtual ~AbstractObserver();AbstractObserver operator (const AbstractObserver observer);virtual void notify(Notification* pNf) const 0;virtual bool equals(const AbstractObserver observer) const 0;virtual bool accepts(Notification* pNf, const char* pName 0) const 0;virtual AbstractObserver* clone() const 0;virtual void disable() 0;
};所有接收类都需要继承AbstractObserver
Observer类
template class C, class N
class Observer: public AbstractObserver
{
public:typedef void (C::*Callback)(N*);Observer(C object, Callback method): _pObject(object), _method(method){}Observer(const Observer observer):AbstractObserver(observer),_pObject(observer._pObject), _method(observer._method){}~Observer(){}Observer operator (const Observer observer){if (observer ! this){_pObject observer._pObject;_method observer._method;}return *this;}void notify(Notification* pNf) const{Poco::Mutex::ScopedLock lock(_mutex);if (_pObject){N* pCastNf dynamic_castN*(pNf);if (pCastNf){pCastNf-duplicate();(_pObject-*_method)(pCastNf);}}}bool equals(const AbstractObserver abstractObserver) const{const Observer* pObs dynamic_castconst Observer*(abstractObserver);return pObs pObs-_pObject _pObject pObs-_method _method;}bool accepts(Notification* pNf) const{return dynamic_castN*(pNf) ! 0;}AbstractObserver* clone() const{return new Observer(*this);}void disable(){Poco::Mutex::ScopedLock lock(_mutex);_pObject 0;}private:Observer();C* _pObject;Callback _method;mutable Poco::Mutex _mutex;
};Observer 中存在一个类实例对象的指针_pObject以及对应函数入口地址_method。
NObserver类
template class C, class N
class NObserver: public AbstractObserver
{
public:typedef AutoPtrN NotificationPtr;typedef void (C::*Callback)(const NotificationPtr);NObserver(C object, Callback method): _pObject(object), _method(method){}NObserver(const NObserver observer):AbstractObserver(observer),_pObject(observer._pObject), _method(observer._method){}~NObserver(){}NObserver operator (const NObserver observer){if (observer ! this){_pObject observer._pObject;_method observer._method;}return *this;}void notify(Notification* pNf) const{Poco::Mutex::ScopedLock lock(_mutex);if (_pObject){N* pCastNf dynamic_castN*(pNf);if (pCastNf){NotificationPtr ptr(pCastNf, true);(_pObject-*_method)(ptr);}}}bool equals(const AbstractObserver abstractObserver) const{const NObserver* pObs dynamic_castconst NObserver*(abstractObserver);return pObs pObs-_pObject _pObject pObs-_method _method;}bool accepts(Notification* pNf) const{return dynamic_castN*(pNf) ! 0;}AbstractObserver* clone() const{return new NObserver(*this);}void disable(){Poco::Mutex::ScopedLock lock(_mutex);_pObject 0;}private:NObserver();C* _pObject;Callback _method;mutable Poco::Mutex _mutex;
};2.4 使用例子
#include Poco/NotificationCenter.h
#include Poco/Notification.h
#include Poco/Observer.h
#include Poco/NObserver.h
#include Poco/AutoPtr.h
#include iostream
using Poco::NotificationCenter;
using Poco::Notification;
using Poco::Observer;
using Poco::NObserver;
using Poco::AutoPtr;
class BaseNotification: public Notification
{
};
class SubNotification: public BaseNotification
{
};class Target
{
public:void handleBase(BaseNotification* pNf){std::cout handleBase: pNf-name() std::endl;pNf-release(); // we got ownership, so we must release}void handleSub(const AutoPtrSubNotification pNf){std::cout handleSub: pNf-name() std::endl;}
};int main(int argc, char** argv)
{NotificationCenter nc;Target target;nc.addObserver(ObserverTarget, BaseNotification(target, Target::handleBase));nc.addObserver(NObserverTarget, SubNotification(target, Target::handleSub));nc.postNotification(new BaseNotification);nc.postNotification(new SubNotification);nc.removeObserver(ObserverTarget, BaseNotification(target, Target::handleBase));nc.removeObserver(NObserverTarget, SubNotification(target, Target::handleSub));return 0;
}总结 类似于观察者模式通过创建观察者增加到NotificationCenter通知中心遍历所有观察者调用到观察者的notify函数然后回调到用户的函数。
3. 异步通知
3.1 介绍
Poco 中的异步通知是通过 NotificationQueue 类来实现的同它功能类似还有类PriorityNotificationQueue 和 TimedNotificationQueue。不同的是 PriorityNotificationQueue类中对消息分了优先级对优先级高的消息优先处理而 TimedNotificationQueue 对消息给了时间戳时间戳早的优先处理而和其压入队列的时间无关。所以接下来我们主要关注NotificationQueue 的实现。
class Foundation_API NotificationQueue/// A NotificationQueue object provides a way to implement asynchronous/// notifications. This is especially useful for sending notifications/// from one thread to another, for example from a background thread to /// the main (user interface) thread. /// /// The NotificationQueue can also be used to distribute work from/// a controlling thread to one or more worker threads. Each worker thread/// repeatedly calls waitDequeueNotification() and processes the/// returned notification. Special care must be taken when shutting/// down a queue with worker threads waiting for notifications./// The recommended sequence to shut down and destroy the queue is to/// 1. set a termination flag for every worker thread/// 2. call the wakeUpAll() method/// 3. join each worker thread/// 4. destroy the notification queue.
{
public:NotificationQueue();/// Creates the NotificationQueue.~NotificationQueue();/// Destroys the NotificationQueue.void enqueueNotification(Notification::Ptr pNotification);/// Enqueues the given notification by adding it to/// the end of the queue (FIFO)./// The queue takes ownership of the notification, thus/// a call like/// notificationQueue.enqueueNotification(new MyNotification);/// does not result in a memory leak.void enqueueUrgentNotification(Notification::Ptr pNotification);/// Enqueues the given notification by adding it to/// the front of the queue (LIFO). The event therefore gets processed/// before all other events already in the queue./// The queue takes ownership of the notification, thus/// a call like/// notificationQueue.enqueueUrgentNotification(new MyNotification);/// does not result in a memory leak.Notification* dequeueNotification();/// Dequeues the next pending notification./// Returns 0 (null) if no notification is available./// The caller gains ownership of the notification and/// is expected to release it when done with it.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.Notification* waitDequeueNotification();/// Dequeues the next pending notification./// If no notification is available, waits for a notification/// to be enqueued. /// The caller gains ownership of the notification and/// is expected to release it when done with it./// This method returns 0 (null) if wakeUpWaitingThreads()/// has been called by another thread.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.Notification* waitDequeueNotification(long milliseconds);/// Dequeues the next pending notification./// If no notification is available, waits for a notification/// to be enqueued up to the specified time./// Returns 0 (null) if no notification is available./// The caller gains ownership of the notification and/// is expected to release it when done with it.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.void dispatch(NotificationCenter notificationCenter);/// Dispatches all queued notifications to the given/// notification center.void wakeUpAll();/// Wakes up all threads that wait for a notification.bool empty() const;/// Returns true iff the queue is empty.int size() const;/// Returns the number of notifications in the queue.void clear();/// Removes all notifications from the queue.bool hasIdleThreads() const; /// Returns true if the queue has at least one thread waiting /// for a notification.static NotificationQueue defaultQueue();/// Returns a reference to the default/// NotificationQueue.protected:Notification::Ptr dequeueOne();private:typedef std::dequeNotification::Ptr NfQueue;struct WaitInfo{Notification::Ptr pNf;Event nfAvailable;};typedef std::dequeWaitInfo* WaitQueue;NfQueue _nfQueue;WaitQueue _waitQueue;mutable FastMutex _mutex;
};
定义可以看到 NotificationQueue 类管理了两个 deque 容器。其中一个是 WaitInfo对象的 deque另一个是 Notification 对象的 deque。而 WaitInfo 一对一的对应了一个消息对象 pNf 和事件对象 nfAvailable毫无疑问 Event 对象是用来同步多线程的。
Notification* NotificationQueue::waitDequeueNotification()
{Notification::Ptr pNf;WaitInfo* pWI 0;{FastMutex::ScopedLock lock(_mutex);pNf dequeueOne();if (pNf) return pNf.duplicate();pWI new WaitInfo;_waitQueue.push_back(pWI);}pWI-nfAvailable.wait();pNf pWI-pNf;delete pWI;return pNf.duplicate();
}消费者线程首先从 Notification 对象的 deque 中获取消息如果消息获取不为空则接返回处理如果消息为空则创建一个新的 WaitInfo 对象并压入 WaitInfo 对象的deque。 消费者线程开始等待直到生产者通知有消息的存在然后再从 WaitInfo 对象中取出消息返回处理。当消费者线程能从 Notification 对象的 deque 中获取到消息时说明消费者处理消息的速度要比生成者低反之则说明消费者处理消息的速度要比生成者高。
void NotificationQueue::enqueueNotification(Notification::Ptr pNotification)
{poco_check_ptr (pNotification);FastMutex::ScopedLock lock(_mutex);if (_waitQueue.empty()){_nfQueue.push_back(pNotification);}else{WaitInfo* pWI _waitQueue.front();_waitQueue.pop_front();pWI-pNf pNotification;pWI-nfAvailable.set();}
}生产者线程首先判断 WaitInfo 对象的 deque 是否为空如果不为空说明存在消费者线程等待则从 deque 中获取一个 WaitInfo 对象灌入 Notification 消息释放信号量激活消费者线程而如果为空说明目前说有的消费者线程都在工作则把消息暂时存入Notification 对象的 deque等待消费者线程有空时处理。
3.2 例子
#include Poco/Notification.h
#include Poco/NotificationQueue.h
#include Poco/ThreadPool.h
#include Poco/Runnable.h
#include Poco/AutoPtr.h
using Poco::Notification;
using Poco::NotificationQueue;
using Poco::ThreadPool;
using Poco::Runnable;
using Poco::AutoPtr;
class WorkNotification: public Notification
{public:WorkNotification(int data): _data(data) {}int data() const{return _data;}private:int _data;
};class Worker: public Runnable
{
public:Worker(NotificationQueue queue): _queue(queue) {}void run(){AutoPtrNotification pNf(_queue.waitDequeueNotification());while (pNf){WorkNotification* pWorkNf dynamic_castWorkNotification*(pNf.get());if (pWorkNf){// do some work}pNf _queue.waitDequeueNotification();}}private:NotificationQueue _queue;
};int main(int argc, char** argv)
{NotificationQueue queue;Worker worker1(queue); // create worker threadsWorker worker2(queue);ThreadPool::defaultPool().start(worker1); // start workersThreadPool::defaultPool().start(worker2);// create some workfor (int i 0; i 100; i){queue.enqueueNotification(new WorkNotification(i));}while (!queue.empty()) // wait until all work is donePoco::Thread::sleep(100);queue.wakeUpAll(); // tell workers theyre doneThreadPool::defaultPool().joinAll();return 0;
}总结 生产者入队列消费者出队列实现消息的异步处理。
4、参考链接
https://pocoproject.org/slides/090-NotificationsEvents.pdf https://blog.csdn.net/arau_sh/article/details/8673459