From 1f6e275e5eadeb82782af09d955e7a4a5bee469c Mon Sep 17 00:00:00 2001 From: Horst Schirmeier Date: Mon, 20 Jan 2014 22:21:40 +0100 Subject: [PATCH] jobserver: bugfix: potential race Delay insertion of to-be-sent jobs into m_runningJobs until they are really sent, as getMessage() won't work anymore (as in: segfault) if this job is concurrently re-sent (due to campaign end), its result is received, and the job is deleted in the campaign. This becomes non-hypothetical with larger values for CLIENT_JOB_LIMIT and CLIENT_JOB_REQUEST_SEC. Additionally, reinsert the remaining jobs into the input queue if communication fails, instead of inefficiently delaying redistribution until the campaign end. Change-Id: If85e3c8261deda86beb8d4d93343429223753f22 --- src/core/cpn/JobServer.cc | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index b510b6bb..250e6e56 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -266,10 +266,6 @@ void CommThread::sendPendingExperimentData(Minion& minion) } else { break; } - - if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { - cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - } } if (exp.size() != 0) { ctrlmsg.set_job_size(exp.size()); @@ -280,8 +276,22 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { for (i = 0; i < ctrlmsg.job_size(); i++) { if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + + // delay insertion into m_runningJobs until here, as + // getMessage() won't work anymore if this job is re-sent, + // received, and deleted in the meantime + if (!m_js.m_runningJobs.insert(exp.front()->getWorkloadID(), exp.front())) { + cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" 
<< endl; + } + exp.pop_front(); } else { + // add remaining jobs back to the queue + cout << "!![Server] failed to send scheduled " << exp.size() << " jobs" << endl; + while (exp.size()) { + m_js.m_undoneJobs.Enqueue(exp.front()); + exp.pop_front(); + } break; }