Use boost-asio to improve FAIL* server performance
This patch overhauls the FAIL* server code to leverage Boost asio to be able to handle a large number of clients (>4000). In this implementation the server is now single threaded. I've not encountered any problems with this for up to about 10k clients. Boost ASIO can also be used multithreaded, but I assume the FAIL* internal data structures (Synchronized*) will become a bottleneck first. The code now additionally depends on Boost Coro and Boost Context, as well as a C++ 14 compiler, although the only C++14 feature required is a lambda capture with initializer, such as [ x = std::move(x) ]. gcc-4.9.2 does this. The code could (and probably should) be cleaned up more. Comments are wordy, code is unnecessary now (multiple server threads), code is not self-contained (headers spread dependencies), many ifdef's (server performance measuring should be runtime rather than a compile time option), and much more. But for this patch I was going for a minimal changeset the get the functionality in, to have an easier review. Alas, FAIL* has no Unit-test suite to run the changes against. To handle such a large number of clients more changes were necessary, for example server status output is now performed every 1s, instead for every request. The class Minion was removed completely; the only thing it was doing was encapsulate an int. The server has now a runtime-configurable port, or it can select a free port on its own if none is specified. This requires the CampaignManager to add a port argument and instantiate the JobServer dynamically. Change-Id: Iad9238972161f95f5802bd2251116f8aeee14884
This commit is contained in:
@ -1,5 +1,5 @@
|
|||||||
### Add Boost and Threads
|
### Add Boost and Threads
|
||||||
find_package(Boost 1.42 COMPONENTS thread REQUIRED)
|
find_package(Boost 1.42 COMPONENTS thread coroutine context REQUIRED)
|
||||||
include_directories(${Boost_INCLUDE_DIRS})
|
include_directories(${Boost_INCLUDE_DIRS})
|
||||||
link_directories(${Boost_LIBRARY_DIRS})
|
link_directories(${Boost_LIBRARY_DIRS})
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,8 @@ set(SRCS
|
|||||||
DatabaseCampaign.cc
|
DatabaseCampaign.cc
|
||||||
)
|
)
|
||||||
|
|
||||||
|
set_source_files_properties(JobServer.cc CampaignManager.cc PROPERTIES COMPILE_FLAGS -std=c++11)
|
||||||
|
|
||||||
find_package(MySQL REQUIRED)
|
find_package(MySQL REQUIRED)
|
||||||
include_directories(${MYSQL_INCLUDE_DIR})
|
include_directories(${MYSQL_INCLUDE_DIR})
|
||||||
|
|
||||||
@ -15,7 +17,7 @@ else(CONFIG_INJECTIONPOINT_HOPS)
|
|||||||
endif(CONFIG_INJECTIONPOINT_HOPS)
|
endif(CONFIG_INJECTIONPOINT_HOPS)
|
||||||
|
|
||||||
add_library(fail-cpn ${SRCS})
|
add_library(fail-cpn ${SRCS})
|
||||||
target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES})
|
target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES} ${Boost_COROUTINE_LIBRARY} ${Boost_CONTEXT_LIBRARY})
|
||||||
|
|
||||||
# if hop-chains need to be calculated by the server, we
|
# if hop-chains need to be calculated by the server, we
|
||||||
# the smarthopping module
|
# the smarthopping module
|
||||||
|
|||||||
@ -1,7 +1,46 @@
|
|||||||
|
#include <cstdlib>
|
||||||
|
|
||||||
#include "CampaignManager.hpp"
|
#include "CampaignManager.hpp"
|
||||||
|
#include "util/Logger.hpp"
|
||||||
|
#include "JobServer.hpp"
|
||||||
|
|
||||||
namespace fail {
|
namespace fail {
|
||||||
|
|
||||||
|
static Logger log_send("CampaignManager");
|
||||||
|
|
||||||
CampaignManager campaignmanager;
|
CampaignManager campaignmanager;
|
||||||
|
|
||||||
|
CampaignManager::~CampaignManager() { delete m_jobserver; }
|
||||||
|
|
||||||
|
bool CampaignManager::runCampaign(Campaign *c) {
|
||||||
|
fail::CommandLine &cmd = fail::CommandLine::Inst();
|
||||||
|
if (!cmd.parse()) {
|
||||||
|
log_send << "Error parsing arguments." << std::endl;
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_jobserver) {
|
||||||
|
m_jobserver = (cmd[port].count() > 0)
|
||||||
|
? new JobServer(std::atoi(cmd[port].first()->arg))
|
||||||
|
: new JobServer;
|
||||||
|
}
|
||||||
|
m_currentCampaign = c;
|
||||||
|
bool ret = c->run();
|
||||||
|
m_jobserver->done();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void CampaignManager::addParam(ExperimentData *exp)
|
||||||
|
{
|
||||||
|
m_jobserver->addParam(exp);
|
||||||
|
}
|
||||||
|
|
||||||
|
ExperimentData *CampaignManager::getDone() { return m_jobserver->getDone(); }
|
||||||
|
|
||||||
|
void CampaignManager::noMoreParameters()
|
||||||
|
{
|
||||||
|
m_jobserver->setNoMoreExperiments();
|
||||||
|
}
|
||||||
|
|
||||||
|
void CampaignManager::done() { m_jobserver->done(); }
|
||||||
} // end-of-namespace: fail
|
} // end-of-namespace: fail
|
||||||
|
|||||||
@ -8,8 +8,8 @@
|
|||||||
|
|
||||||
#include "sal/SALInst.hpp"
|
#include "sal/SALInst.hpp"
|
||||||
#include "comm/ExperimentData.hpp"
|
#include "comm/ExperimentData.hpp"
|
||||||
#include "JobServer.hpp"
|
|
||||||
#include "Campaign.hpp"
|
#include "Campaign.hpp"
|
||||||
|
#include "util/CommandLine.hpp"
|
||||||
|
|
||||||
namespace fail {
|
namespace fail {
|
||||||
|
|
||||||
@ -19,26 +19,25 @@ namespace fail {
|
|||||||
* The CampaignManager allows a user-campaign access to all constant
|
* The CampaignManager allows a user-campaign access to all constant
|
||||||
* simulator information and forwards single experiments to the JobServer.
|
* simulator information and forwards single experiments to the JobServer.
|
||||||
*/
|
*/
|
||||||
|
class JobServer;
|
||||||
class CampaignManager {
|
class CampaignManager {
|
||||||
private:
|
private:
|
||||||
JobServer *m_jobserver;
|
JobServer *m_jobserver;
|
||||||
Campaign* m_currentCampaign;
|
Campaign* m_currentCampaign;
|
||||||
|
CommandLine::option_handle port;
|
||||||
public:
|
public:
|
||||||
CampaignManager() : m_jobserver(0), m_currentCampaign(0) { }
|
CampaignManager() : m_jobserver(0), m_currentCampaign(0)
|
||||||
~CampaignManager() { delete m_jobserver; }
|
{
|
||||||
|
fail::CommandLine &cmd = fail::CommandLine::Inst();
|
||||||
|
port = cmd.addOption("p", "port", Arg::Required,
|
||||||
|
"-p,--port \tListening port of server; no "
|
||||||
|
"value chooses port automatically");
|
||||||
|
}
|
||||||
|
~CampaignManager();
|
||||||
/**
|
/**
|
||||||
* Executes a user campaign
|
* Executes a user campaign
|
||||||
*/
|
*/
|
||||||
bool runCampaign(Campaign* c)
|
bool runCampaign(Campaign* c);
|
||||||
{
|
|
||||||
if (!m_jobserver) {
|
|
||||||
m_jobserver = new JobServer;
|
|
||||||
}
|
|
||||||
m_currentCampaign = c;
|
|
||||||
bool ret = c->run();
|
|
||||||
m_jobserver->done();
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
/**
|
/**
|
||||||
* Returns a const reference for acquiring constant simulator specific information.
|
* Returns a const reference for acquiring constant simulator specific information.
|
||||||
* e.g., Registernames, to ease experiment data construction.
|
* e.g., Registernames, to ease experiment data construction.
|
||||||
@ -54,21 +53,21 @@ public:
|
|||||||
* A Parameter set includes space for results.
|
* A Parameter set includes space for results.
|
||||||
* @param exp A pointer to a ExperimentData set.
|
* @param exp A pointer to a ExperimentData set.
|
||||||
*/
|
*/
|
||||||
void addParam(ExperimentData* exp) { m_jobserver->addParam(exp); }
|
void addParam(ExperimentData* exp);
|
||||||
/**
|
/**
|
||||||
* A user campaign can request a single result (blocking) from the queue.
|
* A user campaign can request a single result (blocking) from the queue.
|
||||||
* @return Pointer to a parameter object with filled result data
|
* @return Pointer to a parameter object with filled result data
|
||||||
* @see addParam()
|
* @see addParam()
|
||||||
*/
|
*/
|
||||||
ExperimentData* getDone() { return m_jobserver->getDone(); }
|
ExperimentData* getDone();
|
||||||
/**
|
/**
|
||||||
* Signal, that there will not come any further parameter sets.
|
* Signal, that there will not come any further parameter sets.
|
||||||
*/
|
*/
|
||||||
void noMoreParameters() { m_jobserver->setNoMoreExperiments(); }
|
void noMoreParameters();
|
||||||
/**
|
/**
|
||||||
* User campaign has finished.
|
* User campaign has finished.
|
||||||
*/
|
*/
|
||||||
void done() { m_jobserver->done(); }
|
void done();
|
||||||
/**
|
/**
|
||||||
* Wait actively, until all experiments expired.
|
* Wait actively, until all experiments expired.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -1,19 +1,24 @@
|
|||||||
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
|
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
|
||||||
|
#include <algorithm>
|
||||||
|
#include <chrono>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <vector>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <sys/socket.h>
|
|
||||||
#include <netinet/in.h>
|
|
||||||
#include <strings.h>
|
#include <strings.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <arpa/inet.h>
|
#include <thread>
|
||||||
|
#include <tuple>
|
||||||
|
|
||||||
#include "comm/SocketComm.hpp"
|
#include "comm/SocketComm.hpp"
|
||||||
#include "JobServer.hpp"
|
#include "JobServer.hpp"
|
||||||
#include "Minion.hpp"
|
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
|
#include <boost/asio.hpp>
|
||||||
|
#include <boost/asio/spawn.hpp>
|
||||||
|
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
#include <boost/date_time.hpp>
|
#include <boost/date_time.hpp>
|
||||||
#endif
|
#endif
|
||||||
@ -22,6 +27,156 @@ using namespace std;
|
|||||||
|
|
||||||
namespace fail {
|
namespace fail {
|
||||||
|
|
||||||
|
using namespace boost::asio;
|
||||||
|
using namespace boost::asio::ip;
|
||||||
|
using namespace boost::system;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class CommThread
|
||||||
|
* Implementation of the communication threads.
|
||||||
|
* This class implements the actual communication
|
||||||
|
* with the minions.
|
||||||
|
*/
|
||||||
|
class CommThread {
|
||||||
|
private:
|
||||||
|
tcp::socket m_socket;
|
||||||
|
uint32_t m_job_size;
|
||||||
|
JobServer &m_js; //! Calling jobserver
|
||||||
|
|
||||||
|
// FIXME: Concerns are not really separated yet ;)
|
||||||
|
/**
|
||||||
|
* Called after minion calls for work.
|
||||||
|
* Tries to deque a parameter set non blocking, and
|
||||||
|
* sends it back to the requesting minion.
|
||||||
|
* @param minion The minion asking for input
|
||||||
|
*/
|
||||||
|
void sendPendingExperimentData(yield_context yield);
|
||||||
|
/**
|
||||||
|
* Called after minion offers a result message.
|
||||||
|
* Evaluates the Workload ID and puts the corresponding
|
||||||
|
* job result into the result queue.
|
||||||
|
* @param minion The minion offering results
|
||||||
|
* @param workloadID The workload id of the result message
|
||||||
|
*/
|
||||||
|
void receiveExperimentResults(FailControlMessage &ctrlmsg,
|
||||||
|
yield_context yield);
|
||||||
|
|
||||||
|
enum ProgressType { Send, Receive, Resend };
|
||||||
|
void print_progress(const enum ProgressType, const uint32_t,
|
||||||
|
const uint32_t);
|
||||||
|
|
||||||
|
public:
|
||||||
|
CommThread(tcp::socket socket, JobServer &p)
|
||||||
|
: m_socket(std::move(socket)), m_job_size(1), m_js(p) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The thread's entry point.
|
||||||
|
*/
|
||||||
|
void operator()(yield_context yield);
|
||||||
|
};
|
||||||
|
|
||||||
|
namespace AsyncSocket {
|
||||||
|
|
||||||
|
static bool rcvMsg(tcp::socket &socket, google::protobuf::Message &msg,
|
||||||
|
yield_context yield, bool drop = false)
|
||||||
|
{
|
||||||
|
boost::system::error_code ec;
|
||||||
|
int size;
|
||||||
|
size_t len = async_read(socket, buffer(&size, sizeof(size)), yield[ec]);
|
||||||
|
if (len != sizeof(size)) {
|
||||||
|
std::cerr << "Read " << len << " instead of " << sizeof(size)
|
||||||
|
<< " bytes from socket" << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (ec) {
|
||||||
|
std::cerr << ec.message() << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const size_t msg_size = ntohl(size);
|
||||||
|
|
||||||
|
std::vector<char> buf(msg_size);
|
||||||
|
len = async_read(socket, buffer(buf), yield[ec]);
|
||||||
|
if (len != msg_size) {
|
||||||
|
std::cerr << "Read " << len << " instead of " << msg_size
|
||||||
|
<< " bytes from socket" << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (ec) {
|
||||||
|
std::cerr << ec.message() << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return drop ? true : msg.ParseFromArray(buf.data(), buf.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool dropMsg(tcp::socket &socket, yield_context yield) {
|
||||||
|
FailControlMessage msg;
|
||||||
|
return rcvMsg(socket, msg, yield, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool sendMsg(tcp::socket &socket, google::protobuf::Message &msg,
|
||||||
|
yield_context yield)
|
||||||
|
{
|
||||||
|
std::string buf;
|
||||||
|
if (!msg.SerializeToString(&buf)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int size = htonl(msg.ByteSize());
|
||||||
|
boost::array<const_buffer, 2> bufs{buffer(&size, sizeof(size)),
|
||||||
|
buffer(buf)};
|
||||||
|
boost::system::error_code ec;
|
||||||
|
async_write(socket, bufs, yield[ec]);
|
||||||
|
if (ec) {
|
||||||
|
std::cerr << ec.message() << std::endl;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct JobServer::impl {
|
||||||
|
io_service accept_service;
|
||||||
|
io_service comm_service;
|
||||||
|
std::thread comm_thread;
|
||||||
|
std::atomic<uint64_t> redundant_results{0};
|
||||||
|
|
||||||
|
//! Campaign signaled last experiment data set
|
||||||
|
std::atomic_bool noMoreExps{false};
|
||||||
|
|
||||||
|
impl()
|
||||||
|
: comm_thread([this] {
|
||||||
|
io_service::work work(comm_service);
|
||||||
|
comm_service.run();
|
||||||
|
std::cout << "Received " << redundant_results
|
||||||
|
<< " redundant results." << std::endl;
|
||||||
|
})
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
~impl()
|
||||||
|
{
|
||||||
|
comm_service.stop();
|
||||||
|
if (comm_thread.joinable()) {
|
||||||
|
comm_thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
JobServer::JobServer(const unsigned short port)
|
||||||
|
: m_d(std::make_shared<impl>()), m_port(port), m_finish(false),
|
||||||
|
m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
|
||||||
|
{
|
||||||
|
SocketComm::init();
|
||||||
|
m_runid = std::time(0);
|
||||||
|
#ifndef __puma
|
||||||
|
m_serverThread = new boost::thread(&JobServer::run, this);
|
||||||
|
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||||
|
m_measureThread = new boost::thread(&JobServer::measure, this);
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
void JobServer::addParam(ExperimentData* exp)
|
void JobServer::addParam(ExperimentData* exp)
|
||||||
{
|
{
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
@ -45,17 +200,15 @@ ExperimentData *JobServer::getDone()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool JobServer::noMoreExperiments() const { return m_d->noMoreExps; }
|
||||||
|
|
||||||
void JobServer::setNoMoreExperiments()
|
void JobServer::setNoMoreExperiments()
|
||||||
{
|
{
|
||||||
#ifndef __puma
|
|
||||||
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_CommMutex);
|
||||||
#endif
|
m_d->noMoreExps = true;
|
||||||
// currently not really necessary, as we only non-blockingly dequeue:
|
|
||||||
m_undoneJobs.setIsFinished();
|
m_undoneJobs.setIsFinished();
|
||||||
|
|
||||||
m_noMoreExps = true;
|
|
||||||
if (m_undoneJobs.Size() == 0 &&
|
if (m_undoneJobs.Size() == 0 &&
|
||||||
noMoreExperiments() &&
|
|
||||||
m_runningJobs.Size() == 0) {
|
m_runningJobs.Size() == 0) {
|
||||||
m_doneJobs.setIsFinished();
|
m_doneJobs.setIsFinished();
|
||||||
}
|
}
|
||||||
@ -90,134 +243,77 @@ void JobServer::measure()
|
|||||||
}
|
}
|
||||||
#endif // SERVER_PERFORMANCE_MEASURE
|
#endif // SERVER_PERFORMANCE_MEASURE
|
||||||
|
|
||||||
#ifndef __puma
|
void JobServer::done()
|
||||||
/**
|
{
|
||||||
* This is a predicate class for the remove_if operator on the thread
|
m_d->accept_service.stop();
|
||||||
* list. The operator waits for timeout milliseconds to join each
|
m_d->comm_service.stop();
|
||||||
* thread in the list. If the join was successful, the exited thread
|
}
|
||||||
* is deallocated and removed from the list.
|
|
||||||
*/
|
|
||||||
struct timed_join_successful {
|
|
||||||
int timeout_ms;
|
|
||||||
timed_join_successful(int timeout)
|
|
||||||
: timeout_ms(timeout) { }
|
|
||||||
|
|
||||||
bool operator()(boost::thread* threadelement)
|
|
||||||
{
|
|
||||||
boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms);
|
|
||||||
if (threadelement->timed_join(timeout)) {
|
|
||||||
delete threadelement;
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void JobServer::run()
|
void JobServer::run()
|
||||||
{
|
{
|
||||||
struct sockaddr_in clientaddr;
|
const auto ep = tcp::endpoint(tcp::v4(), m_port);
|
||||||
socklen_t clen = sizeof(clientaddr);
|
auto acceptor = tcp::acceptor(m_d->accept_service, ep, true);
|
||||||
|
{
|
||||||
// implementation of server-client communication
|
const auto local_ep = acceptor.local_endpoint();
|
||||||
int s;
|
std::cout << "Listening on " << local_ep.address() << ":"
|
||||||
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
|
<< local_ep.port() << std::endl;
|
||||||
perror("socket");
|
|
||||||
// TODO: Log-level?
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Enable address reuse */
|
spawn(m_d->accept_service, [this, &acceptor](yield_context yield) {
|
||||||
int on = 1;
|
for (;;) {
|
||||||
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
|
tcp::socket socket(m_d->comm_service);
|
||||||
perror("setsockopt");
|
boost::system::error_code ec;
|
||||||
// TODO: Log-level?
|
acceptor.async_accept(socket, yield[ec]);
|
||||||
return;
|
if (ec) {
|
||||||
}
|
std::cerr
|
||||||
|
<< "Error accept()ing a new connection "
|
||||||
/* IPv4, bind to all interfaces */
|
<< ec.message() << std::endl;
|
||||||
struct sockaddr_in saddr = {0};
|
|
||||||
saddr.sin_family = AF_INET;
|
|
||||||
saddr.sin_port = htons(m_port);
|
|
||||||
saddr.sin_addr.s_addr = htons(INADDR_ANY);
|
|
||||||
|
|
||||||
/* bind to port */
|
|
||||||
if (::bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) {
|
|
||||||
perror("bind");
|
|
||||||
// TODO: Log-level?
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Listen with a backlog of maxThreads */
|
|
||||||
if (listen(s, m_maxThreads) == -1) {
|
|
||||||
perror("listen");
|
|
||||||
close(s);
|
|
||||||
// TODO: Log-level?
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
cout << "JobServer listening ..." << endl;
|
|
||||||
// TODO: Log-level?
|
|
||||||
#ifndef __puma
|
|
||||||
boost::thread* th;
|
|
||||||
while (!m_finish) {
|
|
||||||
// Accept connection
|
|
||||||
int cs = SocketComm::timedAccept(s, (struct sockaddr*)&clientaddr, &clen, 100);
|
|
||||||
if (cs < 0) {
|
|
||||||
if (errno != EWOULDBLOCK) {
|
|
||||||
perror("poll/accept");
|
|
||||||
close(s);
|
|
||||||
// TODO: Log-level?
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
bool creation_failed = false;
|
spawn(m_d->comm_service,
|
||||||
do {
|
[ socket = std::move(socket),
|
||||||
// Spawn a thread for further communication,
|
this ](yield_context yield) mutable {
|
||||||
// and add this thread to a list threads
|
CommThread coro(std::move(socket), *this);
|
||||||
// We can limit the generation of threads here.
|
coro(yield);
|
||||||
if (m_threadlist.size() >= m_maxThreads || creation_failed) {
|
});
|
||||||
// Run over list with a timed_join,
|
|
||||||
// removing finished threads.
|
|
||||||
do {
|
|
||||||
m_threadlist.remove_if(timed_join_successful(m_threadtimeout));
|
|
||||||
} while (m_threadlist.size() >= m_maxThreads);
|
|
||||||
}
|
}
|
||||||
// Start new thread
|
});
|
||||||
try {
|
|
||||||
th = new boost::thread(CommThread(cs, *this));
|
|
||||||
creation_failed = false;
|
|
||||||
} catch (boost::thread_resource_error e) {
|
|
||||||
cout << "failed to spawn thread, throttling ..." << endl;
|
|
||||||
creation_failed = true;
|
|
||||||
sleep(1);
|
|
||||||
}
|
|
||||||
} while (creation_failed);
|
|
||||||
|
|
||||||
m_threadlist.push_back(th);
|
m_d->accept_service.run();
|
||||||
}
|
|
||||||
close(s);
|
|
||||||
// when all undone Jobs are distributed -> call a timed_join on all spawned
|
|
||||||
// TODO: interrupt threads that do not want to join..
|
|
||||||
while (m_threadlist.size() > 0)
|
|
||||||
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void CommThread::operator()()
|
void CommThread::print_progress(const enum ProgressType type,
|
||||||
|
const uint32_t w_id, const uint32_t count)
|
||||||
|
{
|
||||||
|
using namespace std::chrono;
|
||||||
|
static system_clock::time_point last;
|
||||||
|
const auto now = system_clock::now();
|
||||||
|
if (last + seconds{1} > now) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
last = now;
|
||||||
|
std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/"
|
||||||
|
<< std::setw(6) << m_js.m_runningJobs.Size() << "/"
|
||||||
|
<< std::setw(6) << m_js.m_doneJobs.Size() << " - ";
|
||||||
|
const char *sep;
|
||||||
|
if (type == ProgressType::Send) {
|
||||||
|
sep = " >";
|
||||||
|
} else if (type == ProgressType::Resend) {
|
||||||
|
sep = ">>";
|
||||||
|
} else {
|
||||||
|
sep = " <";
|
||||||
|
}
|
||||||
|
std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush;
|
||||||
|
}
|
||||||
|
|
||||||
|
void CommThread::operator()(yield_context yield)
|
||||||
{
|
{
|
||||||
// The communication thread implementation:
|
// The communication thread implementation:
|
||||||
|
|
||||||
Minion minion;
|
|
||||||
FailControlMessage ctrlmsg;
|
FailControlMessage ctrlmsg;
|
||||||
minion.setSocketDescriptor(m_sock);
|
|
||||||
|
|
||||||
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
if (!AsyncSocket::rcvMsg(m_socket, ctrlmsg, yield)) {
|
||||||
cout << "!![Server] failed to read complete message from client" << endl;
|
cout << "!![Server] failed to read complete message from client" << endl;
|
||||||
close(m_sock);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,12 +325,12 @@ void CommThread::operator()()
|
|||||||
ctrlmsg.Clear();
|
ctrlmsg.Clear();
|
||||||
ctrlmsg.set_command(FailControlMessage::DIE);
|
ctrlmsg.set_command(FailControlMessage::DIE);
|
||||||
ctrlmsg.set_build_id(42);
|
ctrlmsg.set_build_id(42);
|
||||||
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
|
AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// give minion something to do..
|
// give minion something to do..
|
||||||
m_job_size = ctrlmsg.job_size();
|
m_job_size = ctrlmsg.job_size();
|
||||||
sendPendingExperimentData(minion);
|
sendPendingExperimentData(yield);
|
||||||
break;
|
break;
|
||||||
case FailControlMessage::RESULT_FOLLOWS:
|
case FailControlMessage::RESULT_FOLLOWS:
|
||||||
// ignore old client's results
|
// ignore old client's results
|
||||||
@ -243,7 +339,7 @@ void CommThread::operator()()
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// get results and put to done queue.
|
// get results and put to done queue.
|
||||||
receiveExperimentResults(minion, ctrlmsg);
|
receiveExperimentResults(ctrlmsg, yield);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// hm.. don't know what to do. please die.
|
// hm.. don't know what to do. please die.
|
||||||
@ -252,13 +348,11 @@ void CommThread::operator()()
|
|||||||
ctrlmsg.Clear();
|
ctrlmsg.Clear();
|
||||||
ctrlmsg.set_command(FailControlMessage::DIE);
|
ctrlmsg.set_command(FailControlMessage::DIE);
|
||||||
ctrlmsg.set_build_id(42);
|
ctrlmsg.set_build_id(42);
|
||||||
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
|
AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
|
||||||
}
|
}
|
||||||
|
|
||||||
close(m_sock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void CommThread::sendPendingExperimentData(Minion& minion)
|
void CommThread::sendPendingExperimentData(yield_context yield)
|
||||||
{
|
{
|
||||||
uint32_t i;
|
uint32_t i;
|
||||||
uint32_t workloadID;
|
uint32_t workloadID;
|
||||||
@ -284,12 +378,12 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
if (exp.size() != 0) {
|
if (exp.size() != 0) {
|
||||||
ctrlmsg.set_job_size(exp.size());
|
ctrlmsg.set_job_size(exp.size());
|
||||||
|
|
||||||
cout << " >>[" << ctrlmsg.workloadid(0) << "+"
|
print_progress(ProgressType::Send, ctrlmsg.workloadid(0),
|
||||||
<< exp.size() << "] \r" << flush;
|
exp.size());
|
||||||
|
|
||||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
if (AsyncSocket::sendMsg(m_socket, ctrlmsg, yield)) {
|
||||||
for (i = 0; i < ctrlmsg.job_size(); i++) {
|
for (i = 0; i < ctrlmsg.job_size(); i++) {
|
||||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
if (AsyncSocket::sendMsg(m_socket, exp.front()->getMessage(), yield)) {
|
||||||
|
|
||||||
// delay insertion into m_runningJobs until here, as
|
// delay insertion into m_runningJobs until here, as
|
||||||
// getMessage() won't work anymore if this job is re-sent,
|
// getMessage() won't work anymore if this job is re-sent,
|
||||||
@ -314,6 +408,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{ // Don't indent properly to reduce patch-noise.
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
// Prevent receiveExperimentResults from modifying (or indirectly, via
|
// Prevent receiveExperimentResults from modifying (or indirectly, via
|
||||||
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
|
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
|
||||||
@ -337,33 +432,39 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||||
ctrlmsg.add_workloadid(workloadID); // set workload id
|
ctrlmsg.add_workloadid(workloadID); // set workload id
|
||||||
ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job
|
ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job
|
||||||
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
|
print_progress(ProgressType::Resend, workloadID, 1);
|
||||||
cout << ">>R[" << workloadID << "] \r" << flush;
|
|
||||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
|
||||||
SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage());
|
|
||||||
}
|
|
||||||
} else if (m_js.noMoreExperiments() == false) {
|
} else if (m_js.noMoreExperiments() == false) {
|
||||||
// Currently we have no workload (even the running-job-queue is empty!), but
|
// Currently we have no workload (even the running-job-queue is empty!), but
|
||||||
// the campaign is not over yet. Minion can try again later.
|
// the campaign is not over yet. Minion can try again later.
|
||||||
ctrlmsg.set_command(FailControlMessage::COME_AGAIN);
|
ctrlmsg.set_command(FailControlMessage::COME_AGAIN);
|
||||||
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
|
|
||||||
cout << "--[Server] No workload, come again..." << endl;
|
cout << "--[Server] No workload, come again..." << endl;
|
||||||
} else {
|
} else {
|
||||||
// No more elements, and campaign is over. Minion can die.
|
// No work & Camapaign is done. Go away.
|
||||||
ctrlmsg.set_command(FailControlMessage::DIE);
|
ctrlmsg.set_command(FailControlMessage::DIE);
|
||||||
cout << "--[Server] No workload, and no campaign, please die." << endl;
|
}
|
||||||
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
|
}
|
||||||
|
|
||||||
|
const bool success = AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
|
||||||
|
if (temp_exp != nullptr && success) {
|
||||||
|
AsyncSocket::sendMsg(m_socket, temp_exp->getMessage(), yield);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg)
|
void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg,
|
||||||
|
yield_context yield)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
ExperimentData* exp = NULL; // Get exp* from running jobs
|
ExperimentData* exp = NULL; // Get exp* from running jobs
|
||||||
if (ctrlmsg.workloadid_size() > 0) {
|
if (ctrlmsg.workloadid_size() > 0) {
|
||||||
cout << " <<[" << ctrlmsg.workloadid(0) << "+"
|
print_progress(ProgressType::Receive, ctrlmsg.workloadid(0),
|
||||||
<< ctrlmsg.workloadid_size() << "] \r" << flush;
|
ctrlmsg.workloadid_size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No yielding under locks ;)
|
||||||
|
std::vector<std::tuple<ExperimentData *, uint32_t>> msgs;
|
||||||
|
msgs.reserve(ctrlmsg.workloadid_size());
|
||||||
|
|
||||||
|
{ // Don't indent properly to reduce patch-noise.
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
// Prevent re-sending jobs in sendPendingExperimentData:
|
// Prevent re-sending jobs in sendPendingExperimentData:
|
||||||
// a) sendPendingExperimentData needs an intact job to serialize and send it.
|
// a) sendPendingExperimentData needs an intact job to serialize and send it.
|
||||||
@ -374,23 +475,44 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
|
|||||||
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
|
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
|
||||||
#endif
|
#endif
|
||||||
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
|
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
|
||||||
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
|
const uint32_t id = ctrlmsg.workloadid(i);
|
||||||
// deserialize results, expect failures
|
const bool success = m_js.m_runningJobs.remove(id, exp);
|
||||||
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) {
|
msgs.emplace_back(success ? exp : nullptr, id);
|
||||||
m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp);
|
|
||||||
} else {
|
|
||||||
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Do I/O */
|
||||||
|
std::vector<std::tuple<ExperimentData *, uint32_t, bool>> recvd;
|
||||||
|
msgs.reserve(ctrlmsg.workloadid_size());
|
||||||
|
for (auto &&msg : msgs) {
|
||||||
|
auto &&exp = std::get<0>(msg);
|
||||||
|
if (exp != nullptr) {
|
||||||
|
const auto w_id = std::get<1>(msg);
|
||||||
|
const bool success = AsyncSocket::rcvMsg(
|
||||||
|
m_socket, exp->getMessage(), yield);
|
||||||
|
recvd.emplace_back(exp, w_id, success);
|
||||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||||
++JobServer::m_DoneCount;
|
++JobServer::m_DoneCount;
|
||||||
#endif
|
#endif
|
||||||
} else {
|
} else {
|
||||||
// We can receive several results for the same workload id because
|
// We can receive several results for the same workload id because
|
||||||
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
||||||
cout << "[Server] Received another result for workload id ["
|
++m_js.m_d->redundant_results;
|
||||||
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl;
|
AsyncSocket::dropMsg(m_socket, yield);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SocketComm::dropMsg(minion.getSocketDescriptor());
|
#ifndef __puma
|
||||||
|
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
|
||||||
|
#endif
|
||||||
|
for (auto &&rcv : recvd) {
|
||||||
|
auto &&exp = std::get<0>(rcv);
|
||||||
|
const auto received = std::get<2>(rcv);
|
||||||
|
if (received) {
|
||||||
|
m_js.m_doneJobs.Enqueue(exp);
|
||||||
|
} else {
|
||||||
|
const auto w_id = std::get<1>(rcv);
|
||||||
|
m_js.m_runningJobs.insert(w_id, exp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,16 +1,18 @@
|
|||||||
#ifndef __JOB_SERVER_H__
|
#ifndef __JOB_SERVER_H__
|
||||||
#define __JOB_SERVER_H__
|
#define __JOB_SERVER_H__
|
||||||
|
|
||||||
#include "Minion.hpp"
|
|
||||||
#include "util/SynchronizedQueue.hpp"
|
#include "util/SynchronizedQueue.hpp"
|
||||||
#include "util/SynchronizedCounter.hpp"
|
#include "util/SynchronizedCounter.hpp"
|
||||||
#include "util/SynchronizedMap.hpp"
|
#include "util/SynchronizedMap.hpp"
|
||||||
#include "config/FailConfig.hpp"
|
#include "config/FailConfig.hpp"
|
||||||
|
#include "comm/ExperimentData.hpp"
|
||||||
#include "comm/FailControlMessage.pb.h"
|
#include "comm/FailControlMessage.pb.h"
|
||||||
#include "comm/SocketComm.hpp"
|
#include "comm/SocketComm.hpp"
|
||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
#include <ctime>
|
#include <ctime>
|
||||||
|
#include <memory>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
@ -30,14 +32,13 @@ class CommThread;
|
|||||||
*/
|
*/
|
||||||
class JobServer {
|
class JobServer {
|
||||||
private:
|
private:
|
||||||
|
struct impl;
|
||||||
|
std::shared_ptr<impl> m_d;
|
||||||
|
|
||||||
//! The TCP Port number
|
//! The TCP Port number
|
||||||
int m_port;
|
const unsigned short m_port;
|
||||||
//! TODO nice termination concept
|
//! TODO nice termination concept
|
||||||
bool m_finish;
|
bool m_finish;
|
||||||
//! Campaign signaled last expirement data set
|
|
||||||
bool m_noMoreExps;
|
|
||||||
//! the maximal number of threads spawned for TCP communication
|
|
||||||
unsigned m_maxThreads;
|
|
||||||
//! the maximal timeout per communication thread
|
//! the maximal timeout per communication thread
|
||||||
int m_threadtimeout;
|
int m_threadtimeout;
|
||||||
//! list of spawned threads
|
//! list of spawned threads
|
||||||
@ -82,18 +83,7 @@ private:
|
|||||||
void sendWork(int sockfd);
|
void sendWork(int sockfd);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false),
|
JobServer(const unsigned short port = SERVER_COMM_TCP_PORT);
|
||||||
m_maxThreads(128), m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
|
|
||||||
{
|
|
||||||
SocketComm::init();
|
|
||||||
m_runid = std::time(0);
|
|
||||||
#ifndef __puma
|
|
||||||
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
|
|
||||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
|
||||||
m_measureThread = new boost::thread(&JobServer::measure, this);
|
|
||||||
#endif
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
~JobServer()
|
~JobServer()
|
||||||
{
|
{
|
||||||
done();
|
done();
|
||||||
@ -130,50 +120,13 @@ public:
|
|||||||
* @return \c true if no more parameter sets available, \c false otherwise
|
* @return \c true if no more parameter sets available, \c false otherwise
|
||||||
* @see setNoMoreExperiments
|
* @see setNoMoreExperiments
|
||||||
*/
|
*/
|
||||||
bool noMoreExperiments() const { return m_noMoreExps; }
|
bool noMoreExperiments() const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Campaign Controller may signal that the jobserver can stop listening
|
* The Campaign Controller may signal that the jobserver can stop listening
|
||||||
* for client connections.
|
* for client connections.
|
||||||
*/
|
*/
|
||||||
void done() { m_finish = true; }
|
void done();
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @class CommThread
|
|
||||||
* Implementation of the communication threads.
|
|
||||||
* This class implements the actual communication
|
|
||||||
* with the minions.
|
|
||||||
*/
|
|
||||||
class CommThread {
|
|
||||||
private:
|
|
||||||
int m_sock; //! Socket descriptor of the connection
|
|
||||||
uint32_t m_job_size;
|
|
||||||
JobServer& m_js; //! Calling jobserver
|
|
||||||
|
|
||||||
// FIXME: Concerns are not really separated yet ;)
|
|
||||||
/**
|
|
||||||
* Called after minion calls for work.
|
|
||||||
* Tries to deque a parameter set non blocking, and
|
|
||||||
* sends it back to the requesting minion.
|
|
||||||
* @param minion The minion asking for input
|
|
||||||
*/
|
|
||||||
void sendPendingExperimentData(Minion& minion);
|
|
||||||
/**
|
|
||||||
* Called after minion offers a result message.
|
|
||||||
* Evaluates the Workload ID and puts the corresponding
|
|
||||||
* job result into the result queue.
|
|
||||||
* @param minion The minion offering results
|
|
||||||
* @param workloadID The workload id of the result message
|
|
||||||
*/
|
|
||||||
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
|
|
||||||
public:
|
|
||||||
CommThread(int sockfd, JobServer& p)
|
|
||||||
: m_sock(sockfd), m_job_size(1), m_js(p) { }
|
|
||||||
/**
|
|
||||||
* The thread's entry point.
|
|
||||||
*/
|
|
||||||
void operator()();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end-of-namespace: fail
|
} // end-of-namespace: fail
|
||||||
|
|||||||
@ -1,72 +0,0 @@
|
|||||||
/**
|
|
||||||
* \file Minion.hpp
|
|
||||||
* \brief The representation of a minion.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef __MINION_HPP__
|
|
||||||
#define __MINION_HPP__
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
#include "comm/ExperimentData.hpp"
|
|
||||||
|
|
||||||
namespace fail {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* \class Minion
|
|
||||||
*
|
|
||||||
* Contains all informations about a minion.
|
|
||||||
*/
|
|
||||||
class Minion {
|
|
||||||
private:
|
|
||||||
std::string hostname;
|
|
||||||
bool isWorking;
|
|
||||||
ExperimentData* currentExperimentData;
|
|
||||||
int sockfd;
|
|
||||||
public:
|
|
||||||
Minion() : isWorking(false), currentExperimentData(0), sockfd(-1) { }
|
|
||||||
/**
|
|
||||||
* Sets the socket descriptor.
|
|
||||||
* @param sock the new socket descriptor (used internal)
|
|
||||||
*/
|
|
||||||
void setSocketDescriptor(int sock) { sockfd = sock; }
|
|
||||||
/**
|
|
||||||
* Retrives the socket descriptor.
|
|
||||||
* @return the socket descriptor
|
|
||||||
*/
|
|
||||||
int getSocketDescriptor() const { return (sockfd); }
|
|
||||||
/**
|
|
||||||
* Returns the hostname of the minion.
|
|
||||||
* @return the hostname
|
|
||||||
*/
|
|
||||||
const std::string& getHostname() { return (hostname); }
|
|
||||||
/**
|
|
||||||
* Sets the hostname of the minion.
|
|
||||||
* @param host the hostname
|
|
||||||
*/
|
|
||||||
void setHostname(const std::string& host) { hostname = host; }
|
|
||||||
/**
|
|
||||||
* Returns the current ExperimentData which the minion is working with.
|
|
||||||
* @return a pointer of the current ExperimentData
|
|
||||||
*/
|
|
||||||
ExperimentData* getCurrentExperimentData() { return currentExperimentData; }
|
|
||||||
/**
|
|
||||||
* Sets the current ExperimentData which the minion is working with.
|
|
||||||
* @param exp the current ExperimentData
|
|
||||||
*/
|
|
||||||
void setCurrentExperimentData(ExperimentData* exp) { currentExperimentData = exp; }
|
|
||||||
/**
|
|
||||||
* Returns the current state of the minion.
|
|
||||||
* @return the current state
|
|
||||||
*/
|
|
||||||
bool isBusy() { return (isWorking); }
|
|
||||||
/**
|
|
||||||
* Sets the current state of the minion
|
|
||||||
* @param state the current state
|
|
||||||
*/
|
|
||||||
void setBusy(bool state) { isWorking = state; }
|
|
||||||
};
|
|
||||||
|
|
||||||
} // end-of-namespace: fail
|
|
||||||
|
|
||||||
#endif // __MINION_HPP__
|
|
||||||
Reference in New Issue
Block a user