jobserver: join remaining threads on shutdown
To keep CommThreads that are still talking to clients from accessing already-destroyed resources, we need to properly join them on shutdown. The m_CommMutex becomes a JobServer member to make sure it isn't destroyed before the JobServer itself.

Change-Id: I35b9fb93ace08a7a9476650f8f5e93597a3a8aa0
This commit is contained in:
@ -34,10 +34,6 @@ void JobServer::addParam(ExperimentData* exp)
|
|||||||
volatile unsigned JobServer::m_DoneCount = 0;
|
volatile unsigned JobServer::m_DoneCount = 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef __puma
|
|
||||||
boost::mutex CommThread::m_CommMutex;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
ExperimentData *JobServer::getDone()
|
ExperimentData *JobServer::getDone()
|
||||||
{
|
{
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
@ -52,7 +48,7 @@ ExperimentData *JobServer::getDone()
|
|||||||
void JobServer::setNoMoreExperiments()
|
void JobServer::setNoMoreExperiments()
|
||||||
{
|
{
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
boost::unique_lock<boost::mutex> lock(CommThread::m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||||
#endif
|
#endif
|
||||||
// currently not really necessary, as we only non-blockingly dequeue:
|
// currently not really necessary, as we only non-blockingly dequeue:
|
||||||
m_undoneJobs.setIsFinished();
|
m_undoneJobs.setIsFinished();
|
||||||
@ -294,7 +290,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
// Prevent receiveExperimentResults from modifying (or indirectly, via
|
// Prevent receiveExperimentResults from modifying (or indirectly, via
|
||||||
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
|
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
|
||||||
// (See details in receiveExperimentResults)
|
// (See details in receiveExperimentResults)
|
||||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
|
||||||
#endif
|
#endif
|
||||||
if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
||||||
// (This picks one running job.)
|
// (This picks one running job.)
|
||||||
@ -347,7 +343,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
|
|||||||
// by the campaign at any time.
|
// by the campaign at any time.
|
||||||
// Additionally, receiving a result overwrites the job's contents. This
|
// Additionally, receiving a result overwrites the job's contents. This
|
||||||
// already may cause breakage in sendPendingExperimentData (a).
|
// already may cause breakage in sendPendingExperimentData (a).
|
||||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
|
||||||
#endif
|
#endif
|
||||||
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
|
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
|
||||||
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
|
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
|
||||||
|
|||||||
@ -66,6 +66,9 @@ private:
|
|||||||
SynchronizedQueue<ExperimentData*> m_undoneJobs;
|
SynchronizedQueue<ExperimentData*> m_undoneJobs;
|
||||||
//! List of finished experiment results.
|
//! List of finished experiment results.
|
||||||
SynchronizedQueue<ExperimentData*> m_doneJobs;
|
SynchronizedQueue<ExperimentData*> m_doneJobs;
|
||||||
|
#ifndef __puma
|
||||||
|
boost::mutex m_CommMutex; //! to synchronise the communication
|
||||||
|
#endif // __puma
|
||||||
friend class CommThread; //!< CommThread is allowed access the job queues.
|
friend class CommThread; //!< CommThread is allowed access the job queues.
|
||||||
/**
|
/**
|
||||||
* The actual startup of the Jobserver.
|
* The actual startup of the Jobserver.
|
||||||
@ -93,10 +96,13 @@ public:
|
|||||||
}
|
}
|
||||||
~JobServer()
|
~JobServer()
|
||||||
{
|
{
|
||||||
|
done();
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
// Cleanup of m_serverThread, etc.
|
// Cleanup of m_serverThread, etc.
|
||||||
|
m_serverThread->join();
|
||||||
delete m_serverThread;
|
delete m_serverThread;
|
||||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||||
|
m_measureThread->join();
|
||||||
delete m_measureThread;
|
delete m_measureThread;
|
||||||
#endif
|
#endif
|
||||||
#endif // __puma
|
#endif // __puma
|
||||||
@ -162,9 +168,6 @@ private:
|
|||||||
*/
|
*/
|
||||||
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
|
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
|
||||||
public:
|
public:
|
||||||
#ifndef __puma
|
|
||||||
static boost::mutex m_CommMutex; //! to synchronise the communication
|
|
||||||
#endif // __puma
|
|
||||||
CommThread(int sockfd, JobServer& p)
|
CommThread(int sockfd, JobServer& p)
|
||||||
: m_sock(sockfd), m_job_size(1), m_js(p) { }
|
: m_sock(sockfd), m_job_size(1), m_js(p) { }
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user