Skip to content

Commit

Permalink
feat(AsyncObserver): Improve NotificationCenter speed and usability #…
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Feb 6, 2024
1 parent ba2c6cf commit 38806dd
Show file tree
Hide file tree
Showing 16 changed files with 367 additions and 49 deletions.
1 change: 1 addition & 0 deletions Foundation/Foundation_vs160.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,7 @@
<ClInclude Include="include\Poco\AccessExpireStrategy.h" />
<ClInclude Include="include\Poco\ActiveDispatcher.h" />
<ClInclude Include="include\Poco\ActiveMethod.h" />
<ClInclude Include="include\Poco\AsyncObserver.h" />
<ClInclude Include="include\Poco\ActiveResult.h" />
<ClInclude Include="include\Poco\ActiveRunnable.h" />
<ClInclude Include="include\Poco\ActiveStarter.h" />
Expand Down
3 changes: 3 additions & 0 deletions Foundation/Foundation_vs160.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,9 @@
<ClInclude Include="include\Poco\NObserver.h">
<Filter>Notifications\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\AsyncObserver.h">
<Filter>Notifications\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Notification.h">
<Filter>Notifications\Header Files</Filter>
</ClInclude>
Expand Down
1 change: 1 addition & 0 deletions Foundation/Foundation_vs170.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,7 @@
<ClInclude Include="include\Poco\AccessExpireStrategy.h" />
<ClInclude Include="include\Poco\ActiveDispatcher.h" />
<ClInclude Include="include\Poco\ActiveMethod.h" />
<ClInclude Include="include\Poco\AsyncObserver.h" />
<ClInclude Include="include\Poco\ActiveResult.h" />
<ClInclude Include="include\Poco\ActiveRunnable.h" />
<ClInclude Include="include\Poco\ActiveStarter.h" />
Expand Down
3 changes: 3 additions & 0 deletions Foundation/Foundation_vs170.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,9 @@
<ClInclude Include="src\pcre2_intmodedep.h">
<Filter>RegularExpression\PCRE2 Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\AsyncObserver.h">
<Filter>Notifications\Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="src\pocomsg.rc">
Expand Down
24 changes: 24 additions & 0 deletions Foundation/include/Poco/AbstractObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,33 @@ class Foundation_API AbstractObserver
virtual bool equals(const AbstractObserver& observer) const = 0;
virtual bool accepts(Notification* pNf, const char* pName = 0) const = 0;
virtual AbstractObserver* clone() const = 0;

virtual void start();
/// No-op.
/// This method can be implemented by inheriting classes which require
/// explicit start in order to begin processing notifications.

virtual void disable() = 0;

virtual int backlog() const;
/// Returns number of queued messages that this Observer has.
/// For non-active (synchronous) observers, always returns zero.
};

//
// inlines
//

inline void AbstractObserver::start()
{
}


inline int AbstractObserver::backlog() const
{
return 0;
}


} // namespace Poco

Expand Down
197 changes: 197 additions & 0 deletions Foundation/include/Poco/AsyncObserver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
//
// AsyncObserver.h
//
// Library: Foundation
// Package: Notifications
// Module: AsyncObserver
//
// Definition of the AsyncObserver class template.
//
// Copyright (c) 2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//


#ifndef Foundation_ActiveObserver_INCLUDED
#define Foundation_ActiveObserver_INCLUDED


#include "Poco/Foundation.h"
#include "Poco/NObserver.h"
#include "Poco/Thread.h"
#include "Poco/Stopwatch.h"
#include "Poco/Debugger.h"
#include "Poco/Format.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/NotificationQueue.h"


namespace Poco {


template <class C, class N>
class AsyncObserver: public NObserver<C, N>
/// AsyncObserver notifies subscribers in a dedicated thread (as opposed
/// to (N)Observer classes, which notify subscribers synchronously).
/// In order to become active and process notifications, the start()
/// method must be called.
///
/// This class is meant to be used with the NotificationCenter only.
/// Notification processing thread can be started only once, and copying
/// should be done before `start()` is called.
{
public:
using Type = AsyncObserver<C, N>;
using Matcher = bool (C::*)(const std::string&);

AsyncObserver() = delete;

AsyncObserver(C& object, Handler handler, Matcher matcher = nullptr):
NObserver(object, handler),
_matcher(matcher),
_ra(*this, &AsyncObserver::dequeue),
_started(false),
_done(false)
{
}

AsyncObserver(const AsyncObserver& observer):
NObserver(observer),
_matcher(observer._matcher),
_ra(*this, &AsyncObserver::dequeue),
_started(false),
_done(false)
{
poco_assert(observer._nq.size() == 0);
}

~AsyncObserver()
{
disable();
}

AsyncObserver& operator = (const AsyncObserver& observer)
{
if (&observer != this)
{
poco_assert(observer._nq.size() == 0);
_pObject = observer._pObject;
_handler = observer._handler;
_matcher = observer._matcher;
_started = false;
_done =false;
}
return *this;
}

void notify(Notification* pNf) const
{
Poco::ScopedLockWithUnlock l(_mutex);
if (_pObject)
{
if (!_matcher || (_pObject->*_matcher)(pNf->name()))
{
l.unlock();
N* pCastNf = dynamic_cast<N*>(pNf);
if (pCastNf)
{
NotificationPtr ptr(pCastNf, true);
_nq.enqueueNotification(ptr);
}
}
}
}

virtual bool equals(const AbstractObserver& abstractObserver) const
{
const AsyncObserver* pObs = dynamic_cast<const AsyncObserver*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_method == _method && pObs->_matcher == _matcher;
}

virtual bool accepts(Notification* pNf, const char* pName = nullptr) const
{
if (!dynamic_cast<N*>(pNf)) return false;

Poco::ScopedLock l(_mutex);
return _pObject && _matcher && (_pObject->*_matcher)(pNf->name());
}

virtual AbstractObserver* clone() const
{
return new AsyncObserver(*this);
}

virtual void start()
{
Poco::ScopedLock l(_mutex);
if (_started)
{
throw Poco::InvalidAccessException(
Poco::format("thread already started %s", poco_src_loc));
}

_thread.start(_ra);
Poco::Stopwatch sw;
sw.start();
while (!_started)
{
if (sw.elapsedSeconds() > 5)
throw Poco::TimeoutException(poco_src_loc);
Thread::sleep(100);
}
}

virtual void disable()
{
if (!_started.exchange(false)) return;
_nq.wakeUpAll();
while (!_done) Thread::sleep(100);
_thread.join();
NObserver::disable();
}

virtual int backlog() const
{
return _nq.size();
}

private:
void dequeue()
{
NotificationPtr pNf;
_started = true;
while (pNf = _nq.waitDequeueNotification())
{
try
{
_mutex.lock();
try
{
(_pObject->*_method)(pNf);
_mutex.unlock();
}
catch (Poco::SystemException&) { break; }
catch (...) { _mutex.unlock(); }
} catch (Poco::SystemException&) {}
}
_done = true;
_started = false;
}

using Adapter = RunnableAdapter<AsyncObserver<C, N>>;

Thread _thread;
Matcher _matcher;
mutable NotificationQueue _nq;
Adapter _ra;
std::atomic<bool> _started;
std::atomic<bool> _done;
};


} // namespace Poco


#endif // Foundation_ActiveObserver_INCLUDED
26 changes: 14 additions & 12 deletions Foundation/include/Poco/NObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
// Library: Foundation
// Package: Notifications
// Module: NotificationCenter
// Module: NObserver
//
// Definition of the NObserver class template.
//
Expand Down Expand Up @@ -43,8 +43,12 @@ class NObserver: public AbstractObserver
/// management.
{
public:
typedef AutoPtr<N> NotificationPtr;
typedef void (C::*Callback)(const NotificationPtr&);
using Type = NObserver<C, N>;
using NotificationPtr = AutoPtr<N>;
using Callback = void (C::*)(const NotificationPtr&);
using Handler = Callback;

NObserver() = delete;

NObserver(C& object, Callback method):
_pObject(&object),
Expand Down Expand Up @@ -73,7 +77,7 @@ class NObserver: public AbstractObserver
return *this;
}

void notify(Notification* pNf) const
virtual void notify(Notification* pNf) const
{
Poco::Mutex::ScopedLock lock(_mutex);

Expand All @@ -88,32 +92,30 @@ class NObserver: public AbstractObserver
}
}

bool equals(const AbstractObserver& abstractObserver) const
virtual bool equals(const AbstractObserver& abstractObserver) const
{
const NObserver* pObs = dynamic_cast<const NObserver*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_method == _method;
}

bool accepts(Notification* pNf, const char* pName = 0) const
virtual bool accepts(Notification* pNf, const char* pName = nullptr) const
{
return dynamic_cast<N*>(pNf) && (!pName || pNf->name() == pName);
}

AbstractObserver* clone() const
virtual AbstractObserver* clone() const
{
return new NObserver(*this);
}

void disable()
virtual void disable()
{
Poco::Mutex::ScopedLock lock(_mutex);

_pObject = 0;
_pObject = nullptr;
}

private:
NObserver();

protected:
C* _pObject;
Callback _method;
mutable Poco::Mutex _mutex;
Expand Down
3 changes: 2 additions & 1 deletion Foundation/include/Poco/Notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Foundation_API Notification: public RefCountedObject
public:
using Ptr = AutoPtr<Notification>;

Notification();
Notification(const std::string& name = ""s);
/// Creates the notification.

virtual std::string name() const;
Expand All @@ -46,6 +46,7 @@ class Foundation_API Notification: public RefCountedObject

protected:
virtual ~Notification();
std::unique_ptr<std::string> _pName;
};


Expand Down
9 changes: 9 additions & 0 deletions Foundation/include/Poco/NotificationCenter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class Foundation_API NotificationCenter
/// }
{
public:

NotificationCenter();
/// Creates the NotificationCenter.

Expand Down Expand Up @@ -120,6 +121,12 @@ class Foundation_API NotificationCenter
std::size_t countObservers() const;
/// Returns the number of registered observers.

int backlog() const;
/// Returns the sum of queued notifications
/// for all observers (applies only to active observers,
/// regular observers post notifications syncronously and
/// never have a backlog).

static NotificationCenter& defaultCenter();
/// Returns a reference to the default
/// NotificationCenter.
Expand All @@ -128,6 +135,8 @@ class Foundation_API NotificationCenter
typedef SharedPtr<AbstractObserver> AbstractObserverPtr;
typedef std::vector<AbstractObserverPtr> ObserverList;

ObserverList observersToNotify(Notification::Ptr pNotification) const;

ObserverList _observers;
mutable Mutex _mutex;
};
Expand Down
2 changes: 1 addition & 1 deletion Foundation/include/Poco/RunnableAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class RunnableAdapter: public Runnable
/// Usage:
/// RunnableAdapter<MyClass> ra(myObject, &MyObject::doSomething));
/// Thread thr;
/// thr.Start(ra);
/// thr.start(ra);
///
/// For using a freestanding or static member function as a thread
/// target, please see the ThreadTarget class.
Expand Down
Loading

0 comments on commit 38806dd

Please sign in to comment.