cosmetics

Change-Id: Ifae805ae1e2dac95324e054af09a7b70f5d5b60c
This commit is contained in:
Horst Schirmeier
2013-04-22 14:24:02 +02:00
parent 2d45a2c52c
commit 0f16f18d75
19 changed files with 101 additions and 103 deletions

View File

@ -40,7 +40,6 @@ boost::mutex CommThread::m_CommMutex;
ExperimentData *JobServer::getDone()
{
#ifndef __puma
if (m_undoneJobs.Size() == 0
&& noMoreExperiments()
@ -151,7 +150,7 @@ void JobServer::run()
// TODO: Log-level?
return;
}
cout << "JobServer listening...." << endl;
cout << "JobServer listening ..." << endl;
// TODO: Log-level?
#ifndef __puma
boost::thread* th;
@ -248,43 +247,39 @@ void CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.set_run_id(m_js.m_runid);
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
for (i = 0; i < m_job_size ; i++) {
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
// Got an element from queue, assign ID to workload and send to minion
workloadID = m_js.m_counter.increment(); // increment workload counter
temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result
ctrlmsg.add_workloadid(workloadID);
exp.push_back(temp_exp);
} else {
break;
}
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
for (i = 0; i < m_job_size; i++) {
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
// Got an element from queue, assign ID to workload and send to minion
workloadID = m_js.m_counter.increment(); // increment workload counter
temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result
ctrlmsg.add_workloadid(workloadID);
exp.push_back(temp_exp);
} else {
break;
}
if (exp.size() != 0) {
ctrlmsg.set_job_size(exp.size());
cout << " >>[";
for ( i = 0; i < exp.size() ; i++) {
cout << " "<< ctrlmsg.workloadid(i) <<" ";
}
cout << "] " << flush;
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
}
if (exp.size() != 0) {
ctrlmsg.set_job_size(exp.size());
cout << " >>[" << ctrlmsg.workloadid(0) << "+"
<< exp.size() << "] \r" << flush;
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
for (i = 0; i < ctrlmsg.job_size() ; i++) {
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
exp.pop_front();
} else {
break;
}
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
for (i = 0; i < ctrlmsg.job_size(); i++) {
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
exp.pop_front();
} else {
break;
}
}
return;
}
return;
}
#ifndef __puma
// Prevent receiveExperimentResults from modifying (or indirectly, via
@ -310,7 +305,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.add_workloadid(workloadID); // set workload id
ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
cout << ">>R[" << workloadID << "] " << flush;
cout << ">>R[" << workloadID << "] \r" << flush;
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage());
}
@ -332,11 +327,10 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
{
int i;
ExperimentData* exp = NULL; // Get exp* from running jobs
cout << " <<[ ";
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
cout << ctrlmsg.workloadid(i) << " ";
if (ctrlmsg.workloadid_size() > 0) {
cout << " <<[" << ctrlmsg.workloadid(0) << "+"
<< ctrlmsg.workloadid_size() << "] \r" << flush;
}
cout << "] " << flush;
#ifndef __puma
// Prevent re-sending jobs in sendPendingExperimentData:
// a) sendPendingExperimentData needs an intact job to serialize and send it.
@ -346,8 +340,8 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
// already may cause breakage in sendPendingExperimentData (a).
boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif
for (i = 0 ; i < ctrlmsg.workloadid_size() ; i++) {
if (m_js.m_runningJobs.remove( ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
// deserialize results, expect failures
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp);