diff --git a/core/jobserver/JobServer.cc b/core/jobserver/JobServer.cc index 8fd24588..bbd33f41 100644 --- a/core/jobserver/JobServer.cc +++ b/core/jobserver/JobServer.cc @@ -1,4 +1,4 @@ -// Author: Martin Hoffmann, Richard Hellwig +// Author: Martin Hoffmann, Richard Hellwig, Adrian Böckenkamp // Date: 07.10.11 // needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter @@ -26,7 +26,6 @@ using namespace std; namespace fi { - void JobServer::addParam(ExperimentData* exp){ #ifndef __puma m_undoneJobs.Enqueue(exp); @@ -151,9 +150,10 @@ void JobServer::run(){ #endif } -/// Communication thread implementation void CommThread::operator()() { + // The communication thread implementation: + Minion minion; FailControlMessage ctrlmsg; minion.setSocketDescriptor(m_sock); @@ -186,52 +186,88 @@ void CommThread::operator()() close(m_sock); } -bool CommThread::sendPendingExperimentData(Minion& minion) +#ifndef __puma +boost::mutex CommThread::m_CommMutex; +#endif // __puma + +void CommThread::sendPendingExperimentData(Minion& minion) { - FailControlMessage ctrlmsg; - ctrlmsg.set_build_id(42); - ExperimentData * exp = 0; - if(m_js.m_undoneJobs.Dequeue_nb(exp) == true){ - // Got an element from queue, assign ID to workload and send to minion - uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter - exp->setWorkloadID(workloadID); // store ID for identification when receiving result - if(!m_js.m_runningJobs.insert(workloadID, exp)){ - cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - } - ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS); - ctrlmsg.set_workloadid(workloadID); // set workload id - //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; - cout << ">>[" << workloadID << "] " << flush; - SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); - SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); - }else if( m_js.noMoreExperiments() == false ){ - // Currently we have no workload, but the campaign is not over yet. Minion can try again later - ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN); - SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); - cout << "--[Server] No workload, come again..." << endl; - }else{ - // No more elements, and campaign is over. Minion can die. - ctrlmsg.set_command(FailControlMessage_Command_DIE); - cout << "--[Server] No workload, and no campaign, please die." << endl; - SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); - } - return true; + FailControlMessage ctrlmsg; + ctrlmsg.set_build_id(42); + ExperimentData * exp = 0; + if(m_js.m_undoneJobs.Dequeue_nb(exp) == true) { + // Got an element from queue, assign ID to workload and send to minion + uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter + exp->setWorkloadID(workloadID); // store ID for identification when receiving result + if(!m_js.m_runningJobs.insert(workloadID, exp)) { + cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; + } + ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS); + ctrlmsg.set_workloadid(workloadID); // set workload id + //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; + cout << ">>[" << workloadID << "] " << flush; + SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); + SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); + return; + } + + #ifndef __puma + boost::unique_lock lock(m_CommMutex); + #endif + if((exp = m_js.m_runningJobs.first()) != NULL) { // 2nd priority + // (This simply gets the first running-job.) + // TODO: Improve selection of parameter-set to be resend (the first is not + // necessarily the best...especially when the specific parameter-set + // causes the experiment-client to terminate abnormally -> endless loop!) + // Further ideas: sequential, random, ...? (+ "retry-counter" for each job) + + // Implement resend of running-parameter sets to improve campaign speed + // and to prevent result loss due to (unexpected) termination of experiment + // clients. + // (Note: Therefore we need to be aware of receiving multiple results for a + // single parameter-set, @see receiveExperimentResults.) + uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously) + // Resend the parameter-set. + ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS); + ctrlmsg.set_workloadid(workloadID); // set workload id + //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; + cout << ">>R[" << workloadID << "] " << flush; + SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); + SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); + } else if(m_js.noMoreExperiments() == false) { + // Currently we have no workload (even the running-job-queue is empty!), but + // the campaign is not over yet. Minion can try again later. + ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN); + SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); + cout << "--[Server] No workload, come again..." << endl; + } else { + // No more elements, and campaign is over. Minion can die. + ctrlmsg.set_command(FailControlMessage_Command_DIE); + cout << "--[Server] No workload, and no campaign, please die." << endl; + SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); + } } - -bool CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) +void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) { - ExperimentData * exp; // Get exp* from running jobs - //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; - cout << "<<[" << workloadID << "] " << flush; - if( m_js.m_runningJobs.remove(workloadID, exp) ){ /// ExperimentData* found - SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); /// deserialize results. - m_js.m_doneJobs.Enqueue(exp); /// Put results in done queue.. - return true; - }else{ - cout << "!![Server] workload id not found in running jobs map :( [" << workloadID << "]" << endl; - return false; - } +#ifndef __puma + boost::unique_lock lock(m_CommMutex); +#endif + + ExperimentData * exp; // Get exp* from running jobs + //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; + cout << "<<[" << workloadID << "] " << flush; + if(m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found + SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results. + m_js.m_doneJobs.Enqueue(exp); // Put results in done queue.. + } else { + // We can receive several results for the same workload id because + // we (may) distribute the (running) jobs to a *few* experiment-clients. + cout << "[Server] Received another result for workload id [" + << workloadID << "] -- ignored." << endl; + + // TODO: Any need for error-handling here? + } } }; diff --git a/core/jobserver/JobServer.hpp b/core/jobserver/JobServer.hpp index 835ebc04..61a48c6c 100644 --- a/core/jobserver/JobServer.hpp +++ b/core/jobserver/JobServer.hpp @@ -1,7 +1,8 @@ /** - * \brief The JobServer supplies the Minions with ExperimentData's and receives the result data. + * \brief The JobServer supplies the Minions with ExperimentData's + * and receives the result data. * - * \author Martin Hoffmann, Richard Hellwig + * \author Martin Hoffmann, Richard Hellwig, Adrian Böckenkamp */ @@ -65,7 +66,13 @@ public: m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread. #endif }; - ~JobServer() {} + ~JobServer() + { +#ifndef __puma + // Cleanup of m_serverThread, etc. + delete m_serverThread; +#endif // __puma + }; private: @@ -75,7 +82,7 @@ public: * and listen for connections. */ void run(); - + void sendWork(int sockfd); public: @@ -126,32 +133,32 @@ public: class CommThread { int m_sock; //! Socket descriptor of the connection JobServer& m_js; //! Calling jobserver +#ifndef __puma + static boost::mutex m_CommMutex; //! to synchronise the communication +#endif // __puma public: CommThread(int sockfd, JobServer& p) : m_sock(sockfd), m_js(p) {}; /** - * The thread's entry point + * The thread's entry point. */ void operator() (); private: - /// FIXME concerns are not really separated yet ;) + /// FIXME concerns are not really separated yet ;) /** * Called after minion calls for work. * Tries to deque a parameter set non blocking, and * sends it back to the requesting minion. * @param minion The minion asking for input - * @return FIXME return value not evaluated yet. */ - bool sendPendingExperimentData(Minion& minion); - + void sendPendingExperimentData(Minion& minion); /** * Called after minion offers a result message. * Evaluates the Workload ID and puts the corresponding * job result into the result queue. * @param minion The minion offering results * @param workloadID The workload id of the result message - * @return \c true if Worload ID could be mapped, \c false if not */ - bool receiveExperimentResults(Minion& minion, uint32_t workloadID); + void receiveExperimentResults(Minion& minion, uint32_t workloadID); }; }; diff --git a/core/util/SynchronizedMap.hpp b/core/util/SynchronizedMap.hpp index 4761ddc6..b1d2e77f 100644 --- a/core/util/SynchronizedMap.hpp +++ b/core/util/SynchronizedMap.hpp @@ -27,6 +27,20 @@ private: #endif return m_map.size(); } + /** + * Retrieves the first element in the map. + * @return a pointer to the first element, or \c NULL if empty + */ + Tvalue first() + { + #ifndef __puma + boost::unique_lock lock(m_mutex); + #endif + if(m_map.size() > 0) + return m_map.begin()->second; + else + return NULL; + } // Lock is automatically released here /** * Add data to the map, return false if already present * @param key Map key