From d7842c2ad7dc8ef31ad1d7e98e86e0e542dd6f88 Mon Sep 17 00:00:00 2001 From: hellwig Date: Fri, 30 Nov 2012 16:50:02 +0000 Subject: [PATCH] The Jobclient can get several jobs with one request git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1963 8c4709b5-6ec9-48aa-a5cd-a96041d1645a --- src/core/comm/FailControlMessage.proto | 9 +- src/core/config/CMakeLists.txt | 2 + src/core/config/FailConfig.hpp.in | 2 + src/core/cpn/JobServer.cc | 103 +++++++++----- src/core/cpn/JobServer.hpp | 6 +- src/core/efw/JobClient.cc | 178 ++++++++++++++++++------- src/core/efw/JobClient.hpp | 8 +- 7 files changed, 214 insertions(+), 94 deletions(-) diff --git a/src/core/comm/FailControlMessage.proto b/src/core/comm/FailControlMessage.proto index 2d5c55c7..5af627aa 100644 --- a/src/core/comm/FailControlMessage.proto +++ b/src/core/comm/FailControlMessage.proto @@ -5,15 +5,16 @@ message FailControlMessage { RESULT_FOLLOWS = 1; // followed by experiment-specific ExperimentData message (holding both original parameters and experiment result) // JobServer may send these: - WORK_FOLLOWS = 5; // followed by experiment-specific ExperimentData message - COME_AGAIN = 6; // no experiment-specific ExperimentData at the moment, but Campaign is not over yet - DIE = 7; // tells the client to terminate + WORK_FOLLOWS = 6; // followed by experiment-specific ExperimentData message + COME_AGAIN = 7; // no experiment-specific ExperimentData at the moment, but Campaign is not over yet + DIE = 8; // tells the client to terminate } required Command command = 1; - optional uint32 workloadID = 2; + repeated uint32 workloadID = 2; // identifying the client/server build (e.g., build time in unixtime format) required uint64 build_id = 3; // campaign server run ID: prevents old clients talking to new servers optional uint64 run_id = 4; + optional uint32 job_size = 5; } diff --git a/src/core/config/CMakeLists.txt b/src/core/config/CMakeLists.txt index 3c2ded8d..1886bf63 100644 --- a/src/core/config/CMakeLists.txt +++ b/src/core/config/CMakeLists.txt @@ -29,6 +29,8 @@ SET(SERVER_PERF_STEPPING_SEC "1" CACHE STRING "Stepping of performan SET(CLIENT_RAND_BACKOFF_TSTART "3" CACHE STRING "Lower limit of client's backoff phase in seconds") SET(CLIENT_RAND_BACKOFF_TEND "8" CACHE STRING "Upper limit of client's backoff phase in seconds") SET(CLIENT_RETRY_COUNT "3" CACHE STRING "Client's number of reconnect retries") +SET(CLIENT_JOB_REQUEST_SEC "60" CACHE STRING "Determines how often the client asks for new jobs") +SET(CLIENT_JOB_LIMIT_SEC "1000" CACHE STRING "How many jobs can a client ask for") configure_file(${CMAKE_CURRENT_SOURCE_DIR}/FailConfig.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/FailConfig.hpp) diff --git a/src/core/config/FailConfig.hpp.in b/src/core/config/FailConfig.hpp.in index 4e9ac0d3..4977908f 100644 --- a/src/core/config/FailConfig.hpp.in +++ b/src/core/config/FailConfig.hpp.in @@ -39,6 +39,8 @@ #define CLIENT_RAND_BACKOFF_TSTART @CLIENT_RAND_BACKOFF_TSTART@ #define CLIENT_RAND_BACKOFF_TEND @CLIENT_RAND_BACKOFF_TEND@ #define CLIENT_RETRY_COUNT @CLIENT_RETRY_COUNT@ +#define CLIENT_JOB_REQUEST_SEC @CLIENT_JOB_REQUEST_SEC@ +#define CLIENT_JOB_LIMIT_SEC @CLIENT_JOB_LIMIT_SEC@ #define PROJECT_VERSION "@PROJECT_VERSION@" #define FAIL_VERSION PROJECT_VERSION diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index f528b779..690eca25 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -9,7 +9,6 @@ #include #include -#include "comm/FailControlMessage.pb.h" #include "comm/SocketComm.hpp" #include "JobServer.hpp" #include "Minion.hpp" @@ -212,6 +211,7 @@ void CommThread::operator()() break; } // give minion something to do.. + m_job_size = ctrlmsg.job_size(); sendPendingExperimentData(minion); break; case FailControlMessage::RESULT_FOLLOWS: @@ -221,7 +221,7 @@ void CommThread::operator()() break; } // get results and put to done queue. - receiveExperimentResults(minion, ctrlmsg.workloadid()); + receiveExperimentResults(minion, ctrlmsg); break; default: // hm.. don't know what to do. please die. @@ -238,23 +238,46 @@ void CommThread::operator()() void CommThread::sendPendingExperimentData(Minion& minion) { + uint32_t i; + uint32_t workloadID; + std::vector exp; + ExperimentData* temp_exp = 0; FailControlMessage ctrlmsg; + ctrlmsg.set_build_id(42); ctrlmsg.set_run_id(m_js.m_runid); - ExperimentData * exp = 0; - if (m_js.m_undoneJobs.Dequeue_nb(exp) == true) { - // Got an element from queue, assign ID to workload and send to minion - uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter - exp->setWorkloadID(workloadID); // store ID for identification when receiving result - if (!m_js.m_runningJobs.insert(workloadID, exp)) { + ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); + + for(i = 0; i < m_job_size ; i++) { + if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) { + // Got an element from queue, assign ID to workload and send to minion + workloadID = m_js.m_counter.increment(); // increment workload counter + temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result + ctrlmsg.add_workloadid(workloadID); + exp.push_back(temp_exp); + } + + if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; } - ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); - ctrlmsg.set_workloadid(workloadID); // set workload id - //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; - cout << ">>[" << workloadID << "] " << flush; - if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { - SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + } + ctrlmsg.set_job_size(exp.size()); + + cout << " >>["; + for ( i = 0; i < exp.size() ; i++) { + cout << " "<< ctrlmsg.workloadid(i) <<" "; + } + cout << "] " << flush; + + + if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { + for (i = 0; i < ctrlmsg.job_size() ; i++) { + if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + exp.erase(exp.begin()); + } else { + break; + } + } return; } @@ -265,7 +288,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) // (See details in receiveExperimentResults) boost::unique_lock lock(m_CommMutex); #endif - if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority + if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority // (This picks one running job.) // TODO: Improve selection of parameter set to be resent: // - currently: Linear complexity! @@ -277,14 +300,15 @@ void CommThread::sendPendingExperimentData(Minion& minion) // clients. // (Note: Therefore we need to be aware of receiving multiple results for a // single parameter-set, @see receiveExperimentResults.) - uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously) + uint32_t workloadID = temp_exp->getWorkloadID(); // (this ID has been set previously) // Resend the parameter-set. ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); - ctrlmsg.set_workloadid(workloadID); // set workload id + ctrlmsg.add_workloadid(workloadID); // set workload id + ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; cout << ">>R[" << workloadID << "] " << flush; if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { - SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage()); + SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage()); } } else if (m_js.noMoreExperiments() == false) { // Currently we have no workload (even the running-job-queue is empty!), but @@ -300,11 +324,15 @@ void CommThread::sendPendingExperimentData(Minion& minion) } } -void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) +void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg) { + int i; ExperimentData* exp = NULL; // Get exp* from running jobs - //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; - cout << "<<[" << workloadID << "] " << flush; + cout << " <<[ "; + for (i = 0; i < ctrlmsg.workloadid_size(); i++) { + cout << ctrlmsg.workloadid(i) << " "; + } + cout << "] " << flush; #ifndef __puma // Prevent re-sending jobs in sendPendingExperimentData: // a) sendPendingExperimentData needs an intact job to serialize and send it. @@ -314,24 +342,27 @@ void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) // already may cause breakage in sendPendingExperimentData (a). boost::unique_lock lock(m_CommMutex); #endif - if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found - // deserialize results, expect failures - if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) { - m_js.m_runningJobs.insert(workloadID, exp); + for (i = 0 ; i < ctrlmsg.workloadid_size() ; i++) { + if (m_js.m_runningJobs.remove( ctrlmsg.workloadid(i), exp)) { // ExperimentData* found + // deserialize results, expect failures + if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) { + m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp); + } else { + m_js.m_doneJobs.Enqueue(exp); // Put results in done queue + } + #ifdef SERVER_PERFORMANCE_MEASURE + ++JobServer::m_DoneCount; + #endif } else { - m_js.m_doneJobs.Enqueue(exp); // Put results in done queue + // We can receive several results for the same workload id because + // we (may) distribute the (running) jobs to a *few* experiment-clients. + cout << "[Server] Received another result for workload id [" + << ctrlmsg.workloadid(i) << "] -- ignored." << endl; + + // TODO: Any need for error-handling here? } - #ifdef SERVER_PERFORMANCE_MEASURE - ++JobServer::m_DoneCount; - #endif - } else { - // We can receive several results for the same workload id because - // we (may) distribute the (running) jobs to a *few* experiment-clients. - cout << "[Server] Received another result for workload id [" - << workloadID << "] -- ignored." << endl; - - // TODO: Any need for error-handling here? } + } } // end-of-namespace: fail diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 74ef1ab1..1f6ca999 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -6,6 +6,7 @@ #include "util/SynchronizedCounter.hpp" #include "util/SynchronizedMap.hpp" #include "config/FailConfig.hpp" +#include "comm/FailControlMessage.pb.h" #include #include @@ -139,6 +140,7 @@ public: class CommThread { private: int m_sock; //! Socket descriptor of the connection + uint32_t m_job_size; JobServer& m_js; //! Calling jobserver // FIXME: Concerns are not really separated yet ;) @@ -156,13 +158,13 @@ private: * @param minion The minion offering results * @param workloadID The workload id of the result message */ - void receiveExperimentResults(Minion& minion, uint32_t workloadID); + void receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg); public: #ifndef __puma static boost::mutex m_CommMutex; //! to synchronise the communication #endif // __puma CommThread(int sockfd, JobServer& p) - : m_sock(sockfd), m_js(p) { } + : m_sock(sockfd), m_job_size(1), m_js(p) { } /** * The thread's entry point. */ diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index 42a36590..ffdc8c48 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -16,6 +16,7 @@ JobClient::JobClient(const std::string& server, int port) } srand(time(NULL)); // needed for random backoff (see connectToServer) m_server_runid = 0; // server accepts this for virgin clients + m_job_throughput = 1; // client gets only one job at the first request } bool JobClient::connectToServer() @@ -71,7 +72,8 @@ bool JobClient::getParam(ExperimentData& exp) while (1) { // Here we try to acquire a parameter set switch (tryToGetExperimentData(exp)) { // Jobserver will sent workload, params are set in \c exp - case FailControlMessage::WORK_FOLLOWS: return true; + case FailControlMessage::WORK_FOLLOWS: + return true; // Nothing to do right now, but maybe later case FailControlMessage::COME_AGAIN: sleep(1); @@ -84,70 +86,144 @@ bool JobClient::getParam(ExperimentData& exp) FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp) { - FailControlMessage ctrlmsg; + //Are there other jobs for the experiment + if (m_parameters.size() != 0) { + exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); + exp.setWorkloadID(m_parameters.front()->getWorkloadID()); + + delete &m_parameters.front()->getMessage(); + delete m_parameters.front(); + m_parameters.erase(m_parameters.begin()); + + return FailControlMessage::WORK_FOLLOWS; + } else { + FailControlMessage ctrlmsg; - // Connection failed, minion can die - if (!connectToServer()) { - return FailControlMessage::DIE; - } + // Connection failed, minion can die + if (!connectToServer()) { + return FailControlMessage::DIE; + } - // Retrieve ExperimentData - ctrlmsg.set_command(FailControlMessage::NEED_WORK); - ctrlmsg.set_build_id(42); - ctrlmsg.set_run_id(m_server_runid); + // Retrieve ExperimentData + ctrlmsg.set_command(FailControlMessage::NEED_WORK); + ctrlmsg.set_build_id(42); + ctrlmsg.set_run_id(m_server_runid); + ctrlmsg.set_job_size(m_job_throughput); //Request for a number of jobs - if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { - // Failed to send message? Retry. - close(m_sockfd); - return FailControlMessage::COME_AGAIN; - } - ctrlmsg.Clear(); - if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) { - // Failed to receive message? Retry. - close(m_sockfd); - return FailControlMessage::COME_AGAIN; - } - - // now we know the current run ID - m_server_runid = ctrlmsg.run_id(); - - switch (ctrlmsg.command()) { - case FailControlMessage::WORK_FOLLOWS: - if (!SocketComm::rcvMsg(m_sockfd, exp.getMessage())) { + if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { + // Failed to send message? Retry. + close(m_sockfd); + return FailControlMessage::COME_AGAIN; + } + ctrlmsg.Clear(); + if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) { // Failed to receive message? Retry. close(m_sockfd); return FailControlMessage::COME_AGAIN; } - exp.setWorkloadID(ctrlmsg.workloadid()); // Store workload id of experiment data - break; - case FailControlMessage::COME_AGAIN: - break; - default: - break; + + // now we know the current run ID + m_server_runid = ctrlmsg.run_id(); + + switch (ctrlmsg.command()) { + case FailControlMessage::WORK_FOLLOWS: + uint32_t i; + for (i = 0 ; i < ctrlmsg.job_size() ; i++) { + ExperimentData* temp_exp = new ExperimentData(exp.getMessage().New()); + + if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) { + // Failed to receive message? Retry. + close(m_sockfd); + return FailControlMessage::COME_AGAIN; + } + + temp_exp->setWorkloadID(ctrlmsg.workloadid(i)); //Store workload id of experiment data + m_parameters.push_back(temp_exp); + } + break; + case FailControlMessage::COME_AGAIN: + break; + default: + break; + } + + close(m_sockfd); + //Take front from m_parameters and copy to exp. + exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); + exp.setWorkloadID(m_parameters.front()->getWorkloadID()); + //Delete front element of m_parameters + delete &m_parameters.front()->getMessage(); + delete m_parameters.front(); + m_parameters.erase(m_parameters.begin()); + //start time measurement for throughput calculation + m_job_runtime.startTimer(); + + return ctrlmsg.command(); } - close(m_sockfd); - return ctrlmsg.command(); } bool JobClient::sendResult(ExperimentData& result) { - if (!connectToServer()) - return false; + //Create new ExperimentData for result + ExperimentData* temp_exp = new ExperimentData(result.getMessage().New()); + temp_exp->getMessage().CopyFrom(result.getMessage()); + temp_exp->setWorkloadID(result.getWorkloadID()); + + if (m_parameters.size() != 0) { + //If there are more jobs for the experiment store result + m_results.push_back( temp_exp ); + + return true; + } else { + m_results.push_back( temp_exp ); + + //Stop time measurement and calculate new throughput + m_job_runtime.stopTimer(); + m_job_throughput = CLIENT_JOB_REQUEST_SEC/((double)m_job_runtime/m_results.size()); + + if (m_job_throughput > CLIENT_JOB_LIMIT_SEC) + m_job_throughput = CLIENT_JOB_LIMIT_SEC; + + if (m_job_throughput < 1) + m_job_throughput = 1; + + //Reset timer for new time measurement + m_job_runtime.reset(); + + if (!connectToServer()) + return false; - // Send back results - FailControlMessage ctrlmsg; - ctrlmsg.set_command(FailControlMessage::RESULT_FOLLOWS); - ctrlmsg.set_build_id(42); - ctrlmsg.set_run_id(m_server_runid); - ctrlmsg.set_workloadid(result.getWorkloadID()); - cout << "[Client] Sending back result [" << std::dec << result.getWorkloadID() << "]..." << endl; - // TODO: Log-level? - SocketComm::sendMsg(m_sockfd, ctrlmsg); - SocketComm::sendMsg(m_sockfd, result.getMessage()); + //Send back results + FailControlMessage ctrlmsg; + ctrlmsg.set_command(FailControlMessage::RESULT_FOLLOWS); + ctrlmsg.set_build_id(42); + ctrlmsg.set_run_id(m_server_runid); + ctrlmsg.set_job_size(m_results.size()); //Store how many results will be sent + + cout << "[Client] Sending back result ["; + + uint32_t i; + for (i = 0; i < m_results.size() ; i++) { + ctrlmsg.add_workloadid(m_results[i]->getWorkloadID()); + cout << std::dec << m_results[i]->getWorkloadID(); + cout << " "; + } + cout << "]"; + + // TODO: Log-level? + SocketComm::sendMsg(m_sockfd, ctrlmsg); + + for (i = 0; i < ctrlmsg.job_size() ; i++) { + SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage()); + delete &m_results.front()->getMessage(); + delete m_results.front(); + m_results.erase(m_results.begin()); + } - // Close connection. - close(m_sockfd); - return true; + // Close connection. + close(m_sockfd); + return true; + } } } // end-of-namespace: fail diff --git a/src/core/efw/JobClient.hpp b/src/core/efw/JobClient.hpp index 0a9e274e..5a4806a8 100644 --- a/src/core/efw/JobClient.hpp +++ b/src/core/efw/JobClient.hpp @@ -11,6 +11,7 @@ #include "comm/ExperimentData.hpp" #include "comm/FailControlMessage.pb.h" #include "config/FailConfig.hpp" +#include "util/WallclockTimer.hpp" namespace fail { @@ -27,7 +28,12 @@ private: struct hostent* m_server_ent; int m_sockfd; uint64_t m_server_runid; - + + WallclockTimer m_job_runtime; + int m_job_throughput; + std::vector m_parameters; + std::vector m_results; + bool connectToServer(); FailControlMessage_Command tryToGetExperimentData(ExperimentData& exp);