JobServer: synchronization issues
Synchronize re-sending jobs in sendPendingExperimentData() with modifying (or indirectly, via getDone() and the campaign, deleting) jobs in the m_runningJobs queue. a) sendPendingExperimentData() needs an intact job to serialize and send it. b) After a job has been moved to m_doneJobs, it may be retrieved and deleted by the campaign at any time. Additionally, receiving a result overwrites the job's contents; this by itself may already cause breakage in sendPendingExperimentData() (a).

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1943 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -26,8 +26,8 @@ namespace fail {
|
|||||||
void JobServer::addParam(ExperimentData* exp)
|
void JobServer::addParam(ExperimentData* exp)
|
||||||
{
|
{
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
m_undoneJobs.Enqueue(exp);
|
|
||||||
m_inOutCounter.increment();
|
m_inOutCounter.increment();
|
||||||
|
m_undoneJobs.Enqueue(exp);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,8 +43,6 @@ ExperimentData *JobServer::getDone()
|
|||||||
{
|
{
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
boost::unique_lock<boost::mutex> lock(CommThread::m_CommMutex);
|
|
||||||
|
|
||||||
if (m_undoneJobs.Size() == 0
|
if (m_undoneJobs.Size() == 0
|
||||||
&& noMoreExperiments()
|
&& noMoreExperiments()
|
||||||
&& m_runningJobs.Size() == 0
|
&& m_runningJobs.Size() == 0
|
||||||
@ -261,9 +259,12 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
|
// Prevent receiveExperimentResults from modifying (or indirectly, via
|
||||||
|
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
|
||||||
|
// (See details in receiveExperimentResults)
|
||||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||||
#endif
|
#endif
|
||||||
if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
||||||
// (This picks one running job.)
|
// (This picks one running job.)
|
||||||
// TODO: Improve selection of parameter set to be resent:
|
// TODO: Improve selection of parameter set to be resent:
|
||||||
@ -304,6 +305,15 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
|
|||||||
ExperimentData* exp = NULL; // Get exp* from running jobs
|
ExperimentData* exp = NULL; // Get exp* from running jobs
|
||||||
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
|
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
|
||||||
cout << "<<[" << workloadID << "] " << flush;
|
cout << "<<[" << workloadID << "] " << flush;
|
||||||
|
#ifndef __puma
|
||||||
|
// Prevent re-sending jobs in sendPendingExperimentData:
|
||||||
|
// a) sendPendingExperimentData needs an intact job to serialize and send it.
|
||||||
|
// b) After moving the job to m_doneJobs, it may be retrieved and deleted
|
||||||
|
// by the campaign at any time.
|
||||||
|
// Additionally, receiving a result overwrites the job's contents. This
|
||||||
|
// already may cause breakage in sendPendingExperimentData (a).
|
||||||
|
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||||
|
#endif
|
||||||
if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
|
if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
|
||||||
// deserialize results, expect failures
|
// deserialize results, expect failures
|
||||||
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
|
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
|
||||||
|
|||||||
Reference in New Issue
Block a user