From 86a4f0045e478feb9383601d9d00693bfb110825 Mon Sep 17 00:00:00 2001 From: Aleksandar Fabijanic Date: Tue, 26 Jul 2022 13:54:56 +0200 Subject: [PATCH] Fix/posix sleep (#3705) * fix(Thread_POSIX): sleep() poor performance #3703 * chore(vscode): add file associations * fix(TaskManager): waits for all threads in the ThreadPool #3704 * fix(Thread): call std::this_thread::sleep_for() to sleep #3703 * fix(PollSet): wakeup fd is never read #3708 * feat(Thread): Add Thread::set/getAffinity() #3709 * doc(Thread): Thread::trySleep() assertion #3710 * fix(PollSet): wakeup fd is never read (windows portion and some other optimizations) #3708 * feat(SocketReactor): improvements #3713 * chore(ThreadTest): add missing include * fix(PollSet): wakeup fd is never read #3708 * fix(Any): #3682 #3683 #3692 #3712 * fix(mingw): lowercase winsock2 and iphlpapi to allow cross compile #3711 * feat(Thread): Add Thread::set/getAffinity() #3709 * chore(SocketReactor): one-liners inlined, removed redundant try/catch in dospatch, remove unused onBusy() * feat(SocketReactor): add socket to ErrorNotification * fix(SocketReactor): pollTimeout assignment and ConnectorTest leak --- .vscode/settings.json | 5 +- Foundation/include/Poco/Any.h | 2 +- Foundation/include/Poco/TaskManager.h | 20 ++- Foundation/include/Poco/Thread.h | 36 +++- Foundation/include/Poco/Thread_POSIX.h | 4 +- Foundation/include/Poco/Thread_VX.h | 15 +- Foundation/include/Poco/Thread_WIN32.h | 9 +- Foundation/include/Poco/Thread_WINCE.h | 21 ++- Foundation/include/Poco/UnWindows.h | 6 +- Foundation/src/Task.cpp | 3 +- Foundation/src/TaskManager.cpp | 28 ++- Foundation/src/Thread_POSIX.cpp | 94 +++++----- Foundation/src/Thread_VX.cpp | 25 --- Foundation/src/Thread_WIN32.cpp | 51 ++++++ Foundation/testsuite/src/TaskManagerTest.cpp | 32 +++- Foundation/testsuite/src/ThreadTest.cpp | 18 ++ Foundation/testsuite/src/ThreadTest.h | 1 + Net/include/Poco/Net/SocketNotification.h | 7 +- Net/include/Poco/Net/SocketReactor.h | 166 ++++++++++++++---- Net/src/PollSet.cpp | 38 ++-- Net/src/SocketNotification.cpp | 10 ++ Net/src/SocketReactor.cpp | 174 +++++++++---------- Net/testsuite/src/PollSetTest.cpp | 7 +- Net/testsuite/src/SocketConnectorTest.cpp | 25 ++- Net/testsuite/src/SocketReactorTest.cpp | 29 +++- Net/testsuite/src/SocketReactorTest.h | 1 + 26 files changed, 555 insertions(+), 272 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index bc8e0af140..f9d6c408b1 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -91,7 +91,10 @@ "__bits": "cpp", "variant": "cpp", "condition_variable": "cpp", - "valarray": "cpp" + "valarray": "cpp", + "strstream": "cpp", + "future": "cpp", + "shared_mutex": "cpp" }, "files.exclude": { "**/.dep": true, diff --git a/Foundation/include/Poco/Any.h b/Foundation/include/Poco/Any.h index 2321937ee3..bfe1f6f466 100644 --- a/Foundation/include/Poco/Any.h +++ b/Foundation/include/Poco/Any.h @@ -66,7 +66,7 @@ union Placeholder public: struct Size { - static const unsigned int value = SizeV; + enum { value = SizeV }; }; Placeholder(const Placeholder&) = delete; diff --git a/Foundation/include/Poco/TaskManager.h b/Foundation/include/Poco/TaskManager.h index 6b7cd20900..4499ceceff 100644 --- a/Foundation/include/Poco/TaskManager.h +++ b/Foundation/include/Poco/TaskManager.h @@ -50,13 +50,17 @@ class Foundation_API TaskManager using TaskPtr = AutoPtr; using TaskList = std::list; - TaskManager(); - /// Creates the TaskManager, using the - /// default ThreadPool. + TaskManager(const std::string& name = "", + int minCapacity = 2, + int maxCapacity = 16, + int idleTime = 60, + int stackSize = POCO_THREAD_STACK_SIZE); + /// Creates the TaskManager. TaskManager(ThreadPool& pool); /// Creates the TaskManager, using the - /// given ThreadPool. + /// given ThreadPool (should be used + /// by this TaskManager exclusively). ~TaskManager(); /// Destroys the TaskManager. @@ -110,11 +114,15 @@ class Foundation_API TaskManager void taskFailed(Task* pTask, const Exception& exc); private: + using MutexT = FastMutex; + using ScopedLockT = MutexT::ScopedLock; + ThreadPool& _threadPool; + bool _ownPool; TaskList _taskList; Timestamp _lastProgressNotification; NotificationCenter _nc; - mutable FastMutex _mutex; + mutable MutexT _mutex; friend class Task; }; @@ -125,7 +133,7 @@ class Foundation_API TaskManager // inline int TaskManager::count() const { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); return (int) _taskList.size(); } diff --git a/Foundation/include/Poco/Thread.h b/Foundation/include/Poco/Thread.h index 891773f85e..c7e51e5979 100644 --- a/Foundation/include/Poco/Thread.h +++ b/Foundation/include/Poco/Thread.h @@ -21,6 +21,8 @@ #include "Poco/Foundation.h" #include "Poco/Event.h" #include "Poco/Mutex.h" +#include +#include #if defined(POCO_OS_FAMILY_WINDOWS) @@ -207,6 +209,9 @@ class Foundation_API Thread: private ThreadImpl /// wakeUp() before calling trySleep() will prevent the next /// trySleep() call to actually suspend the thread (which, in /// some scenarios, may be desirable behavior). + /// + /// Note that, unlike Thread::sleep(), this function can only + /// be succesfully called from a thread started as Poco::Thread. void wakeUp(); /// Wakes up the thread which is in the state of interruptible @@ -231,6 +236,17 @@ class Foundation_API Thread: private ThreadImpl static long currentOsTid(); /// Returns the operating system specific thread ID for the current thread. + bool setAffinity(int coreId); + /// Sets the thread affinity to the coreID. + /// Returns true if succesful. + /// Returns false if not succesful or not + /// implemented. + + int getAffinity() const; + /// Returns the thread affinity. + /// Negative value means the thread has + /// no CPU core affinity. + protected: ThreadLocalStorage& tls(); /// Returns a reference to the thread's local storage. @@ -317,15 +333,15 @@ inline bool Thread::isRunning() const } -inline void Thread::sleep(long milliseconds) +inline void Thread::yield() { - sleepImpl(milliseconds); + yieldImpl(); } -inline void Thread::yield() +inline void Thread::sleep(long milliseconds) { - yieldImpl(); + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); } @@ -381,6 +397,18 @@ inline long Thread::currentOsTid() return currentOsTidImpl(); } +inline bool Thread::setAffinity(int coreId) +{ + return setAffinityImpl(coreId); +} + + +inline int Thread::getAffinity() const +{ + return getAffinityImpl(); +} + + } // namespace Poco diff --git a/Foundation/include/Poco/Thread_POSIX.h b/Foundation/include/Poco/Thread_POSIX.h index cd3b1e62d1..4d8ef6a5ea 100644 --- a/Foundation/include/Poco/Thread_POSIX.h +++ b/Foundation/include/Poco/Thread_POSIX.h @@ -80,11 +80,12 @@ class Foundation_API ThreadImpl void joinImpl(); bool joinImpl(long milliseconds); bool isRunningImpl() const; - static void sleepImpl(long milliseconds); static void yieldImpl(); static ThreadImpl* currentImpl(); static TIDImpl currentTidImpl(); static long currentOsTidImpl(); + bool setAffinityImpl(int coreID); + int getAffinityImpl() const; protected: static void* runnableEntry(void* pThread); @@ -146,6 +147,7 @@ class Foundation_API ThreadImpl bool started; bool joined; std::string name; + int affinity; mutable FastMutex mutex; }; diff --git a/Foundation/include/Poco/Thread_VX.h b/Foundation/include/Poco/Thread_VX.h index d44c869bdb..f9af4542c8 100644 --- a/Foundation/include/Poco/Thread_VX.h +++ b/Foundation/include/Poco/Thread_VX.h @@ -90,11 +90,12 @@ class Foundation_API ThreadImpl void joinImpl(); bool joinImpl(long milliseconds); bool isRunningImpl() const; - static void sleepImpl(long milliseconds); static void yieldImpl(); static ThreadImpl* currentImpl(); static TIDImpl currentTidImpl(); static long currentOsTidImpl(); + bool setAffinityImpl(int coreID); + int getAffinityImpl() const; protected: static void runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int); @@ -171,6 +172,18 @@ inline ThreadImpl::TIDImpl ThreadImpl::tidImpl() const } +inline bool ThreadImpl::setAffinityImpl(int) +{ + return false; +} + + +inline int ThreadImpl::getAffinityImpl() const +{ + return -1; +} + + } // namespace Poco diff --git a/Foundation/include/Poco/Thread_WIN32.h b/Foundation/include/Poco/Thread_WIN32.h index 4967d60a66..3aa34277f9 100644 --- a/Foundation/include/Poco/Thread_WIN32.h +++ b/Foundation/include/Poco/Thread_WIN32.h @@ -75,11 +75,12 @@ class Foundation_API ThreadImpl void joinImpl(); bool joinImpl(long milliseconds); bool isRunningImpl() const; - static void sleepImpl(long milliseconds); static void yieldImpl(); static ThreadImpl* currentImpl(); static TIDImpl currentTidImpl(); static long currentOsTidImpl(); + bool setAffinityImpl(int); + int getAffinityImpl() const; protected: #if defined(_DLL) @@ -155,12 +156,6 @@ inline int ThreadImpl::getMaxOSPriorityImpl(int /* policy */) } -inline void ThreadImpl::sleepImpl(long milliseconds) -{ - Sleep(DWORD(milliseconds)); -} - - inline void ThreadImpl::yieldImpl() { Sleep(0); diff --git a/Foundation/include/Poco/Thread_WINCE.h b/Foundation/include/Poco/Thread_WINCE.h index e4996c92d5..e998e8d320 100644 --- a/Foundation/include/Poco/Thread_WINCE.h +++ b/Foundation/include/Poco/Thread_WINCE.h @@ -75,11 +75,12 @@ class Foundation_API ThreadImpl void joinImpl(); bool joinImpl(long milliseconds); bool isRunningImpl() const; - static void sleepImpl(long milliseconds); static void yieldImpl(); static ThreadImpl* currentImpl(); static TIDImpl currentTidImpl(); static long currentOsTidImpl(); + bool setAffinityImpl(int); + int getAffinityImpl() const; protected: static DWORD WINAPI runnableEntry(LPVOID pThread); @@ -151,12 +152,6 @@ inline int ThreadImpl::getMaxOSPriorityImpl(int /* policy */) } -inline void ThreadImpl::sleepImpl(long milliseconds) -{ - Sleep(DWORD(milliseconds)); -} - - inline void ThreadImpl::yieldImpl() { Sleep(0); @@ -181,6 +176,18 @@ inline ThreadImpl::TIDImpl ThreadImpl::tidImpl() const } +inline bool ThreadImpl::setAffinityImpl(int) +{ + return false; +} + + +inline int ThreadImpl::getAffinityImpl() const +{ + return -1; +} + + } // namespace Poco diff --git a/Foundation/include/Poco/UnWindows.h b/Foundation/include/Poco/UnWindows.h index b4778bf319..e9a7eeba2c 100644 --- a/Foundation/include/Poco/UnWindows.h +++ b/Foundation/include/Poco/UnWindows.h @@ -25,12 +25,14 @@ #endif #endif +// disable min/max macros +#define NOMINMAX #if !defined(POCO_NO_WINDOWS_H) #include #ifdef __MINGW32__ - #include - #include + #include + #include #include #endif // __MINGW32__ #endif diff --git a/Foundation/src/Task.cpp b/Foundation/src/Task.cpp index 8367d571a3..4b1e1fa1b3 100644 --- a/Foundation/src/Task.cpp +++ b/Foundation/src/Task.cpp @@ -79,8 +79,7 @@ void Task::run() pOwner->taskFailed(this, SystemException("unknown exception")); } _state = TASK_FINISHED; - if (pOwner) - pOwner->taskFinished(this); + if (pOwner) pOwner->taskFinished(this); } diff --git a/Foundation/src/TaskManager.cpp b/Foundation/src/TaskManager.cpp index 615541ed95..cddf027b69 100644 --- a/Foundation/src/TaskManager.cpp +++ b/Foundation/src/TaskManager.cpp @@ -15,6 +15,7 @@ #include "Poco/TaskManager.h" #include "Poco/TaskNotification.h" #include "Poco/ThreadPool.h" +#include "Poco/Timespan.h" namespace Poco { @@ -23,20 +24,31 @@ namespace Poco { const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds -TaskManager::TaskManager(): - _threadPool(ThreadPool::defaultPool()) +TaskManager::TaskManager(const std::string& name, + int minCapacity, + int maxCapacity, + int idleTime, + int stackSize): + _threadPool(*new ThreadPool(name, minCapacity, maxCapacity, idleTime, stackSize)), + _ownPool(true) { + // prevent skipping the first progress update + _lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2); } TaskManager::TaskManager(ThreadPool& pool): - _threadPool(pool) + _threadPool(pool), + _ownPool(false) { + // prevent skipping the first progress update + _lastProgressNotification -= Timespan(MIN_PROGRESS_NOTIFICATION_INTERVAL*2); } TaskManager::~TaskManager() { + if (_ownPool) delete &_threadPool; } @@ -46,7 +58,7 @@ void TaskManager::start(Task* pTask) pAutoTask->setOwner(this); pAutoTask->setState(Task::TASK_STARTING); - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); _taskList.push_back(pAutoTask); try { @@ -65,7 +77,7 @@ void TaskManager::start(Task* pTask) void TaskManager::cancelAll() { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); for (auto& pTask: _taskList) { @@ -82,7 +94,7 @@ void TaskManager::joinAll() TaskManager::TaskList TaskManager::taskList() const { - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); return _taskList; } @@ -114,7 +126,7 @@ void TaskManager::taskStarted(Task* pTask) void TaskManager::taskProgress(Task* pTask, float progress) { - ScopedLockWithUnlock lock(_mutex); + ScopedLockWithUnlock lock(_mutex); if (_lastProgressNotification.isElapsed(MIN_PROGRESS_NOTIFICATION_INTERVAL)) { @@ -135,7 +147,7 @@ void TaskManager::taskFinished(Task* pTask) { _nc.postNotification(new TaskFinishedNotification(pTask)); - FastMutex::ScopedLock lock(_mutex); + ScopedLockT lock(_mutex); for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it) { if (*it == pTask) diff --git a/Foundation/src/Thread_POSIX.cpp b/Foundation/src/Thread_POSIX.cpp index 189c52e23c..4235ab2f4f 100644 --- a/Foundation/src/Thread_POSIX.cpp +++ b/Foundation/src/Thread_POSIX.cpp @@ -15,9 +15,11 @@ #include "Poco/Thread_POSIX.h" #include "Poco/Thread.h" #include "Poco/Exception.h" +#include "Poco/Error.h" #include "Poco/ErrorHandler.h" #include "Poco/Timespan.h" #include "Poco/Timestamp.h" +#include "Poco/Format.h" #include #if defined(__sun) && defined(__SVR4) # if !defined(__EXTENSIONS__) @@ -35,6 +37,8 @@ #include #include /* For SYS_xxx definitions */ #endif +#include + // // Block SIGPIPE in main thread. @@ -323,61 +327,6 @@ long ThreadImpl::currentOsTidImpl() #endif } -void ThreadImpl::sleepImpl(long milliseconds) -{ -#if defined(__digital__) - // This is specific to DECThreads - struct timespec interval; - interval.tv_sec = milliseconds / 1000; - interval.tv_nsec = (milliseconds % 1000)*1000000; - pthread_delay_np(&interval); -#elif POCO_OS == POCO_OS_LINUX || POCO_OS == POCO_OS_ANDROID || POCO_OS == POCO_OS_MAC_OS_X || POCO_OS == POCO_OS_QNX || POCO_OS == POCO_OS_VXWORKS - Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds)); - int rc; - do - { - struct timespec ts; - ts.tv_sec = (long) remainingTime.totalSeconds(); - ts.tv_nsec = (long) remainingTime.useconds()*1000; - Poco::Timestamp start; - rc = ::nanosleep(&ts, 0); - if (rc < 0 && errno == EINTR) - { - Poco::Timestamp end; - Poco::Timespan waited = start.elapsed(); - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; - } - } - while (remainingTime > 0 && rc < 0 && errno == EINTR); - if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): nanosleep() failed"); -#else - Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds)); - int rc; - do - { - struct timeval tv; - tv.tv_sec = (long) remainingTime.totalSeconds(); - tv.tv_usec = (long) remainingTime.useconds(); - Poco::Timestamp start; - rc = ::select(0, NULL, NULL, NULL, &tv); - if (rc < 0 && errno == EINTR) - { - Poco::Timestamp end; - Poco::Timespan waited = start.elapsed(); - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; - } - } - while (remainingTime > 0 && rc < 0 && errno == EINTR); - if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): select() failed"); -#endif -} - void* ThreadImpl::runnableEntry(void* pThread) { @@ -470,4 +419,39 @@ int ThreadImpl::reverseMapPrio(int prio, int policy) } +bool ThreadImpl::setAffinityImpl(int coreID) +{ +#if POCO_OS == POCO_OS_LINUX + int numCores = sysconf(_SC_NPROCESSORS_ONLN); + if (coreID < 0 || coreID >= numCores) + return false; + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(coreID, &cpuset); + + return 0 == pthread_setaffinity_np(_pData->thread, sizeof(cpu_set_t), &cpuset); +#else + return false; +#endif +} + + +int ThreadImpl::getAffinityImpl() const +{ +#if POCO_OS == POCO_OS_LINUX + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + if (0 == pthread_getaffinity_np(_pData->thread, sizeof(cpu_set_t), &cpuset)) + { + for (int i = 0; i < CPU_SETSIZE; ++i) + { + if (CPU_ISSET(i, &cpuset)) return i; + } + } +#endif + return -1; +} + + } // namespace Poco diff --git a/Foundation/src/Thread_VX.cpp b/Foundation/src/Thread_VX.cpp index acb64a63c4..4a4d13e95a 100644 --- a/Foundation/src/Thread_VX.cpp +++ b/Foundation/src/Thread_VX.cpp @@ -181,31 +181,6 @@ long ThreadImpl::currentOsTidImpl() return taskIdSelf(); } -void ThreadImpl::sleepImpl(long milliseconds) -{ - Poco::Timespan remainingTime(1000*Poco::Timespan::TimeDiff(milliseconds)); - int rc; - do - { - struct timespec ts; - ts.tv_sec = (long) remainingTime.totalSeconds(); - ts.tv_nsec = (long) remainingTime.useconds()*1000; - Poco::Timestamp start; - rc = ::nanosleep(&ts, 0); - if (rc < 0 && errno == EINTR) - { - Poco::Timestamp end; - Poco::Timespan waited = start.elapsed(); - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; - } - } - while (remainingTime > 0 && rc < 0 && errno == EINTR); - if (rc < 0 && remainingTime > 0) throw Poco::SystemException("Thread::sleep(): nanosleep() failed"); -} - void ThreadImpl::runnableEntry(void* pThread, int, int, int, int, int, int, int, int, int) { diff --git a/Foundation/src/Thread_WIN32.cpp b/Foundation/src/Thread_WIN32.cpp index 87f8bb2199..197ed03845 100644 --- a/Foundation/src/Thread_WIN32.cpp +++ b/Foundation/src/Thread_WIN32.cpp @@ -16,6 +16,7 @@ #include "Poco/Exception.h" #include "Poco/ErrorHandler.h" #include +#include namespace @@ -219,6 +220,56 @@ long ThreadImpl::currentOsTidImpl() return GetCurrentThreadId(); } + +bool ThreadImpl::setAffinityImpl(int affinity) +{ + HANDLE hProcess = GetCurrentProcess(); + DWORD_PTR procMask = 0, sysMask = 0; + if (GetProcessAffinityMask(hProcess, &procMask, &sysMask)) + { + HANDLE hThread = GetCurrentThread(); + DWORD_PTR threadMask = 0; + threadMask |= 1ULL << affinity; + + // thread and process affinities must match + if (!(threadMask & procMask)) return false; + + if (SetThreadAffinityMask(hThread, threadMask)) + return true; + } + return false; +} + + +int ThreadImpl::getAffinityImpl() const +{ + // bit ugly, but there's no explicit API for this + // https://stackoverflow.com/a/6601917/205386 + HANDLE hThread = GetCurrentThread(); + DWORD_PTR mask = 1; + DWORD_PTR old = 0; + + // try every CPU one by one until one works or none are left + while (mask) + { + old = SetThreadAffinityMask(hThread, mask); + if (old) + { // this one worked + SetThreadAffinityMask(hThread, old); // restore original + if (old > std::numeric_limits::max()) return -1; + return static_cast(old); + } + else + { + if (GetLastError() != ERROR_INVALID_PARAMETER) + return -1; + } + mask <<= 1; + } + return -1; +} + + #if defined(_DLL) DWORD WINAPI ThreadImpl::runnableEntry(LPVOID pThread) #else diff --git a/Foundation/testsuite/src/TaskManagerTest.cpp b/Foundation/testsuite/src/TaskManagerTest.cpp index 4660259134..070b0ed407 100644 --- a/Foundation/testsuite/src/TaskManagerTest.cpp +++ b/Foundation/testsuite/src/TaskManagerTest.cpp @@ -169,11 +169,11 @@ namespace } private: - std::atomic _started; - std::atomic _cancelled; - std::atomic _finished; - Exception* _pException; - float _progress; + std::atomic _started; + std::atomic _cancelled; + std::atomic _finished; + std::atomic _pException; + std::atomic _progress; }; @@ -253,10 +253,11 @@ void TaskManagerTest::testFinish() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); - while (pTT->progress() != 0.5) Thread::sleep(50); + while (to.progress() == 0) Thread::sleep(50); assertTrue (to.progress() == 0.5); assertTrue (to.started()); assertTrue (pTT->state() == Task::TASK_RUNNING); @@ -274,6 +275,8 @@ void TaskManagerTest::testFinish() list = tm.taskList(); assertTrue (list.empty()); assertTrue (!to.error()); + tm.cancelAll(); + tm.joinAll(); } @@ -288,6 +291,7 @@ void TaskManagerTest::testCancel() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); @@ -299,15 +303,20 @@ void TaskManagerTest::testCancel() assertTrue (list.size() == 1); assertTrue (tm.count() == 1); tm.cancelAll(); + while (pTT->state() != Task::TASK_CANCELLING) Thread::sleep(50); + pTT->cont(); assertTrue (to.cancelled()); pTT->cont(); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); assertTrue (pTT->state() == Task::TASK_FINISHED); + while (!to.finished()) Thread::sleep(50); assertTrue (to.finished()); while (tm.count() == 1) Thread::sleep(50); list = tm.taskList(); assertTrue (list.empty()); assertTrue (!to.error()); + tm.cancelAll(); + tm.joinAll(); } @@ -322,6 +331,7 @@ void TaskManagerTest::testError() tm.addObserver(Observer(to, &TaskObserver::taskProgress)); AutoPtr pTT = new TestTask; tm.start(pTT.duplicate()); + while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50); assertTrue (pTT->progress() == 0); Thread::sleep(200); pTT->cont(); @@ -335,12 +345,17 @@ void TaskManagerTest::testError() pTT->fail(); pTT->cont(); while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); + pTT->cont(); + while (pTT->state() != Task::TASK_FINISHED) Thread::sleep(50); assertTrue (pTT->state() == Task::TASK_FINISHED); + while (!to.finished()) Thread::sleep(50); assertTrue (to.finished()); assertTrue (to.error() != 0); while (tm.count() == 1) Thread::sleep(50); list = tm.taskList(); assertTrue (list.empty()); + tm.cancelAll(); + tm.joinAll(); } @@ -424,6 +439,7 @@ void TaskManagerTest::testCustom() tm.cancelAll(); while (tm.count() > 0) Thread::sleep(50); assertTrue (tm.count() == 0); + tm.joinAll(); } @@ -440,6 +456,7 @@ void TaskManagerTest::testMultiTasks() tm.cancelAll(); while (tm.count() > 0) Thread::sleep(100); assertTrue (tm.count() == 0); + tm.joinAll(); } @@ -472,7 +489,8 @@ void TaskManagerTest::testCustomThreadPool() assertTrue (tm.count() == tp.allocated()); - tp.joinAll(); + tm.cancelAll(); + tm.joinAll(); } void TaskManagerTest::setUp() diff --git a/Foundation/testsuite/src/ThreadTest.cpp b/Foundation/testsuite/src/ThreadTest.cpp index 65f981d424..eefdf0c9ea 100644 --- a/Foundation/testsuite/src/ThreadTest.cpp +++ b/Foundation/testsuite/src/ThreadTest.cpp @@ -22,6 +22,7 @@ #define __EXTENSIONS__ #endif #include +#include using Poco::Thread; @@ -471,6 +472,22 @@ void ThreadTest::testSleep() } +void ThreadTest::testAffinity() +{ +#if POCO_OS == POCO_OS_LINUX + MyRunnable mr; + Thread t; + t.start(mr); + assertTrue (t.setAffinity(0)); + assertEqual (t.getAffinity(), 0); + mr.notify(); + t.join(); +#else + std::cout << "not implemented"; +#endif +} + + void ThreadTest::setUp() { } @@ -499,6 +516,7 @@ CppUnit::Test* ThreadTest::suite() CppUnit_addTest(pSuite, ThreadTest, testThreadFunctor); CppUnit_addTest(pSuite, ThreadTest, testThreadStackSize); CppUnit_addTest(pSuite, ThreadTest, testSleep); + CppUnit_addTest(pSuite, ThreadTest, testAffinity); return pSuite; } diff --git a/Foundation/testsuite/src/ThreadTest.h b/Foundation/testsuite/src/ThreadTest.h index c6b1f1ce8f..1bda503da5 100644 --- a/Foundation/testsuite/src/ThreadTest.h +++ b/Foundation/testsuite/src/ThreadTest.h @@ -38,6 +38,7 @@ class ThreadTest: public CppUnit::TestCase void testThreadFunctor(); void testThreadStackSize(); void testSleep(); + void testAffinity(); void setUp(); void tearDown(); diff --git a/Net/include/Poco/Net/SocketNotification.h b/Net/include/Poco/Net/SocketNotification.h index aafb313b0b..b2770abceb 100644 --- a/Net/include/Poco/Net/SocketNotification.h +++ b/Net/include/Poco/Net/SocketNotification.h @@ -47,9 +47,10 @@ class Net_API SocketNotification: public Poco::Notification Socket socket() const; /// Returns the socket that caused the notification. -private: +protected: void setSocket(const Socket& socket); +private: SocketReactor* _pReactor; Socket _socket; @@ -88,6 +89,10 @@ class Net_API ErrorNotification: public SocketNotification ErrorNotification(SocketReactor* pReactor, int code = 0, const std::string& description = ""); /// Creates the ErrorNotification for the given SocketReactor. + ErrorNotification(SocketReactor* pReactor, const Socket& socket, + int code = 0, const std::string& description = ""); + /// Creates the ErrorNotification for the given SocketReactor. + ~ErrorNotification(); /// Destroys the ErrorNotification. diff --git a/Net/include/Poco/Net/SocketReactor.h b/Net/include/Poco/Net/SocketReactor.h index bedd1c3f71..46e5d3d3c7 100644 --- a/Net/include/Poco/Net/SocketReactor.h +++ b/Net/include/Poco/Net/SocketReactor.h @@ -20,11 +20,14 @@ #include "Poco/Net/Net.h" #include "Poco/Net/Socket.h" +#include "Poco/Net/SocketNotification.h" +#include "Poco/Net/SocketNotifier.h" #include "Poco/Net/PollSet.h" #include "Poco/Runnable.h" #include "Poco/Timespan.h" #include "Poco/Observer.h" #include "Poco/AutoPtr.h" +#include "Poco/Event.h" #include #include @@ -39,8 +42,6 @@ namespace Net { class Socket; -class SocketNotification; -class SocketNotifier; class Net_API SocketReactor: public Poco::Runnable @@ -71,31 +72,34 @@ class Net_API SocketReactor: public Poco::Runnable /// as argument. /// /// Once started, the SocketReactor waits for events - /// on the registered sockets, using Socket::select(). + /// on the registered sockets, using PollSet:poll(). /// If an event is detected, the corresponding event handler /// is invoked. There are five event types (and corresponding /// notification classes) defined: ReadableNotification, WritableNotification, - /// ErrorNotification, TimeoutNotification, IdleNotification and - /// ShutdownNotification. + /// ErrorNotification, TimeoutNotification and ShutdownNotification. /// /// The ReadableNotification will be dispatched if a socket becomes /// readable. The WritableNotification will be dispatched if a socket /// becomes writable. The ErrorNotification will be dispatched if /// there is an error condition on a socket. /// - /// If the timeout expires and no event has occurred, a + /// Timeout/sleep strategy operates as follows: + /// + /// If the poll timeout expires and no event has occurred, a /// TimeoutNotification will be dispatched to all event handlers /// registered for it. This is done in the onTimeout() method /// which can be overridden by subclasses to perform custom /// timeout processing. /// - /// If there are no sockets for the SocketReactor to pass to - /// Socket::select(), an IdleNotification will be dispatched to - /// all event handlers registered for it. This is done in the - /// onIdle() method which can be overridden by subclasses - /// to perform custom idle processing. Since onIdle() will be - /// called repeatedly in a loop, it is recommended to do a - /// short sleep or yield in the event handler. + /// By default, the SocketReactor is configured to start sleeping + /// when the poll timeout is zero and there are no socket events for + /// a certain amount of time; sleep duration is progressive, up to + /// the configured limit. This behavior can be disabled through + /// configuration parameters. + /// + /// When there are no registered handlers, the SocketRactor sleeps + /// an incremental amount of milliseconds, up to the sleep limit. + /// Increment step value and sleep limit are configurable. /// /// Finally, when the SocketReactor is about to shut down (as a result /// of stop() being called), it dispatches a ShutdownNotification @@ -103,22 +107,76 @@ class Net_API SocketReactor: public Poco::Runnable /// which can be overridded by subclasses to perform custom /// shutdown processing. /// - /// The SocketReactor is implemented so that it can - /// run in its own thread. It is also possible to run - /// multiple SocketReactors in parallel, as long as - /// they work on different sockets. + /// The SocketReactor is implemented so that it can run in its own thread. + /// Moreover, the thread affinity to a CPU core can optionally be set for the + /// thread on platforms where that functionality is supported and implemented. + /// It is also possible to run multiple SocketReactors in parallel, as long + /// as they work on different sockets. /// - /// It is safe to call addEventHandler() and removeEventHandler() - /// from another thread while the SocketReactor is running. Also, - /// it is safe to call addEventHandler() and removeEventHandler() - /// from event handlers. + /// It is safe to call addEventHandler() and removeEventHandler() from another + /// thread while the SocketReactor is running. Also, it is safe to call + /// addEventHandler() and removeEventHandler() from event handlers. + /// + /// SocketReactor uses NotificationCenter to notify observers. When a handler + /// throws an exception, the NotificationCenter stops notifying the rest of + /// the observers about that particular event instance and propagates the + /// exception, which is eventually caught in the SocketReactor::run() method. + /// This sequence of events is obviously not desirable and it is highly + /// recommended that handlers wrap the code in try/catch and deal with all + /// the exceptions internally, lest they disrupt the notification of the peers. { public: + struct Params + /// Reactor parameters. + /// Default values should work well for most scenarios. + /// + /// Note: the default behavior on zero poll timeout is to start + /// incrementally sleeping after `idleThreshold` and no socket events. + /// This prevents high CPU usage during periods without network + /// activity. To disable it, set `throttle` to false. + { + Poco::Timespan pollTimeout = DEFAULT_TIMEOUT; + /// Timeout for PolllSet::poll() + + long sleep = 0; + /// Amount of milliseconds to sleep, progressively incremented, + /// at `increment` step, up to the `sleepLimit`. + + long sleepLimit = DEFAULT_SLEEP_LIMIT; + /// Max sleep duration in milliseconds + /// This is the ceiling value in milliseconds for the sleep algorithm, + /// which kicks in in two cases: + /// + /// - when there are no subscribers and the reactor is just idle-spinning + /// - when there are subscribers, but there was no socket events signalled + /// for `sleepLimit` milliseconds and `throttle` is true + + int increment = 1; + /// Increment value for the sleep backoff algorithm. + + long idleThreshold = DEFAULT_SLEEP_LIMIT; + /// Indicates when to start sleeping (throttling) on zero poll timeout + + bool throttle = true; + /// Indicates whether to start sleeping when poll timeout is zero and + /// there's no socket events for a period longer than `idleThreshold` + }; + SocketReactor(); /// Creates the SocketReactor. - explicit SocketReactor(const Poco::Timespan& timeout); - /// Creates the SocketReactor, using the given timeout. + explicit SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity = -1); + /// Creates the SocketReactor, using the given poll timeout. + /// + /// The threadAffinity argument, when non-negative, indicates on which CPU core should + /// the run() method run. Nonexisting core or situation when this feature is not implemented + /// are silently ignored and this argument has no effect in such scenarios. + + SocketReactor(const Params& params, int threadAffinity = -1); + /// Creates the SocketReactor, using the given parameters. + /// The threadAffinity argument, when non-negative, indicates on which CPU core should + /// the run() method run. Nonexisting core or situation when this feature is not implemented + /// are silently ignored and this argument has no effect in such scenarios. virtual ~SocketReactor(); /// Destroys the SocketReactor. @@ -140,13 +198,12 @@ class Net_API SocketReactor: public Poco::Runnable void setTimeout(const Poco::Timespan& timeout); /// Sets the timeout. /// - /// If no other event occurs for the given timeout + /// If no socket event occurs for the given timeout /// interval, a timeout event is sent to all event listeners. /// /// The default timeout is 250 milliseconds; /// - /// The timeout is passed to the Socket::select() - /// method. + /// The timeout is passed to the PollSet::poll() method. const Poco::Timespan& getTimeout() const; /// Returns the timeout. @@ -186,9 +243,8 @@ class Net_API SocketReactor: public Poco::Runnable /// dispatches the ShutdownNotification and thus should be called by overriding /// implementations. - virtual void onBusy(); - /// Must be overridden by subclasses (alongside the run() override) to perform - /// additional periodic tasks. The default implementation does nothing. + void onError(const Socket& socket, int code, const std::string& description); + /// Notifies all subscribers when the reactor loop throws an exception. void onError(int code, const std::string& description); /// Notifies all subscribers when the reactor loop throws an exception. @@ -211,13 +267,19 @@ class Net_API SocketReactor: public Poco::Runnable void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); NotifierPtr getNotifier(const Socket& socket, bool makeNew = false); + void sleep(); + enum { - DEFAULT_TIMEOUT = 250000 + DEFAULT_TIMEOUT = 250000, + /// Default timeout for PollSet::poll() + DEFAULT_SLEEP_LIMIT = 250 + /// Default limit for event-based sleeping }; + Params _params; + int _threadAffinity = -1; std::atomic _stop; - Poco::Timespan _timeout; EventHandlerMap _handlers; PollSet _pollSet; NotificationPtr _pReadableNotification; @@ -226,11 +288,51 @@ class Net_API SocketReactor: public Poco::Runnable NotificationPtr _pTimeoutNotification; NotificationPtr _pShutdownNotification; MutexType _mutex; - Poco::Thread* _pThread; + Poco::Event _event; friend class SocketNotifier; }; +// +// inlines +// + + +inline void SocketReactor::setTimeout(const Poco::Timespan& timeout) +{ + _params.pollTimeout = timeout; +} + + +inline const Poco::Timespan& SocketReactor::getTimeout() const +{ + return _params.pollTimeout; +} + + +inline bool SocketReactor::has(const Socket& socket) const +{ + return _pollSet.has(socket); +} + + +inline void SocketReactor::onError(const Socket& socket, int code, const std::string& description) +{ + dispatch(new ErrorNotification(this, socket, code, description)); +} + + +inline void SocketReactor::onError(int code, const std::string& description) +{ + dispatch(new ErrorNotification(this, code, description)); +} + + +inline void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification) +{ + pNotifier->dispatch(pNotification); +} + } } // namespace Poco::Net diff --git a/Net/src/PollSet.cpp b/Net/src/PollSet.cpp index c60fa867e3..23924e1b31 100644 --- a/Net/src/PollSet.cpp +++ b/Net/src/PollSet.cpp @@ -69,6 +69,7 @@ class PollSetImpl using SocketMap = std::map; PollSetImpl(): _events(1024), + _port(0), _eventfd(eventfd(_port, 0)), _epollfd(epoll_create(1)) { @@ -170,18 +171,23 @@ class PollSetImpl // calls would round-robin through the remaining ready sockets, but it's better to give // the call enough room once we start hitting the boundary if (rc >= _events.size()) _events.resize(_events.size()*2); - if (rc < 0 && SocketImpl::lastError() == POCO_EINTR) + else if (rc < 0) { - Poco::Timestamp end; - Poco::Timespan waited = end - start; - if (waited < remainingTime) - remainingTime -= waited; - else - remainingTime = 0; + // if interrupted and there's still time left, keep waiting + if (SocketImpl::lastError() == POCO_EINTR) + { + Poco::Timestamp end; + Poco::Timespan waited = end - start; + if (waited < remainingTime) + { + remainingTime -= waited; + continue; + } + } + else SocketImpl::error(); } } - while (rc < 0 && SocketImpl::lastError() == POCO_EINTR); - if (rc < 0) SocketImpl::error(); + while (false); for (int i = 0; i < rc; i++) { @@ -198,8 +204,17 @@ class PollSetImpl result[it->second.first] |= PollSet::POLL_ERROR; } } + else if (_events[i].events & EPOLLIN) // eventfd signaled + { + uint64_t val; +#ifdef WEPOLL_H_ + if (_pSocket && _pSocket->available()) + _pSocket->impl()->receiveBytes(&val, sizeof(val)); +#else + read(_eventfd, &val, sizeof(val)); +#endif + } } - return result; } @@ -277,6 +292,7 @@ class PollSetImpl if (rmFD == 0) { _pSocket = new ServerSocket(SocketAddress("127.0.0.1", 0)); + _pSocket->setBlocking(false); port = _pSocket->address().port(); return static_cast(_pSocket->impl()->sockfd()); } @@ -294,7 +310,7 @@ class PollSetImpl mutable Mutex _mutex; SocketMap _socketMap; std::vector _events; - int _port = 0; + int _port; std::atomic _eventfd; #ifdef WEPOLL_H_ std::atomic _epollfd; diff --git a/Net/src/SocketNotification.cpp b/Net/src/SocketNotification.cpp index d4ec667c69..de859b5efa 100644 --- a/Net/src/SocketNotification.cpp +++ b/Net/src/SocketNotification.cpp @@ -66,6 +66,16 @@ ErrorNotification::ErrorNotification(SocketReactor* pReactor, int code, const st } +ErrorNotification::ErrorNotification(SocketReactor* pReactor, const Socket& socket, + int code, const std::string& description): + SocketNotification(pReactor), + _code(code), + _description(description) +{ + setSocket(socket); +} + + ErrorNotification::~ErrorNotification() { } diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 14c2e95c60..f68457a081 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -13,10 +13,9 @@ #include "Poco/Net/SocketReactor.h" -#include "Poco/Net/SocketNotification.h" -#include "Poco/Net/SocketNotifier.h" #include "Poco/ErrorHandler.h" #include "Poco/Thread.h" +#include "Poco/Stopwatch.h" #include "Poco/Exception.h" @@ -30,27 +29,38 @@ namespace Net { SocketReactor::SocketReactor(): _stop(false), - _timeout(DEFAULT_TIMEOUT), _pReadableNotification(new ReadableNotification(this)), _pWritableNotification(new WritableNotification(this)), _pErrorNotification(new ErrorNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)), - _pShutdownNotification(new ShutdownNotification(this)), - _pThread(0) + _pShutdownNotification(new ShutdownNotification(this)) { } -SocketReactor::SocketReactor(const Poco::Timespan& timeout): +SocketReactor::SocketReactor(const Poco::Timespan& pollTimeout, int threadAffinity): + _threadAffinity(threadAffinity), _stop(false), - _timeout(timeout), _pReadableNotification(new ReadableNotification(this)), _pWritableNotification(new WritableNotification(this)), _pErrorNotification(new ErrorNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)), - _pShutdownNotification(new ShutdownNotification(this)), - _pThread(0) + _pShutdownNotification(new ShutdownNotification(this)) { + _params.pollTimeout = pollTimeout; +} + +SocketReactor::SocketReactor(const Params& params, int threadAffinity): + _params(params), + _threadAffinity(threadAffinity), + _stop(false), + _pReadableNotification(new ReadableNotification(this)), + _pWritableNotification(new WritableNotification(this)), + _pErrorNotification(new ErrorNotification(this)), + _pTimeoutNotification(new TimeoutNotification(this)), + _pShutdownNotification(new ShutdownNotification(this)) +{ + } @@ -61,36 +71,65 @@ SocketReactor::~SocketReactor() void SocketReactor::run() { - _pThread = Thread::current(); + if (_threadAffinity >= 0) + { + Poco::Thread* pThread = Thread::current(); + if (pThread) pThread->setAffinity(_threadAffinity); + } + Poco::Stopwatch sw; + if (_params.throttle) sw.start(); + PollSet::SocketModeMap sm; while (!_stop) { try { - if (!hasSocketHandlers()) + if (hasSocketHandlers()) { - Thread::trySleep(static_cast(_timeout.totalMilliseconds())); - } - else - { - bool readable = false; - PollSet::SocketModeMap sm = _pollSet.poll(_timeout); - if (sm.size() > 0) + sm = _pollSet.poll(_params.pollTimeout); + for (const auto& s : sm) { - PollSet::SocketModeMap::iterator it = sm.begin(); - PollSet::SocketModeMap::iterator end = sm.end(); - for (; it != end; ++it) + try { - if (it->second & PollSet::POLL_READ) + if (s.second & PollSet::POLL_READ) + { + dispatch(s.first, _pReadableNotification); + } + if (s.second & PollSet::POLL_WRITE) + { + dispatch(s.first, _pWritableNotification); + } + if (s.second & PollSet::POLL_ERROR) { - dispatch(it->first, _pReadableNotification); - readable = true; + dispatch(s.first, _pErrorNotification); } - if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); - if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); + } + catch (Exception& exc) + { + onError(s.first, exc.code(), exc.displayText()); + ErrorHandler::handle(exc); + } + catch (std::exception& exc) + { + onError(s.first, 0, exc.what()); + ErrorHandler::handle(exc); + } + catch (...) + { + onError(s.first, 0, "unknown exception"); + ErrorHandler::handle(); + } + } + if (0 == sm.size()) + { + onTimeout(); + if (_params.throttle && _params.pollTimeout == 0) + { + if ((sw.elapsed()/1000) > _params.sleepLimit) sleep(); } } - if (!readable) onTimeout(); + else if (_params.throttle) sw.restart(); } + else sleep(); } catch (Exception& exc) { @@ -112,48 +151,41 @@ void SocketReactor::run() } -bool SocketReactor::hasSocketHandlers() +void SocketReactor::sleep() { - if (!_pollSet.empty()) - { - ScopedLock lock(_mutex); - for (auto& p: _handlers) - { - if (p.second->accepts(_pReadableNotification) || - p.second->accepts(_pWritableNotification) || - p.second->accepts(_pErrorNotification)) return true; - } - } - - return false; + if (_params.sleep < _params.sleepLimit) ++_params.sleep; + _event.tryWait(_params.sleep); } void SocketReactor::stop() { _stop = true; + wakeUp(); } void SocketReactor::wakeUp() { - if (_pThread && _pThread != Thread::current()) - { - _pThread->wakeUp(); - _pollSet.wakeUp(); - } + _pollSet.wakeUp(); + _event.set(); } -void SocketReactor::setTimeout(const Poco::Timespan& timeout) +bool SocketReactor::hasSocketHandlers() { - _timeout = timeout; -} - + if (!_pollSet.empty()) + { + ScopedLock lock(_mutex); + for (auto& p: _handlers) + { + if (p.second->accepts(_pReadableNotification) || + p.second->accepts(_pWritableNotification) || + p.second->accepts(_pErrorNotification)) return true; + } + } -const Poco::Timespan& SocketReactor::getTimeout() const -{ - return _timeout; + return false; } @@ -224,12 +256,6 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac } -bool SocketReactor::has(const Socket& socket) const -{ - return _pollSet.has(socket); -} - - void SocketReactor::onTimeout() { dispatch(_pTimeoutNotification); @@ -242,17 +268,6 @@ void SocketReactor::onShutdown() } -void SocketReactor::onBusy() -{ -} - - -void SocketReactor::onError(int code, const std::string& description) -{ - dispatch(new ErrorNotification(this, code, description)); -} - - void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotification) { NotifierPtr pNotifier = getNotifier(socket); @@ -277,25 +292,4 @@ void SocketReactor::dispatch(SocketNotification* pNotification) } -void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification) -{ - try - { - pNotifier->dispatch(pNotification); - } - catch (Exception& exc) - { - ErrorHandler::handle(exc); - } - catch (std::exception& exc) - { - ErrorHandler::handle(exc); - } - catch (...) - { - ErrorHandler::handle(); - } -} - - } } // namespace Poco::Net diff --git a/Net/testsuite/src/PollSetTest.cpp b/Net/testsuite/src/PollSetTest.cpp index 76f7240811..0f125060f4 100644 --- a/Net/testsuite/src/PollSetTest.cpp +++ b/Net/testsuite/src/PollSetTest.cpp @@ -184,9 +184,14 @@ void PollSetTest::testTimeout() sw.stop(); assertTrue(ps.poll(timeout).size() == 1); - // just here to prevent server exception on connection reset char buffer[5]; ss.receiveBytes(buffer, sizeof(buffer)); + + sw.restart(); + sm = ps.poll(timeout); + sw.stop(); + assertTrue(sm.empty()); + assertTrue(sw.elapsed() >= 900000); } diff --git a/Net/testsuite/src/SocketConnectorTest.cpp b/Net/testsuite/src/SocketConnectorTest.cpp index 1d8af8986a..50cffa2860 100644 --- a/Net/testsuite/src/SocketConnectorTest.cpp +++ b/Net/testsuite/src/SocketConnectorTest.cpp @@ -74,16 +74,21 @@ namespace _socket(socket), _reactor(reactor), _or(*this, &ClientServiceHandler::onReadable), - _ow(*this, &ClientServiceHandler::onWritable) + _ow(*this, &ClientServiceHandler::onWritable), + _os(*this, &ClientServiceHandler::onShutdown) { _reactor.addEventHandler(_socket, _or); _reactor.addEventHandler(_socket, _ow); + _reactor.addEventHandler(_socket, _os); doSomething(); } ~ClientServiceHandler() { + _reactor.removeEventHandler(_socket, _os); + _reactor.removeEventHandler(_socket, _ow); + _reactor.removeEventHandler(_socket, _or); } void doSomething() @@ -91,21 +96,24 @@ namespace Thread::sleep(100); } + void onShutdown(ShutdownNotification* pNf) + { + if (pNf) pNf->release(); + _reactor.removeEventHandler(_socket, _os); + delete this; + } + void onReadable(ReadableNotification* pNf) { - pNf->release(); + if (pNf) pNf->release(); char buffer[32]; int n = _socket.receiveBytes(buffer, sizeof(buffer)); - if (n <= 0) - { - _reactor.removeEventHandler(_socket, _or); - delete this; - } + if (n <= 0) onShutdown(0); } void onWritable(WritableNotification* pNf) { - pNf->release(); + if (pNf) pNf->release(); _reactor.removeEventHandler(_socket, _ow); std::string data(5, 'x'); _socket.sendBytes(data.data(), (int) data.length()); @@ -116,6 +124,7 @@ namespace SocketReactor& _reactor; Observer _or; Observer _ow; + Observer _os; }; } diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 5eaad6962b..06a26e0467 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -20,6 +20,7 @@ #include "Poco/Net/ServerSocket.h" #include "Poco/Net/SocketAddress.h" #include "Poco/Observer.h" +#include "Poco/Stopwatch.h" #include "Poco/Exception.h" #include "Poco/Thread.h" #include @@ -39,6 +40,7 @@ using Poco::Net::TimeoutNotification; using Poco::Net::ErrorNotification; using Poco::Net::ShutdownNotification; using Poco::Observer; +using Poco::Stopwatch; using Poco::IllegalStateException; using Poco::Thread; @@ -135,12 +137,18 @@ namespace checkReadableObserverCount(1); _reactor.removeEventHandler(_socket, Observer(*this, &ClientServiceHandler::onReadable)); checkReadableObserverCount(0); - if (_once || _data.size() == MAX_DATA_SIZE) + if (_once) { _reactor.stop(); delete this; + return; } } + if (_data.size() == MAX_DATA_SIZE) + { + _reactor.stop(); + delete this; + } } void onWritable(WritableNotification* pNf) @@ -260,7 +268,6 @@ namespace static bool _once; }; - std::string ClientServiceHandler::_data; bool ClientServiceHandler::_readableError = false; bool ClientServiceHandler::_writableError = false; @@ -599,6 +606,23 @@ void SocketReactorTest::testSocketConnectorDeadlock() } +void SocketReactorTest::testSocketReactorWakeup() +{ + SocketReactor::Params params; + params.pollTimeout = 1000000000; + params.sleepLimit = 1000000000; + SocketReactor reactor(params); + Thread thread; + Stopwatch sw; + sw.start(); + thread.start(reactor); + reactor.stop(); + thread.join(); + sw.stop(); + assertTrue (sw.elapsed() < 100000); +} + + void SocketReactorTest::setUp() { ClientServiceHandler::setCloseOnTimeout(false); @@ -621,6 +645,7 @@ CppUnit::Test* SocketReactorTest::suite() CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorDeadlock); + CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorWakeup); return pSuite; } diff --git a/Net/testsuite/src/SocketReactorTest.h b/Net/testsuite/src/SocketReactorTest.h index fa37e7ef26..ca5a7e9bd5 100644 --- a/Net/testsuite/src/SocketReactorTest.h +++ b/Net/testsuite/src/SocketReactorTest.h @@ -31,6 +31,7 @@ class SocketReactorTest: public CppUnit::TestCase void testSocketConnectorTimeout(); void testDataCollection(); void testSocketConnectorDeadlock(); + void testSocketReactorWakeup(); void setUp(); void tearDown();