diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index b510b6bb..250e6e56 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -266,10 +266,6 @@ void CommThread::sendPendingExperimentData(Minion& minion) } else { break; } - - if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { - cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - } } if (exp.size() != 0) { ctrlmsg.set_job_size(exp.size()); @@ -280,8 +276,22 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { for (i = 0; i < ctrlmsg.job_size(); i++) { if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + + // delay insertion into m_runningJobs until here, as + // getMessage() won't work anymore if this job is re-sent, + // received, and deleted in the meantime + if (!m_js.m_runningJobs.insert(exp.front()->getWorkloadID(), exp.front())) { + cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; + } + exp.pop_front(); } else { + // add remaining jobs back to the queue + cout << "!![Server] failed to send scheduled " << exp.size() << " jobs" << endl; + while (exp.size()) { + m_js.m_undoneJobs.Enqueue(exp.front()); + exp.pop_front(); + } break; }