// needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter #include #include "JobServer.hpp" #include #include #include #include #include #include #include #include #include "jobserver/messagedefs/FailControlMessage.pb.h" #include "SocketComm.hpp" #include "controller/Minion.hpp" #ifndef __puma #include #include #endif using namespace std; namespace fi { void JobServer::addParam(ExperimentData* exp){ #ifndef __puma m_undoneJobs.Enqueue(exp); #endif } #ifdef SERVER_PERFORMANCE_MEASURE volatile unsigned JobServer::m_DoneCount = 0; #endif ExperimentData *JobServer::getDone() { // FIXME race condition, need to synchronize with // sendPendingExperimentData() and receiveExperimentResults() #ifndef __puma if (m_undoneJobs.Size() == 0 && noMoreExperiments() && m_runningJobs.Size() == 0 && m_doneJobs.Size() == 0) { // FRICKEL workaround sleep(1); ExperimentData *exp; if (m_doneJobs.Dequeue_nb(exp)) { return exp; } return 0; } return m_doneJobs.Dequeue(); #endif } #ifdef SERVER_PERFORMANCE_MEASURE void JobServer::measure() { cout << "\n[Server] Logging throughput in \"" << SERVER_PERF_LOG_PATH << "\"..." << endl; ofstream m_file(SERVER_PERF_LOG_PATH, std::ios::trunc); // overwrite existing perf-logs if(!m_file.is_open()) { cerr << "[Server] Perf-logging has been enabled" << "but I was not able to write the log-file \"" << SERVER_PERF_LOG_PATH << "\"." << endl; exit(1); } unsigned counter = 0; m_file << "time\tthroughput" << endl; unsigned diff = 0; while(!m_finish) { // Format: 1st column (seconds)[TAB]2nd column (throughput) m_file << counter << "\t" << (m_DoneCount - diff) << endl; counter += SERVER_PERF_STEPPING_SEC; diff = m_DoneCount; sleep(SERVER_PERF_STEPPING_SEC); } // NOTE: Summing up the values written in the 2nd column does not // necessarily yield the number of completed experiments/jobs // (due to thread-scheduling behaviour -> not sync'd!) } #endif // SERVER_PERFORMANCE_MEASURE #ifndef __puma /** * This is a predicate class for the remove_if operator * on the thread list. The operator waits for * timeout milliseconds to join each thread in the list. * If the join was successful, the exited thread is deallocated * and removed from the list. */ struct timed_join_successful { int timeout_ms; timed_join_successful(int timeout) : timeout_ms(timeout){}; bool operator()( boost::thread * threadelement ){ boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms); if(threadelement->timed_join(timeout)){ delete threadelement; return true; }else{ return false; } } }; #endif void JobServer::run(){ struct sockaddr_in clientaddr; socklen_t clen = sizeof(clientaddr); // implementation of server-client communication int s; if((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { perror("socket"); return; } /* Enable address reuse */ int on = 1; if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { perror("setsockopt"); return; } /* IPv4, Port: 1111, accept all IP adresses */ struct sockaddr_in saddr; saddr.sin_family = AF_INET; saddr.sin_port = htons(m_port); saddr.sin_addr.s_addr = htons(INADDR_ANY); /* bind to port */ if(bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) { perror("bind"); return; } /* Listen with a backlog of maxThreads */ if(listen(s, m_maxThreads) == -1) { perror("listen"); return; } cout << "JobServer listening...." << endl; #ifndef __puma boost::thread* th; while(!m_finish){ // Accept connection int cs = accept(s, (struct sockaddr*)&clientaddr, &clen); if(cs == -1) { perror("accept"); return; } // Spawn a thread for further communication, // and add this thread to a list threads // We can limit the generation of threads here. if(m_threadlist.size() < m_maxThreads){ th = new boost::thread(CommThread(cs, *this)); m_threadlist.push_back(th); }else{ // Run over list with a timed_join, // removing finished threads. do { m_threadlist.remove_if( timed_join_successful(m_threadtimeout) ); } while(m_threadlist.size() == m_maxThreads); // Start new thread th = new boost::thread(CommThread(cs, *this)); m_threadlist.push_back(th); } } close(s); // when all undone Jobs are distributed -> call a timed_join on all spawned // TODO: interrupt threads that do not want to join.. while(m_threadlist.size() > 0){ m_threadlist.remove_if( timed_join_successful(m_threadtimeout) ); } #endif } void CommThread::operator()() { // The communication thread implementation: Minion minion; FailControlMessage ctrlmsg; minion.setSocketDescriptor(m_sock); if (!SocketComm::rcv_msg(minion.getSocketDescriptor(), ctrlmsg)) { cout << "!![Server] failed to read complete message from client" << endl; close(m_sock); return; } switch (ctrlmsg.command()) { case FailControlMessage_Command_NEED_WORK: // give minion something to do.. sendPendingExperimentData(minion); break; case FailControlMessage_Command_RESULT_FOLLOWS: // get results and put to done queue. receiveExperimentResults(minion, ctrlmsg.workloadid()); break; default: // hm.. don't know what to do. please die. cout << "!![Server] no idea what to do with command #" << ctrlmsg.command() << ", telling minion to die." << endl; ctrlmsg.Clear(); ctrlmsg.set_command(FailControlMessage_Command_DIE); ctrlmsg.set_build_id(42); SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); } close(m_sock); } #ifndef __puma boost::mutex CommThread::m_CommMutex; #endif // __puma void CommThread::sendPendingExperimentData(Minion& minion) { FailControlMessage ctrlmsg; ctrlmsg.set_build_id(42); ExperimentData * exp = 0; if(m_js.m_undoneJobs.Dequeue_nb(exp) == true) { // Got an element from queue, assign ID to workload and send to minion uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter exp->setWorkloadID(workloadID); // store ID for identification when receiving result if(!m_js.m_runningJobs.insert(workloadID, exp)) { cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; } ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS); ctrlmsg.set_workloadid(workloadID); // set workload id //cout << ">>[Server] Sending workload [" << workloadID << "]" << endl; cout << ">>[" << workloadID << "] " << flush; SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); return; } #ifndef __puma boost::unique_lock lock(m_CommMutex); #endif if((exp = m_js.m_runningJobs.first()) != NULL) { // 2nd priority // (This simply gets the first running-job.) // TODO: Improve selection of parameter-set to be resend (the first is not // necessarily the best...especially when the specific parameter-set // causes the experiment-client to terminate abnormally -> endless loop!) // Further ideas: sequential, random, ...? (+ "retry-counter" for each job) // Implement resend of running-parameter sets to improve campaign speed // and to prevent result loss due to (unexpected) termination of experiment // clients. // (Note: Therefore we need to be aware of receiving multiple results for a // single parameter-set, @see receiveExperimentResults.) uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously) // Resend the parameter-set. ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS); ctrlmsg.set_workloadid(workloadID); // set workload id //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; cout << ">>R[" << workloadID << "] " << flush; SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); } else if(m_js.noMoreExperiments() == false) { // Currently we have no workload (even the running-job-queue is empty!), but // the campaign is not over yet. Minion can try again later. ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN); SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); cout << "--[Server] No workload, come again..." << endl; } else { // No more elements, and campaign is over. Minion can die. ctrlmsg.set_command(FailControlMessage_Command_DIE); cout << "--[Server] No workload, and no campaign, please die." << endl; SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); } } void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID) { #ifndef __puma boost::unique_lock lock(m_CommMutex); #endif ExperimentData * exp; // Get exp* from running jobs //cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl; cout << "<<[" << workloadID << "] " << flush; if(m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results. m_js.m_doneJobs.Enqueue(exp); // Put results in done queue.. #ifdef SERVER_PERFORMANCE_MEASURE ++JobServer::m_DoneCount; #endif } else { // We can receive several results for the same workload id because // we (may) distribute the (running) jobs to a *few* experiment-clients. cout << "[Server] Received another result for workload id [" << workloadID << "] -- ignored." << endl; // TODO: Any need for error-handling here? } } };