From 8671669053079b6a78d22cb141751baf79d89fe2 Mon Sep 17 00:00:00 2001 From: Horst Schirmeier Date: Fri, 17 Jan 2014 17:35:34 +0100 Subject: [PATCH] jobserver: join remaining threads on shutdown To avoid accessing destroyed resources in CommThreads talking to clients, we need to properly join them on shutdown. The m_CommMutex becomes a JobServer member to make sure it isn't destroyed before the JobServer itself. Change-Id: I35b9fb93ace08a7a9476650f8f5e93597a3a8aa0 --- src/core/cpn/JobServer.cc | 10 +++------- src/core/cpn/JobServer.hpp | 9 ++++++--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index cfc25c28..79136d9c 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -34,10 +34,6 @@ void JobServer::addParam(ExperimentData* exp) volatile unsigned JobServer::m_DoneCount = 0; #endif -#ifndef __puma -boost::mutex CommThread::m_CommMutex; -#endif - ExperimentData *JobServer::getDone() { #ifndef __puma @@ -52,7 +48,7 @@ ExperimentData *JobServer::getDone() void JobServer::setNoMoreExperiments() { #ifndef __puma - boost::unique_lock lock(CommThread::m_CommMutex); + boost::unique_lock lock(m_CommMutex); #endif // currently not really necessary, as we only non-blockingly dequeue: m_undoneJobs.setIsFinished(); @@ -294,7 +290,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) // Prevent receiveExperimentResults from modifying (or indirectly, via // getDone and the campaign, deleting) jobs in the m_runningJobs queue. // (See details in receiveExperimentResults) - boost::unique_lock lock(m_CommMutex); + boost::unique_lock lock(m_js.m_CommMutex); #endif if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority // (This picks one running job.) @@ -347,7 +343,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct // by the campaign at any time. // Additionally, receiving a result overwrites the job's contents. 
This // already may cause breakage in sendPendingExperimentData (a). - boost::unique_lock lock(m_CommMutex); + boost::unique_lock lock(m_js.m_CommMutex); #endif for (i = 0; i < ctrlmsg.workloadid_size(); i++) { if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 89928cb0..99255a79 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -66,6 +66,9 @@ private: SynchronizedQueue m_undoneJobs; //! List of finished experiment results. SynchronizedQueue m_doneJobs; +#ifndef __puma + boost::mutex m_CommMutex; //!< to synchronise the communication +#endif // __puma friend class CommThread; //!< CommThread is allowed access the job queues. /** * The actual startup of the Jobserver. @@ -93,10 +96,13 @@ public: } ~JobServer() { + done(); #ifndef __puma // Cleanup of m_serverThread, etc. + m_serverThread->join(); delete m_serverThread; #ifdef SERVER_PERFORMANCE_MEASURE + m_measureThread->join(); delete m_measureThread; #endif #endif // __puma @@ -162,9 +168,6 @@ private: */ void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg); public: -#ifndef __puma - static boost::mutex m_CommMutex; //! to synchronise the communication -#endif // __puma CommThread(int sockfd, JobServer& p) : m_sock(sockfd), m_job_size(1), m_js(p) { } /**