correct sanity checks for client/server communication

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1933 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
hsc
2012-11-14 13:31:53 +00:00
parent b8a4797360
commit 49d1608969
3 changed files with 48 additions and 27 deletions

View File

@ -9,17 +9,16 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
{ {
#ifdef USE_SIZE_PREFIX #ifdef USE_SIZE_PREFIX
int size = htonl(msg.ByteSize()); int size = htonl(msg.ByteSize());
if (safe_write(sockfd, &size, sizeof(size)) == -1) {
return false;
}
std::string buf; std::string buf;
msg.SerializeToString(&buf); if (safe_write(sockfd, &size, sizeof(size)) == -1
if (safe_write(sockfd, buf.c_str(), buf.size()) == -1) { || !msg.SerializeToString(&buf)
|| safe_write(sockfd, buf.c_str(), buf.size()) == -1) {
return false; return false;
} }
#else #else
char c = 0; char c = 0;
if (!msg.SerializeToFileDescriptor(sockfd) || safe_write(sockfd, &c, 1) == -1) { if (!msg.SerializeToFileDescriptor(sockfd)
|| safe_write(sockfd, &c, 1) == -1) {
return false; return false;
} }
#endif #endif
@ -41,8 +40,7 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
} }
std::string st(buf, size); std::string st(buf, size);
delete [] buf; delete [] buf;
msg.ParseFromString(st); return msg.ParseFromString(st);
return true;
#else #else
return msg.ParseFromFileDescriptor(sockfd); return msg.ParseFromFileDescriptor(sockfd);
#endif #endif
@ -77,10 +75,13 @@ ssize_t SocketComm::safe_read(int fd, void *buf, size_t count)
continue; continue;
} }
return -1; return -1;
} else if (ret == 0) {
// this deliberately deviates from read(2)
return -1;
} }
count -= ret; count -= ret;
cbuf += ret; cbuf += ret;
} while (ret && count); } while (count);
return cbuf - (const char *) buf; return cbuf - (const char *) buf;
} }

View File

@ -254,8 +254,9 @@ void CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.set_workloadid(workloadID); // set workload id ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
cout << ">>[" << workloadID << "] " << flush; cout << ">>[" << workloadID << "] " << flush;
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
}
return; return;
} }
@ -280,8 +281,9 @@ void CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.set_workloadid(workloadID); // set workload id ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
cout << ">>R[" << workloadID << "] " << flush; cout << ">>R[" << workloadID << "] " << flush;
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
}
} else if (m_js.noMoreExperiments() == false) { } else if (m_js.noMoreExperiments() == false) {
// Currently we have no workload (even the running-job-queue is empty!), but // Currently we have no workload (even the running-job-queue is empty!), but
// the campaign is not over yet. Minion can try again later. // the campaign is not over yet. Minion can try again later.
@ -302,8 +304,12 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
cout << "<<[" << workloadID << "] " << flush; cout << "<<[" << workloadID << "] " << flush;
if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results. // deserialize results, expect failures
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue.. if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
m_js.m_runningJobs.insert(workloadID, exp);
} else {
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue
}
#ifdef SERVER_PERFORMANCE_MEASURE #ifdef SERVER_PERFORMANCE_MEASURE
++JobServer::m_DoneCount; ++JobServer::m_DoneCount;
#endif #endif

View File

@ -71,9 +71,9 @@ bool JobClient::getParam(ExperimentData& exp)
while (1) { // Here we try to acquire a parameter set while (1) { // Here we try to acquire a parameter set
switch (tryToGetExperimentData(exp)) { switch (tryToGetExperimentData(exp)) {
// Jobserver will send workload, params are set in \c exp // Jobserver will send workload, params are set in \c exp
case FailControlMessage_Command_WORK_FOLLOWS: return true; case FailControlMessage::WORK_FOLLOWS: return true;
// Nothing to do right now, but maybe later // Nothing to do right now, but maybe later
case FailControlMessage_Command_COME_AGAIN: case FailControlMessage::COME_AGAIN:
sleep(1); sleep(1);
continue; continue;
default: default:
@ -84,29 +84,43 @@ bool JobClient::getParam(ExperimentData& exp)
FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp) FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp)
{ {
FailControlMessage ctrlmsg;
// Connection failed, minion can die // Connection failed, minion can die
if (!connectToServer()) if (!connectToServer()) {
return FailControlMessage_Command_DIE; return FailControlMessage::DIE;
}
// Retrieve ExperimentData // Retrieve ExperimentData
FailControlMessage ctrlmsg; ctrlmsg.set_command(FailControlMessage::NEED_WORK);
ctrlmsg.set_command(FailControlMessage_Command_NEED_WORK);
ctrlmsg.set_build_id(42); ctrlmsg.set_build_id(42);
ctrlmsg.set_run_id(m_server_runid); ctrlmsg.set_run_id(m_server_runid);
SocketComm::sendMsg(m_sockfd, ctrlmsg); if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) {
// Failed to send message? Retry.
close(m_sockfd);
return FailControlMessage::COME_AGAIN;
}
ctrlmsg.Clear(); ctrlmsg.Clear();
SocketComm::rcvMsg(m_sockfd, ctrlmsg); if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) {
// Failed to receive message? Retry.
close(m_sockfd);
return FailControlMessage::COME_AGAIN;
}
// now we know the current run ID // now we know the current run ID
m_server_runid = ctrlmsg.run_id(); m_server_runid = ctrlmsg.run_id();
switch (ctrlmsg.command()) { switch (ctrlmsg.command()) {
case FailControlMessage_Command_WORK_FOLLOWS: case FailControlMessage::WORK_FOLLOWS:
SocketComm::rcvMsg(m_sockfd, exp.getMessage()); if (!SocketComm::rcvMsg(m_sockfd, exp.getMessage())) {
// Failed to receive message? Retry.
close(m_sockfd);
return FailControlMessage::COME_AGAIN;
}
exp.setWorkloadID(ctrlmsg.workloadid()); // Store workload id of experiment data exp.setWorkloadID(ctrlmsg.workloadid()); // Store workload id of experiment data
break; break;
case FailControlMessage_Command_COME_AGAIN: case FailControlMessage::COME_AGAIN:
break; break;
default: default:
break; break;
@ -122,7 +136,7 @@ bool JobClient::sendResult(ExperimentData& result)
// Send back results // Send back results
FailControlMessage ctrlmsg; FailControlMessage ctrlmsg;
ctrlmsg.set_command(FailControlMessage_Command_RESULT_FOLLOWS); ctrlmsg.set_command(FailControlMessage::RESULT_FOLLOWS);
ctrlmsg.set_build_id(42); ctrlmsg.set_build_id(42);
ctrlmsg.set_run_id(m_server_runid); ctrlmsg.set_run_id(m_server_runid);
ctrlmsg.set_workloadid(result.getWorkloadID()); ctrlmsg.set_workloadid(result.getWorkloadID());