diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index 690eca25..c669023c 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -247,40 +247,45 @@ void CommThread::sendPendingExperimentData(Minion& minion) ctrlmsg.set_build_id(42); ctrlmsg.set_run_id(m_js.m_runid); ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); - - for(i = 0; i < m_job_size ; i++) { - if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) { - // Got an element from queue, assign ID to workload and send to minion - workloadID = m_js.m_counter.increment(); // increment workload counter - temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result - ctrlmsg.add_workloadid(workloadID); - exp.push_back(temp_exp); - } - - if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { - cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - } - } - ctrlmsg.set_job_size(exp.size()); - - cout << " >>["; - for ( i = 0; i < exp.size() ; i++) { - cout << " "<< ctrlmsg.workloadid(i) <<" "; - } - cout << "] " << flush; - - - if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { - for (i = 0; i < ctrlmsg.job_size() ; i++) { - if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { - exp.erase(exp.begin()); + + for(i = 0; i < m_job_size ; i++) { + if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) { + // Got an element from queue, assign ID to workload and send to minion + workloadID = m_js.m_counter.increment(); // increment workload counter + temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result + ctrlmsg.add_workloadid(workloadID); + exp.push_back(temp_exp); } else { break; } + if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { + cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; + sleep(10); + } + } + if (exp.size() != 0) { + ctrlmsg.set_job_size(exp.size()); + + cout << " >>["; + for ( i = 0; i < exp.size() ; i++) { + cout << " "<< ctrlmsg.workloadid(i) <<" "; + } + cout << "] " << flush; + + + if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { + for (i = 0; i < ctrlmsg.job_size() ; i++) { + if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + exp.erase(exp.begin()); + } else { + break; + } + + } + return; + } } - return; - } #ifndef __puma // Prevent receiveExperimentResults from modifying (or indirectly, via diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index ffdc8c48..f0daebbc 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -148,15 +148,17 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp } close(m_sockfd); - //Take front from m_parameters and copy to exp. - exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); - exp.setWorkloadID(m_parameters.front()->getWorkloadID()); - //Delete front element of m_parameters - delete &m_parameters.front()->getMessage(); - delete m_parameters.front(); - m_parameters.erase(m_parameters.begin()); - //start time measurement for throughput calculation - m_job_runtime.startTimer(); + if (m_parameters.size() != 0) { + //Take front from m_parameters and copy to exp. + exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); + exp.setWorkloadID(m_parameters.front()->getWorkloadID()); + //Delete front element of m_parameters + delete &m_parameters.front()->getMessage(); + delete m_parameters.front(); + m_parameters.erase(m_parameters.begin()); + //start time measurement for throughput calculation + m_job_runtime.startTimer(); + } return ctrlmsg.command(); }