diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 107f30a6..204c3f3b 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -1,5 +1,5 @@ ### Add Boost and Threads -find_package(Boost 1.42 COMPONENTS thread REQUIRED) +find_package(Boost 1.42 COMPONENTS thread coroutine context REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) link_directories(${Boost_LIBRARY_DIRS}) diff --git a/src/core/cpn/CMakeLists.txt b/src/core/cpn/CMakeLists.txt index 3d9c3147..3023dab1 100644 --- a/src/core/cpn/CMakeLists.txt +++ b/src/core/cpn/CMakeLists.txt @@ -4,6 +4,8 @@ set(SRCS DatabaseCampaign.cc ) +set_source_files_properties(JobServer.cc CampaignManager.cc PROPERTIES COMPILE_FLAGS -std=c++11) + find_package(MySQL REQUIRED) include_directories(${MYSQL_INCLUDE_DIR}) @@ -15,7 +17,7 @@ else(CONFIG_INJECTIONPOINT_HOPS) endif(CONFIG_INJECTIONPOINT_HOPS) add_library(fail-cpn ${SRCS}) -target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES}) +target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES} ${Boost_COROUTINE_LIBRARY} ${Boost_CONTEXT_LIBRARY}) # if hop-chains need to be calculated by the server, we # the smarthopping module diff --git a/src/core/cpn/CampaignManager.cc b/src/core/cpn/CampaignManager.cc index 2c54fc3e..5654115c 100644 --- a/src/core/cpn/CampaignManager.cc +++ b/src/core/cpn/CampaignManager.cc @@ -1,7 +1,46 @@ +#include + #include "CampaignManager.hpp" +#include "util/Logger.hpp" +#include "JobServer.hpp" namespace fail { +static Logger log_send("CampaignManager"); + CampaignManager campaignmanager; +CampaignManager::~CampaignManager() { delete m_jobserver; } + +bool CampaignManager::runCampaign(Campaign *c) { + fail::CommandLine &cmd = fail::CommandLine::Inst(); + if (!cmd.parse()) { + log_send << "Error parsing arguments." << std::endl; + exit(-1); + } + + if (!m_jobserver) { + m_jobserver = (cmd[port].count() > 0) + ? new JobServer(std::atoi(cmd[port].first()->arg)) + : new JobServer; + } + m_currentCampaign = c; + bool ret = c->run(); + m_jobserver->done(); + return ret; +} + +void CampaignManager::addParam(ExperimentData *exp) +{ + m_jobserver->addParam(exp); +} + +ExperimentData *CampaignManager::getDone() { return m_jobserver->getDone(); } + +void CampaignManager::noMoreParameters() +{ + m_jobserver->setNoMoreExperiments(); +} + +void CampaignManager::done() { m_jobserver->done(); } } // end-of-namespace: fail diff --git a/src/core/cpn/CampaignManager.hpp b/src/core/cpn/CampaignManager.hpp index c6dc7cca..3d4a1446 100644 --- a/src/core/cpn/CampaignManager.hpp +++ b/src/core/cpn/CampaignManager.hpp @@ -8,8 +8,8 @@ #include "sal/SALInst.hpp" #include "comm/ExperimentData.hpp" -#include "JobServer.hpp" #include "Campaign.hpp" +#include "util/CommandLine.hpp" namespace fail { @@ -19,26 +19,25 @@ namespace fail { * The CampaignManager allows a user-campaign access to all constant * simulator information and forwards single experiments to the JobServer. */ +class JobServer; class CampaignManager { private: JobServer *m_jobserver; Campaign* m_currentCampaign; + CommandLine::option_handle port; public: - CampaignManager() : m_jobserver(0), m_currentCampaign(0) { } - ~CampaignManager() { delete m_jobserver; } + CampaignManager() : m_jobserver(0), m_currentCampaign(0) + { + fail::CommandLine &cmd = fail::CommandLine::Inst(); + port = cmd.addOption("p", "port", Arg::Required, + "-p,--port \tListening port of server; no " + "value chooses port automatically"); + } + ~CampaignManager(); /** * Executes a user campaign */ - bool runCampaign(Campaign* c) - { - if (!m_jobserver) { - m_jobserver = new JobServer; - } - m_currentCampaign = c; - bool ret = c->run(); - m_jobserver->done(); - return ret; - } + bool runCampaign(Campaign* c); /** * Returns a const reference for acquiring constant simulator specific information. * e.g., Registernames, to ease experiment data construction. @@ -54,21 +53,21 @@ public: * A Parameter set includes space for results. * @param exp A pointer to a ExperimentData set. */ - void addParam(ExperimentData* exp) { m_jobserver->addParam(exp); } + void addParam(ExperimentData* exp); /** * A user campaign can request a single result (blocking) from the queue. * @return Pointer to a parameter object with filled result data * @see addParam() */ - ExperimentData* getDone() { return m_jobserver->getDone(); } + ExperimentData* getDone(); /** * Signal, that there will not come any further parameter sets. */ - void noMoreParameters() { m_jobserver->setNoMoreExperiments(); } + void noMoreParameters(); /** * User campaign has finished. */ - void done() { m_jobserver->done(); } + void done(); /** * Wait actively, until all experiments expired. */ diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index 024efdf3..bcf96417 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -1,19 +1,24 @@ // needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter +#include +#include +#include +#include #include #include #include #include -#include -#include #include #include -#include +#include +#include #include "comm/SocketComm.hpp" #include "JobServer.hpp" -#include "Minion.hpp" #ifndef __puma +#include +#include + #include #include #endif @@ -22,6 +27,156 @@ using namespace std; namespace fail { +using namespace boost::asio; +using namespace boost::asio::ip; +using namespace boost::system; + +/** + * @class CommThread + * Implementation of the communication threads. + * This class implements the actual communication + * with the minions. + */ +class CommThread { +private: + tcp::socket m_socket; + uint32_t m_job_size; + JobServer &m_js; //! Calling jobserver + + // FIXME: Concerns are not really separated yet ;) + /** + * Called after minion calls for work. + * Tries to deque a parameter set non blocking, and + * sends it back to the requesting minion. + * @param minion The minion asking for input + */ + void sendPendingExperimentData(yield_context yield); + /** + * Called after minion offers a result message. + * Evaluates the Workload ID and puts the corresponding + * job result into the result queue. + * @param minion The minion offering results + * @param workloadID The workload id of the result message + */ + void receiveExperimentResults(FailControlMessage &ctrlmsg, + yield_context yield); + + enum ProgressType { Send, Receive, Resend }; + void print_progress(const enum ProgressType, const uint32_t, + const uint32_t); + +public: + CommThread(tcp::socket socket, JobServer &p) + : m_socket(std::move(socket)), m_job_size(1), m_js(p) {} + + /** + * The thread's entry point. + */ + void operator()(yield_context yield); +}; + +namespace AsyncSocket { + +static bool rcvMsg(tcp::socket &socket, google::protobuf::Message &msg, + yield_context yield, bool drop = false) +{ + boost::system::error_code ec; + int size; + size_t len = async_read(socket, buffer(&size, sizeof(size)), yield[ec]); + if (len != sizeof(size)) { + std::cerr << "Read " << len << " instead of " << sizeof(size) + << " bytes from socket" << std::endl; + return false; + } + if (ec) { + std::cerr << ec.message() << std::endl; + return false; + } + const size_t msg_size = ntohl(size); + + std::vector buf(msg_size); + len = async_read(socket, buffer(buf), yield[ec]); + if (len != msg_size) { + std::cerr << "Read " << len << " instead of " << msg_size + << " bytes from socket" << std::endl; + return false; + } + if (ec) { + std::cerr << ec.message() << std::endl; + return false; + } + return drop ? true : msg.ParseFromArray(buf.data(), buf.size()); +} + +static bool dropMsg(tcp::socket &socket, yield_context yield) { + FailControlMessage msg; + return rcvMsg(socket, msg, yield, true); +} + +static bool sendMsg(tcp::socket &socket, google::protobuf::Message &msg, + yield_context yield) +{ + std::string buf; + if (!msg.SerializeToString(&buf)) { + return false; + } + + const int size = htonl(msg.ByteSize()); + boost::array bufs{buffer(&size, sizeof(size)), + buffer(buf)}; + boost::system::error_code ec; + async_write(socket, bufs, yield[ec]); + if (ec) { + std::cerr << ec.message() << std::endl; + return false; + } + + return true; +} +} + +struct JobServer::impl { + io_service accept_service; + io_service comm_service; + std::thread comm_thread; + std::atomic redundant_results{0}; + + //! Campaign signaled last experiment data set + std::atomic_bool noMoreExps{false}; + + impl() + : comm_thread([this] { + io_service::work work(comm_service); + comm_service.run(); + std::cout << "Received " << redundant_results + << " redundant results." << std::endl; + }) + { + } + + ~impl() + { + comm_service.stop(); + if (comm_thread.joinable()) { + comm_thread.join(); + } + } +}; + +JobServer::JobServer(const unsigned short port) + : m_d(std::make_shared()), m_port(port), m_finish(false), + m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE) +{ + SocketComm::init(); + m_runid = std::time(0); +#ifndef __puma + m_serverThread = new boost::thread(&JobServer::run, this); +#ifdef SERVER_PERFORMANCE_MEASURE + m_measureThread = new boost::thread(&JobServer::measure, this); +#endif +#endif +} + void JobServer::addParam(ExperimentData* exp) { #ifndef __puma @@ -45,17 +200,15 @@ ExperimentData *JobServer::getDone() #endif } +bool JobServer::noMoreExperiments() const { return m_d->noMoreExps; } + void JobServer::setNoMoreExperiments() { -#ifndef __puma boost::unique_lock lock(m_CommMutex); -#endif - // currently not really necessary, as we only non-blockingly dequeue: + m_d->noMoreExps = true; m_undoneJobs.setIsFinished(); - m_noMoreExps = true; if (m_undoneJobs.Size() == 0 && - noMoreExperiments() && m_runningJobs.Size() == 0) { m_doneJobs.setIsFinished(); } @@ -90,134 +243,77 @@ void JobServer::measure() } #endif // SERVER_PERFORMANCE_MEASURE -#ifndef __puma -/** - * This is a predicate class for the remove_if operator on the thread - * list. The operator waits for timeout milliseconds to join each - * thread in the list. If the join was successful, the exited thread - * is deallocated and removed from the list. - */ -struct timed_join_successful { - int timeout_ms; - timed_join_successful(int timeout) - : timeout_ms(timeout) { } - - bool operator()(boost::thread* threadelement) - { - boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms); - if (threadelement->timed_join(timeout)) { - delete threadelement; - return true; - } else { - return false; - } - } -}; -#endif +void JobServer::done() +{ + m_d->accept_service.stop(); + m_d->comm_service.stop(); +} void JobServer::run() { - struct sockaddr_in clientaddr; - socklen_t clen = sizeof(clientaddr); - - // implementation of server-client communication - int s; - if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - perror("socket"); - // TODO: Log-level? - return; + const auto ep = tcp::endpoint(tcp::v4(), m_port); + auto acceptor = tcp::acceptor(m_d->accept_service, ep, true); + { + const auto local_ep = acceptor.local_endpoint(); + std::cout << "Listening on " << local_ep.address() << ":" + << local_ep.port() << std::endl; } - /* Enable address reuse */ - int on = 1; - if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { - perror("setsockopt"); - // TODO: Log-level? - return; - } - - /* IPv4, bind to all interfaces */ - struct sockaddr_in saddr = {0}; - saddr.sin_family = AF_INET; - saddr.sin_port = htons(m_port); - saddr.sin_addr.s_addr = htons(INADDR_ANY); - - /* bind to port */ - if (::bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) { - perror("bind"); - // TODO: Log-level? - return; - } - - /* Listen with a backlog of maxThreads */ - if (listen(s, m_maxThreads) == -1) { - perror("listen"); - close(s); - // TODO: Log-level? - return; - } - cout << "JobServer listening ..." << endl; - // TODO: Log-level? -#ifndef __puma - boost::thread* th; - while (!m_finish) { - // Accept connection - int cs = SocketComm::timedAccept(s, (struct sockaddr*)&clientaddr, &clen, 100); - if (cs < 0) { - if (errno != EWOULDBLOCK) { - perror("poll/accept"); - close(s); - // TODO: Log-level? - return; - } else { + spawn(m_d->accept_service, [this, &acceptor](yield_context yield) { + for (;;) { + tcp::socket socket(m_d->comm_service); + boost::system::error_code ec; + acceptor.async_accept(socket, yield[ec]); + if (ec) { + std::cerr + << "Error accept()ing a new connection " + << ec.message() << std::endl; continue; } + + spawn(m_d->comm_service, + [ socket = std::move(socket), + this ](yield_context yield) mutable { + CommThread coro(std::move(socket), *this); + coro(yield); + }); } + }); - bool creation_failed = false; - do { - // Spawn a thread for further communication, - // and add this thread to a list threads - // We can limit the generation of threads here. - if (m_threadlist.size() >= m_maxThreads || creation_failed) { - // Run over list with a timed_join, - // removing finished threads. - do { - m_threadlist.remove_if(timed_join_successful(m_threadtimeout)); - } while (m_threadlist.size() >= m_maxThreads); - } - // Start new thread - try { - th = new boost::thread(CommThread(cs, *this)); - creation_failed = false; - } catch (boost::thread_resource_error e) { - cout << "failed to spawn thread, throttling ..." << endl; - creation_failed = true; - sleep(1); - } - } while (creation_failed); - - m_threadlist.push_back(th); - } - close(s); - // when all undone Jobs are distributed -> call a timed_join on all spawned - // TODO: interrupt threads that do not want to join.. - while (m_threadlist.size() > 0) - m_threadlist.remove_if( timed_join_successful(m_threadtimeout) ); -#endif + m_d->accept_service.run(); } -void CommThread::operator()() +void CommThread::print_progress(const enum ProgressType type, + const uint32_t w_id, const uint32_t count) +{ + using namespace std::chrono; + static system_clock::time_point last; + const auto now = system_clock::now(); + if (last + seconds{1} > now) { + return; + } + last = now; + std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/" + << std::setw(6) << m_js.m_runningJobs.Size() << "/" + << std::setw(6) << m_js.m_doneJobs.Size() << " - "; + const char *sep; + if (type == ProgressType::Send) { + sep = " >"; + } else if (type == ProgressType::Resend) { + sep = ">>"; + } else { + sep = " <"; + } + std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush; +} + +void CommThread::operator()(yield_context yield) { // The communication thread implementation: - - Minion minion; FailControlMessage ctrlmsg; - minion.setSocketDescriptor(m_sock); - if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), ctrlmsg)) { + if (!AsyncSocket::rcvMsg(m_socket, ctrlmsg, yield)) { cout << "!![Server] failed to read complete message from client" << endl; - close(m_sock); return; } @@ -229,12 +325,12 @@ void CommThread::operator()() ctrlmsg.Clear(); ctrlmsg.set_command(FailControlMessage::DIE); ctrlmsg.set_build_id(42); - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); + AsyncSocket::sendMsg(m_socket, ctrlmsg, yield); break; } // give minion something to do.. m_job_size = ctrlmsg.job_size(); - sendPendingExperimentData(minion); + sendPendingExperimentData(yield); break; case FailControlMessage::RESULT_FOLLOWS: // ignore old client's results @@ -243,7 +339,7 @@ void CommThread::operator()() break; } // get results and put to done queue. - receiveExperimentResults(minion, ctrlmsg); + receiveExperimentResults(ctrlmsg, yield); break; default: // hm.. don't know what to do. please die. @@ -252,13 +348,11 @@ void CommThread::operator()() ctrlmsg.Clear(); ctrlmsg.set_command(FailControlMessage::DIE); ctrlmsg.set_build_id(42); - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); + AsyncSocket::sendMsg(m_socket, ctrlmsg, yield); } - - close(m_sock); } -void CommThread::sendPendingExperimentData(Minion& minion) +void CommThread::sendPendingExperimentData(yield_context yield) { uint32_t i; uint32_t workloadID; @@ -284,12 +378,12 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (exp.size() != 0) { ctrlmsg.set_job_size(exp.size()); - cout << " >>[" << ctrlmsg.workloadid(0) << "+" - << exp.size() << "] \r" << flush; + print_progress(ProgressType::Send, ctrlmsg.workloadid(0), + exp.size()); - if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { + if (AsyncSocket::sendMsg(m_socket, ctrlmsg, yield)) { for (i = 0; i < ctrlmsg.job_size(); i++) { - if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + if (AsyncSocket::sendMsg(m_socket, exp.front()->getMessage(), yield)) { // delay insertion into m_runningJobs until here, as // getMessage() won't work anymore if this job is re-sent, @@ -314,6 +408,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) return; } + { // Don't indent properly to reduce patch-noise. #ifndef __puma // Prevent receiveExperimentResults from modifying (or indirectly, via // getDone and the campaign, deleting) jobs in the m_runningJobs queue. @@ -337,33 +432,39 @@ void CommThread::sendPendingExperimentData(Minion& minion) ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); ctrlmsg.add_workloadid(workloadID); // set workload id ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job - //cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; - cout << ">>R[" << workloadID << "] \r" << flush; - if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { - SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage()); - } + print_progress(ProgressType::Resend, workloadID, 1); } else if (m_js.noMoreExperiments() == false) { // Currently we have no workload (even the running-job-queue is empty!), but // the campaign is not over yet. Minion can try again later. ctrlmsg.set_command(FailControlMessage::COME_AGAIN); - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); cout << "--[Server] No workload, come again..." << endl; } else { - // No more elements, and campaign is over. Minion can die. + // No work & Camapaign is done. Go away. ctrlmsg.set_command(FailControlMessage::DIE); - cout << "--[Server] No workload, and no campaign, please die." << endl; - SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); + } + } + + const bool success = AsyncSocket::sendMsg(m_socket, ctrlmsg, yield); + if (temp_exp != nullptr && success) { + AsyncSocket::sendMsg(m_socket, temp_exp->getMessage(), yield); } } -void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg) +void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg, + yield_context yield) { int i; ExperimentData* exp = NULL; // Get exp* from running jobs if (ctrlmsg.workloadid_size() > 0) { - cout << " <<[" << ctrlmsg.workloadid(0) << "+" - << ctrlmsg.workloadid_size() << "] \r" << flush; + print_progress(ProgressType::Receive, ctrlmsg.workloadid(0), + ctrlmsg.workloadid_size()); } + + // No yielding under locks ;) + std::vector> msgs; + msgs.reserve(ctrlmsg.workloadid_size()); + + { // Don't indent properly to reduce patch-noise. #ifndef __puma // Prevent re-sending jobs in sendPendingExperimentData: // a) sendPendingExperimentData needs an intact job to serialize and send it. @@ -374,23 +475,44 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct boost::unique_lock lock(m_js.m_CommMutex); #endif for (i = 0; i < ctrlmsg.workloadid_size(); i++) { - if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found - // deserialize results, expect failures - if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) { - m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp); - } else { - m_js.m_doneJobs.Enqueue(exp); // Put results in done queue - } + const uint32_t id = ctrlmsg.workloadid(i); + const bool success = m_js.m_runningJobs.remove(id, exp); + msgs.emplace_back(success ? exp : nullptr, id); + } + } + + /* Do I/O */ + std::vector> recvd; + msgs.reserve(ctrlmsg.workloadid_size()); + for (auto &&msg : msgs) { + auto &&exp = std::get<0>(msg); + if (exp != nullptr) { + const auto w_id = std::get<1>(msg); + const bool success = AsyncSocket::rcvMsg( + m_socket, exp->getMessage(), yield); + recvd.emplace_back(exp, w_id, success); #ifdef SERVER_PERFORMANCE_MEASURE ++JobServer::m_DoneCount; #endif } else { // We can receive several results for the same workload id because // we (may) distribute the (running) jobs to a *few* experiment-clients. - cout << "[Server] Received another result for workload id [" - << ctrlmsg.workloadid(i) << "] -- ignored." << endl; + ++m_js.m_d->redundant_results; + AsyncSocket::dropMsg(m_socket, yield); + } + } - SocketComm::dropMsg(minion.getSocketDescriptor()); +#ifndef __puma + boost::unique_lock lock(m_js.m_CommMutex); +#endif + for (auto &&rcv : recvd) { + auto &&exp = std::get<0>(rcv); + const auto received = std::get<2>(rcv); + if (received) { + m_js.m_doneJobs.Enqueue(exp); + } else { + const auto w_id = std::get<1>(rcv); + m_js.m_runningJobs.insert(w_id, exp); } } diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 6cbe56ac..18efc7cb 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -1,16 +1,18 @@ #ifndef __JOB_SERVER_H__ #define __JOB_SERVER_H__ -#include "Minion.hpp" #include "util/SynchronizedQueue.hpp" #include "util/SynchronizedCounter.hpp" #include "util/SynchronizedMap.hpp" #include "config/FailConfig.hpp" +#include "comm/ExperimentData.hpp" #include "comm/FailControlMessage.pb.h" #include "comm/SocketComm.hpp" #include #include +#include +#include #ifndef __puma #include @@ -30,14 +32,13 @@ class CommThread; */ class JobServer { private: + struct impl; + std::shared_ptr m_d; + //! The TCP Port number - int m_port; + const unsigned short m_port; //! TODO nice termination concept bool m_finish; - //! Campaign signaled last expirement data set - bool m_noMoreExps; - //! the maximal number of threads spawned for TCP communication - unsigned m_maxThreads; //! the maximal timeout per communication thread int m_threadtimeout; //! list of spawned threads @@ -82,18 +83,7 @@ private: void sendWork(int sockfd); public: - JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false), - m_maxThreads(128), m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE) - { - SocketComm::init(); - m_runid = std::time(0); -#ifndef __puma - m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread. -#ifdef SERVER_PERFORMANCE_MEASURE - m_measureThread = new boost::thread(&JobServer::measure, this); -#endif -#endif - } + JobServer(const unsigned short port = SERVER_COMM_TCP_PORT); ~JobServer() { done(); @@ -130,50 +120,13 @@ public: * @return \c true if no more parameter sets available, \c false otherwise * @see setNoMoreExperiments */ - bool noMoreExperiments() const { return m_noMoreExps; } + bool noMoreExperiments() const; /** * The Campaign Controller may signal that the jobserver can stop listening * for client connections. */ - void done() { m_finish = true; } -}; - -/** - * @class CommThread - * Implementation of the communication threads. - * This class implements the actual communication - * with the minions. - */ -class CommThread { -private: - int m_sock; //! Socket descriptor of the connection - uint32_t m_job_size; - JobServer& m_js; //! Calling jobserver - - // FIXME: Concerns are not really separated yet ;) - /** - * Called after minion calls for work. - * Tries to deque a parameter set non blocking, and - * sends it back to the requesting minion. - * @param minion The minion asking for input - */ - void sendPendingExperimentData(Minion& minion); - /** - * Called after minion offers a result message. - * Evaluates the Workload ID and puts the corresponding - * job result into the result queue. - * @param minion The minion offering results - * @param workloadID The workload id of the result message - */ - void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg); -public: - CommThread(int sockfd, JobServer& p) - : m_sock(sockfd), m_job_size(1), m_js(p) { } - /** - * The thread's entry point. - */ - void operator()(); + void done(); }; } // end-of-namespace: fail diff --git a/src/core/cpn/Minion.hpp b/src/core/cpn/Minion.hpp deleted file mode 100644 index 8f461e0e..00000000 --- a/src/core/cpn/Minion.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/** - * \file Minion.hpp - * \brief The representation of a minion. - */ - -#ifndef __MINION_HPP__ -#define __MINION_HPP__ - -#include - -#include "comm/ExperimentData.hpp" - -namespace fail { - -/** - * \class Minion - * - * Contains all informations about a minion. - */ -class Minion { -private: - std::string hostname; - bool isWorking; - ExperimentData* currentExperimentData; - int sockfd; -public: - Minion() : isWorking(false), currentExperimentData(0), sockfd(-1) { } - /** - * Sets the socket descriptor. - * @param sock the new socket descriptor (used internal) - */ - void setSocketDescriptor(int sock) { sockfd = sock; } - /** - * Retrives the socket descriptor. - * @return the socket descriptor - */ - int getSocketDescriptor() const { return (sockfd); } - /** - * Returns the hostname of the minion. - * @return the hostname - */ - const std::string& getHostname() { return (hostname); } - /** - * Sets the hostname of the minion. - * @param host the hostname - */ - void setHostname(const std::string& host) { hostname = host; } - /** - * Returns the current ExperimentData which the minion is working with. - * @return a pointer of the current ExperimentData - */ - ExperimentData* getCurrentExperimentData() { return currentExperimentData; } - /** - * Sets the current ExperimentData which the minion is working with. - * @param exp the current ExperimentData - */ - void setCurrentExperimentData(ExperimentData* exp) { currentExperimentData = exp; } - /** - * Returns the current state of the minion. - * @return the current state - */ - bool isBusy() { return (isWorking); } - /** - * Sets the current state of the minion - * @param state the current state - */ - void setBusy(bool state) { isWorking = state; } -}; - -} // end-of-namespace: fail - -#endif // __MINION_HPP__