The Jobclient can get several jobs with one request
git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1963 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -9,7 +9,6 @@
|
||||
#include <string.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include "comm/FailControlMessage.pb.h"
|
||||
#include "comm/SocketComm.hpp"
|
||||
#include "JobServer.hpp"
|
||||
#include "Minion.hpp"
|
||||
@ -212,6 +211,7 @@ void CommThread::operator()()
|
||||
break;
|
||||
}
|
||||
// give minion something to do..
|
||||
m_job_size = ctrlmsg.job_size();
|
||||
sendPendingExperimentData(minion);
|
||||
break;
|
||||
case FailControlMessage::RESULT_FOLLOWS:
|
||||
@ -221,7 +221,7 @@ void CommThread::operator()()
|
||||
break;
|
||||
}
|
||||
// get results and put to done queue.
|
||||
receiveExperimentResults(minion, ctrlmsg.workloadid());
|
||||
receiveExperimentResults(minion, ctrlmsg);
|
||||
break;
|
||||
default:
|
||||
// hm.. don't know what to do. please die.
|
||||
@ -238,23 +238,46 @@ void CommThread::operator()()
|
||||
|
||||
void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
{
|
||||
uint32_t i;
|
||||
uint32_t workloadID;
|
||||
std::vector<ExperimentData*> exp;
|
||||
ExperimentData* temp_exp = 0;
|
||||
FailControlMessage ctrlmsg;
|
||||
|
||||
ctrlmsg.set_build_id(42);
|
||||
ctrlmsg.set_run_id(m_js.m_runid);
|
||||
ExperimentData * exp = 0;
|
||||
if (m_js.m_undoneJobs.Dequeue_nb(exp) == true) {
|
||||
// Got an element from queue, assign ID to workload and send to minion
|
||||
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
|
||||
exp->setWorkloadID(workloadID); // store ID for identification when receiving result
|
||||
if (!m_js.m_runningJobs.insert(workloadID, exp)) {
|
||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||
|
||||
for(i = 0; i < m_job_size ; i++) {
|
||||
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
|
||||
// Got an element from queue, assign ID to workload and send to minion
|
||||
workloadID = m_js.m_counter.increment(); // increment workload counter
|
||||
temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result
|
||||
ctrlmsg.add_workloadid(workloadID);
|
||||
exp.push_back(temp_exp);
|
||||
}
|
||||
|
||||
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
|
||||
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
|
||||
}
|
||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||
ctrlmsg.set_workloadid(workloadID); // set workload id
|
||||
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
|
||||
cout << ">>[" << workloadID << "] " << flush;
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
|
||||
}
|
||||
ctrlmsg.set_job_size(exp.size());
|
||||
|
||||
cout << " >>[";
|
||||
for ( i = 0; i < exp.size() ; i++) {
|
||||
cout << " "<< ctrlmsg.workloadid(i) <<" ";
|
||||
}
|
||||
cout << "] " << flush;
|
||||
|
||||
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
for (i = 0; i < ctrlmsg.job_size() ; i++) {
|
||||
if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
||||
exp.erase(exp.begin());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -265,7 +288,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
// (See details in receiveExperimentResults)
|
||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||
#endif
|
||||
if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
||||
if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
|
||||
// (This picks one running job.)
|
||||
// TODO: Improve selection of parameter set to be resent:
|
||||
// - currently: Linear complexity!
|
||||
@ -277,14 +300,15 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
// clients.
|
||||
// (Note: Therefore we need to be aware of receiving multiple results for a
|
||||
// single parameter-set, @see receiveExperimentResults.)
|
||||
uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously)
|
||||
uint32_t workloadID = temp_exp->getWorkloadID(); // (this ID has been set previously)
|
||||
// Resend the parameter-set.
|
||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||
ctrlmsg.set_workloadid(workloadID); // set workload id
|
||||
ctrlmsg.add_workloadid(workloadID); // set workload id
|
||||
ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job
|
||||
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
|
||||
cout << ">>R[" << workloadID << "] " << flush;
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
|
||||
SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage());
|
||||
}
|
||||
} else if (m_js.noMoreExperiments() == false) {
|
||||
// Currently we have no workload (even the running-job-queue is empty!), but
|
||||
@ -300,11 +324,15 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
}
|
||||
}
|
||||
|
||||
void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
|
||||
void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg)
|
||||
{
|
||||
int i;
|
||||
ExperimentData* exp = NULL; // Get exp* from running jobs
|
||||
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
|
||||
cout << "<<[" << workloadID << "] " << flush;
|
||||
cout << " <<[ ";
|
||||
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
|
||||
cout << ctrlmsg.workloadid(i) << " ";
|
||||
}
|
||||
cout << "] " << flush;
|
||||
#ifndef __puma
|
||||
// Prevent re-sending jobs in sendPendingExperimentData:
|
||||
// a) sendPendingExperimentData needs an intact job to serialize and send it.
|
||||
@ -314,24 +342,27 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
|
||||
// already may cause breakage in sendPendingExperimentData (a).
|
||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||
#endif
|
||||
if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
|
||||
// deserialize results, expect failures
|
||||
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
|
||||
m_js.m_runningJobs.insert(workloadID, exp);
|
||||
for (i = 0 ; i < ctrlmsg.workloadid_size() ; i++) {
|
||||
if (m_js.m_runningJobs.remove( ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
|
||||
// deserialize results, expect failures
|
||||
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
|
||||
m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp);
|
||||
} else {
|
||||
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue
|
||||
}
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
++JobServer::m_DoneCount;
|
||||
#endif
|
||||
} else {
|
||||
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue
|
||||
// We can receive several results for the same workload id because
|
||||
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
||||
cout << "[Server] Received another result for workload id ["
|
||||
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl;
|
||||
|
||||
// TODO: Any need for error-handling here?
|
||||
}
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
++JobServer::m_DoneCount;
|
||||
#endif
|
||||
} else {
|
||||
// We can receive several results for the same workload id because
|
||||
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
||||
cout << "[Server] Received another result for workload id ["
|
||||
<< workloadID << "] -- ignored." << endl;
|
||||
|
||||
// TODO: Any need for error-handling here?
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} // end-of-namespace: fail
|
||||
|
||||
Reference in New Issue
Block a user