diff --git a/src/core/comm/SocketComm.cc b/src/core/comm/SocketComm.cc index 092748c9..0ae632e9 100644 --- a/src/core/comm/SocketComm.cc +++ b/src/core/comm/SocketComm.cc @@ -9,17 +9,16 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg) { #ifdef USE_SIZE_PREFIX int size = htonl(msg.ByteSize()); - if (safe_write(sockfd, &size, sizeof(size)) == -1) { - return false; - } std::string buf; - msg.SerializeToString(&buf); - if (safe_write(sockfd, buf.c_str(), buf.size()) == -1) { + if (safe_write(sockfd, &size, sizeof(size)) == -1 + || !msg.SerializeToString(&buf) + || safe_write(sockfd, buf.c_str(), buf.size()) == -1) { return false; } #else char c = 0; - if (!msg.SerializeToFileDescriptor(sockfd) || safe_write(sockfd, &c, 1) == -1) { + if (!msg.SerializeToFileDescriptor(sockfd) + || safe_write(sockfd, &c, 1) == -1) { return false; } #endif @@ -41,8 +40,7 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg) } std::string st(buf, size); delete [] buf; - msg.ParseFromString(st); - return true; + return msg.ParseFromString(st); #else return msg.ParseFromFileDescriptor(sockfd); #endif @@ -77,10 +75,13 @@ ssize_t SocketComm::safe_read(int fd, void *buf, size_t count) continue; } return -1; + } else if (ret == 0) { + // this deliberately deviates from read(2) + return -1; } count -= ret; cbuf += ret; - } while (ret && count); + } while (count); return cbuf - (const char *) buf; } diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index dba6ecc3..44ad5210 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -254,8 +254,9 @@ void CommThread::sendPendingExperimentData(Minion& minion) ctrlmsg.set_workloadid(workloadID); // set workload id //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; cout << ">>[" << workloadID << "] " << flush; - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); - SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { + SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + } return; } @@ -280,8 +281,9 @@ void CommThread::sendPendingExperimentData(Minion& minion) ctrlmsg.set_workloadid(workloadID); // set workload id //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; cout << ">>R[" << workloadID << "] " << flush; - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); - SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { + SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + } } else if (m_js.noMoreExperiments() == false) { // Currently we have no workload (even the running-job-queue is empty!), but // the campaign is not over yet. Minion can try again later. @@ -302,8 +304,12 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; cout << "<<[" << workloadID << "] " << flush; if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found - SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results. - m_js.m_doneJobs.Enqueue(exp); // Put results in done queue.. + // deserialize results, expect failures + if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) { + m_js.m_runningJobs.insert(workloadID, exp); + } else { + m_js.m_doneJobs.Enqueue(exp); // Put results in done queue + } #ifdef SERVER_PERFORMANCE_MEASURE ++JobServer::m_DoneCount; #endif diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index 6c1587bc..42a36590 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -71,9 +71,9 @@ bool JobClient::getParam(ExperimentData& exp) while (1) { // Here we try to acquire a parameter set switch (tryToGetExperimentData(exp)) { // Jobserver will sent workload, params are set in \c exp - case FailControlMessage_Command_WORK_FOLLOWS: return true; + case FailControlMessage::WORK_FOLLOWS: return true; // Nothing to do right now, but maybe later - case FailControlMessage_Command_COME_AGAIN: + case FailControlMessage::COME_AGAIN: sleep(1); continue; default: @@ -84,29 +84,43 @@ bool JobClient::getParam(ExperimentData& exp) FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp) { + FailControlMessage ctrlmsg; + // Connection failed, minion can die - if (!connectToServer()) - return FailControlMessage_Command_DIE; + if (!connectToServer()) { + return FailControlMessage::DIE; + } // Retrieve ExperimentData - FailControlMessage ctrlmsg; - ctrlmsg.set_command(FailControlMessage_Command_NEED_WORK); + ctrlmsg.set_command(FailControlMessage::NEED_WORK); ctrlmsg.set_build_id(42); ctrlmsg.set_run_id(m_server_runid); - SocketComm::sendMsg(m_sockfd, ctrlmsg); + if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { + // Failed to send message? Retry. + close(m_sockfd); + return FailControlMessage::COME_AGAIN; + } ctrlmsg.Clear(); - SocketComm::rcvMsg(m_sockfd, ctrlmsg); + if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) { + // Failed to receive message? Retry. + close(m_sockfd); + return FailControlMessage::COME_AGAIN; + } // now we know the current run ID m_server_runid = ctrlmsg.run_id(); switch (ctrlmsg.command()) { - case FailControlMessage_Command_WORK_FOLLOWS: - SocketComm::rcvMsg(m_sockfd, exp.getMessage()); + case FailControlMessage::WORK_FOLLOWS: + if (!SocketComm::rcvMsg(m_sockfd, exp.getMessage())) { + // Failed to receive message? Retry. + close(m_sockfd); + return FailControlMessage::COME_AGAIN; + } exp.setWorkloadID(ctrlmsg.workloadid()); // Store workload id of experiment data break; - case FailControlMessage_Command_COME_AGAIN: + case FailControlMessage::COME_AGAIN: break; default: break; @@ -122,7 +136,7 @@ bool JobClient::sendResult(ExperimentData& result) // Send back results FailControlMessage ctrlmsg; - ctrlmsg.set_command(FailControlMessage_Command_RESULT_FOLLOWS); + ctrlmsg.set_command(FailControlMessage::RESULT_FOLLOWS); ctrlmsg.set_build_id(42); ctrlmsg.set_run_id(m_server_runid); ctrlmsg.set_workloadid(result.getWorkloadID());