From e409ae2f7662acf92e8000cff046c29040606206 Mon Sep 17 00:00:00 2001 From: hsc Date: Tue, 20 Nov 2012 15:01:52 +0000 Subject: [PATCH] JobServer: synchronization issues Synchronize re-sending jobs in sendPendingExperimentData() and modifying (or indirectly, via getDone() and the campaign, deleting) jobs in the m_runningJobs queue. a) sendPendingExperimentData needs an intact job to serialize and send it. b) After moving the job to m_doneJobs, it may be retrieved and deleted by the campaign at any time. Additionally, receiving a result overwrites the job's contents. This already may cause breakage in sendPendingExperimentData (a). git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1943 8c4709b5-6ec9-48aa-a5cd-a96041d1645a --- src/core/cpn/JobServer.cc | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index f5e3d872..f528b779 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -26,8 +26,8 @@ namespace fail { void JobServer::addParam(ExperimentData* exp) { #ifndef __puma - m_undoneJobs.Enqueue(exp); m_inOutCounter.increment(); + m_undoneJobs.Enqueue(exp); #endif } @@ -43,8 +43,6 @@ ExperimentData *JobServer::getDone() { #ifndef __puma - boost::unique_lock lock(CommThread::m_CommMutex); - if (m_undoneJobs.Size() == 0 && noMoreExperiments() && m_runningJobs.Size() == 0 @@ -261,9 +259,12 @@ void CommThread::sendPendingExperimentData(Minion& minion) return; } - #ifndef __puma +#ifndef __puma + // Prevent receiveExperimentResults from modifying (or indirectly, via + // getDone and the campaign, deleting) jobs in the m_runningJobs queue. + // (See details in receiveExperimentResults) boost::unique_lock lock(m_CommMutex); - #endif +#endif if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority // (This picks one running job.) // TODO: Improve selection of parameter set to be resent: @@ -304,6 +305,15 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) ExperimentData* exp = NULL; // Get exp* from running jobs //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; cout << "<<[" << workloadID << "] " << flush; +#ifndef __puma + // Prevent re-sending jobs in sendPendingExperimentData: + // a) sendPendingExperimentData needs an intact job to serialize and send it. + // b) After moving the job to m_doneJobs, it may be retrieved and deleted + // by the campaign at any time. + // Additionally, receiving a result overwrites the job's contents. This + // already may cause breakage in sendPendingExperimentData (a). + boost::unique_lock lock(m_CommMutex); +#endif if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found // deserialize results, expect failures if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {