Use boost-asio to improve FAIL* server performance

This patch overhauls the FAIL* server code to leverage Boost asio to be able to
handle a large number of clients (>4000). In this implementation the server is
now single threaded. I've not encountered any problems with this for up to
about 10k clients. Boost ASIO can also be used multithreaded, but I assume the
FAIL* internal data structures (Synchronized*) will become a bottleneck first.

The code now additionally depends on Boost Coro and Boost Context, as well as
a C++ 14 compiler, although the only C++14 feature required is a lambda capture
with initializer, such as [ x = std::move(x) ]. gcc-4.9.2 does this.

The code could (and probably should) be cleaned up more. Comments are wordy,
code is unnecessary now (multiple server threads), code is not self-contained
(headers spread dependencies), many ifdef's (server performance measuring
should be runtime rather than a compile time option), and much more. But for
this patch I was going for a minimal changeset the get the functionality in,
to have an easier review. Alas, FAIL* has no Unit-test suite to run the changes
against.

To handle such a large number of clients more changes were necessary, for
example server status output is now performed every 1s, instead for every
request.

The class Minion was removed completely; the only thing it was doing was
encapsulate an int.

The server has now a runtime-configurable port, or it can select a free port on
its own if none is specified. This requires the CampaignManager to add a port
argument and instantiate the JobServer dynamically.

Change-Id: Iad9238972161f95f5802bd2251116f8aeee14884
This commit is contained in:
Hannes Weisbach
2017-05-22 14:19:13 +02:00
parent 48ceeb6a14
commit 6c120004eb
7 changed files with 345 additions and 302 deletions

View File

@ -1,5 +1,5 @@
### Add Boost and Threads ### Add Boost and Threads
find_package(Boost 1.42 COMPONENTS thread REQUIRED) find_package(Boost 1.42 COMPONENTS thread coroutine context REQUIRED)
include_directories(${Boost_INCLUDE_DIRS}) include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS}) link_directories(${Boost_LIBRARY_DIRS})

View File

@ -4,6 +4,8 @@ set(SRCS
DatabaseCampaign.cc DatabaseCampaign.cc
) )
set_source_files_properties(JobServer.cc CampaignManager.cc PROPERTIES COMPILE_FLAGS -std=c++11)
find_package(MySQL REQUIRED) find_package(MySQL REQUIRED)
include_directories(${MYSQL_INCLUDE_DIR}) include_directories(${MYSQL_INCLUDE_DIR})
@ -15,7 +17,7 @@ else(CONFIG_INJECTIONPOINT_HOPS)
endif(CONFIG_INJECTIONPOINT_HOPS) endif(CONFIG_INJECTIONPOINT_HOPS)
add_library(fail-cpn ${SRCS}) add_library(fail-cpn ${SRCS})
target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES}) target_link_libraries(fail-cpn fail-comm fail-util ${MYSQL_LIBRARIES} ${Boost_COROUTINE_LIBRARY} ${Boost_CONTEXT_LIBRARY})
# if hop-chains need to be calculated by the server, we # if hop-chains need to be calculated by the server, we
# the smarthopping module # the smarthopping module

View File

@ -1,7 +1,46 @@
#include <cstdlib>
#include "CampaignManager.hpp" #include "CampaignManager.hpp"
#include "util/Logger.hpp"
#include "JobServer.hpp"
namespace fail { namespace fail {
static Logger log_send("CampaignManager");
CampaignManager campaignmanager; CampaignManager campaignmanager;
CampaignManager::~CampaignManager() { delete m_jobserver; }
bool CampaignManager::runCampaign(Campaign *c) {
fail::CommandLine &cmd = fail::CommandLine::Inst();
if (!cmd.parse()) {
log_send << "Error parsing arguments." << std::endl;
exit(-1);
}
if (!m_jobserver) {
m_jobserver = (cmd[port].count() > 0)
? new JobServer(std::atoi(cmd[port].first()->arg))
: new JobServer;
}
m_currentCampaign = c;
bool ret = c->run();
m_jobserver->done();
return ret;
}
void CampaignManager::addParam(ExperimentData *exp)
{
m_jobserver->addParam(exp);
}
ExperimentData *CampaignManager::getDone() { return m_jobserver->getDone(); }
void CampaignManager::noMoreParameters()
{
m_jobserver->setNoMoreExperiments();
}
void CampaignManager::done() { m_jobserver->done(); }
} // end-of-namespace: fail } // end-of-namespace: fail

View File

@ -8,8 +8,8 @@
#include "sal/SALInst.hpp" #include "sal/SALInst.hpp"
#include "comm/ExperimentData.hpp" #include "comm/ExperimentData.hpp"
#include "JobServer.hpp"
#include "Campaign.hpp" #include "Campaign.hpp"
#include "util/CommandLine.hpp"
namespace fail { namespace fail {
@ -19,26 +19,25 @@ namespace fail {
* The CampaignManager allows a user-campaign access to all constant * The CampaignManager allows a user-campaign access to all constant
* simulator information and forwards single experiments to the JobServer. * simulator information and forwards single experiments to the JobServer.
*/ */
class JobServer;
class CampaignManager { class CampaignManager {
private: private:
JobServer *m_jobserver; JobServer *m_jobserver;
Campaign* m_currentCampaign; Campaign* m_currentCampaign;
CommandLine::option_handle port;
public: public:
CampaignManager() : m_jobserver(0), m_currentCampaign(0) { } CampaignManager() : m_jobserver(0), m_currentCampaign(0)
~CampaignManager() { delete m_jobserver; } {
fail::CommandLine &cmd = fail::CommandLine::Inst();
port = cmd.addOption("p", "port", Arg::Required,
"-p,--port \tListening port of server; no "
"value chooses port automatically");
}
~CampaignManager();
/** /**
* Executes a user campaign * Executes a user campaign
*/ */
bool runCampaign(Campaign* c) bool runCampaign(Campaign* c);
{
if (!m_jobserver) {
m_jobserver = new JobServer;
}
m_currentCampaign = c;
bool ret = c->run();
m_jobserver->done();
return ret;
}
/** /**
* Returns a const reference for acquiring constant simulator specific information. * Returns a const reference for acquiring constant simulator specific information.
* e.g., Registernames, to ease experiment data construction. * e.g., Registernames, to ease experiment data construction.
@ -54,21 +53,21 @@ public:
* A Parameter set includes space for results. * A Parameter set includes space for results.
* @param exp A pointer to a ExperimentData set. * @param exp A pointer to a ExperimentData set.
*/ */
void addParam(ExperimentData* exp) { m_jobserver->addParam(exp); } void addParam(ExperimentData* exp);
/** /**
* A user campaign can request a single result (blocking) from the queue. * A user campaign can request a single result (blocking) from the queue.
* @return Pointer to a parameter object with filled result data * @return Pointer to a parameter object with filled result data
* @see addParam() * @see addParam()
*/ */
ExperimentData* getDone() { return m_jobserver->getDone(); } ExperimentData* getDone();
/** /**
* Signal, that there will not come any further parameter sets. * Signal, that there will not come any further parameter sets.
*/ */
void noMoreParameters() { m_jobserver->setNoMoreExperiments(); } void noMoreParameters();
/** /**
* User campaign has finished. * User campaign has finished.
*/ */
void done() { m_jobserver->done(); } void done();
/** /**
* Wait actively, until all experiments expired. * Wait actively, until all experiments expired.
*/ */

View File

@ -1,19 +1,24 @@
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter // <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
#include <algorithm>
#include <chrono>
#include <iomanip>
#include <vector>
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>
#include <arpa/inet.h> #include <thread>
#include <tuple>
#include "comm/SocketComm.hpp" #include "comm/SocketComm.hpp"
#include "JobServer.hpp" #include "JobServer.hpp"
#include "Minion.hpp"
#ifndef __puma #ifndef __puma
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/date_time.hpp> #include <boost/date_time.hpp>
#endif #endif
@ -22,6 +27,156 @@ using namespace std;
namespace fail { namespace fail {
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::system;
/**
* @class CommThread
* Implementation of the communication threads.
* This class implements the actual communication
* with the minions.
*/
class CommThread {
private:
tcp::socket m_socket;
uint32_t m_job_size;
JobServer &m_js; //! Calling jobserver
// FIXME: Concerns are not really separated yet ;)
/**
* Called after minion calls for work.
* Tries to deque a parameter set non blocking, and
* sends it back to the requesting minion.
* @param minion The minion asking for input
*/
void sendPendingExperimentData(yield_context yield);
/**
* Called after minion offers a result message.
* Evaluates the Workload ID and puts the corresponding
* job result into the result queue.
* @param minion The minion offering results
* @param workloadID The workload id of the result message
*/
void receiveExperimentResults(FailControlMessage &ctrlmsg,
yield_context yield);
enum ProgressType { Send, Receive, Resend };
void print_progress(const enum ProgressType, const uint32_t,
const uint32_t);
public:
CommThread(tcp::socket socket, JobServer &p)
: m_socket(std::move(socket)), m_job_size(1), m_js(p) {}
/**
* The thread's entry point.
*/
void operator()(yield_context yield);
};
namespace AsyncSocket {
static bool rcvMsg(tcp::socket &socket, google::protobuf::Message &msg,
yield_context yield, bool drop = false)
{
boost::system::error_code ec;
int size;
size_t len = async_read(socket, buffer(&size, sizeof(size)), yield[ec]);
if (len != sizeof(size)) {
std::cerr << "Read " << len << " instead of " << sizeof(size)
<< " bytes from socket" << std::endl;
return false;
}
if (ec) {
std::cerr << ec.message() << std::endl;
return false;
}
const size_t msg_size = ntohl(size);
std::vector<char> buf(msg_size);
len = async_read(socket, buffer(buf), yield[ec]);
if (len != msg_size) {
std::cerr << "Read " << len << " instead of " << msg_size
<< " bytes from socket" << std::endl;
return false;
}
if (ec) {
std::cerr << ec.message() << std::endl;
return false;
}
return drop ? true : msg.ParseFromArray(buf.data(), buf.size());
}
static bool dropMsg(tcp::socket &socket, yield_context yield) {
FailControlMessage msg;
return rcvMsg(socket, msg, yield, true);
}
static bool sendMsg(tcp::socket &socket, google::protobuf::Message &msg,
yield_context yield)
{
std::string buf;
if (!msg.SerializeToString(&buf)) {
return false;
}
const int size = htonl(msg.ByteSize());
boost::array<const_buffer, 2> bufs{buffer(&size, sizeof(size)),
buffer(buf)};
boost::system::error_code ec;
async_write(socket, bufs, yield[ec]);
if (ec) {
std::cerr << ec.message() << std::endl;
return false;
}
return true;
}
}
struct JobServer::impl {
io_service accept_service;
io_service comm_service;
std::thread comm_thread;
std::atomic<uint64_t> redundant_results{0};
//! Campaign signaled last experiment data set
std::atomic_bool noMoreExps{false};
impl()
: comm_thread([this] {
io_service::work work(comm_service);
comm_service.run();
std::cout << "Received " << redundant_results
<< " redundant results." << std::endl;
})
{
}
~impl()
{
comm_service.stop();
if (comm_thread.joinable()) {
comm_thread.join();
}
}
};
JobServer::JobServer(const unsigned short port)
: m_d(std::make_shared<impl>()), m_port(port), m_finish(false),
m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
{
SocketComm::init();
m_runid = std::time(0);
#ifndef __puma
m_serverThread = new boost::thread(&JobServer::run, this);
#ifdef SERVER_PERFORMANCE_MEASURE
m_measureThread = new boost::thread(&JobServer::measure, this);
#endif
#endif
}
void JobServer::addParam(ExperimentData* exp) void JobServer::addParam(ExperimentData* exp)
{ {
#ifndef __puma #ifndef __puma
@ -45,17 +200,15 @@ ExperimentData *JobServer::getDone()
#endif #endif
} }
bool JobServer::noMoreExperiments() const { return m_d->noMoreExps; }
void JobServer::setNoMoreExperiments() void JobServer::setNoMoreExperiments()
{ {
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_CommMutex); boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif m_d->noMoreExps = true;
// currently not really necessary, as we only non-blockingly dequeue:
m_undoneJobs.setIsFinished(); m_undoneJobs.setIsFinished();
m_noMoreExps = true;
if (m_undoneJobs.Size() == 0 && if (m_undoneJobs.Size() == 0 &&
noMoreExperiments() &&
m_runningJobs.Size() == 0) { m_runningJobs.Size() == 0) {
m_doneJobs.setIsFinished(); m_doneJobs.setIsFinished();
} }
@ -90,134 +243,77 @@ void JobServer::measure()
} }
#endif // SERVER_PERFORMANCE_MEASURE #endif // SERVER_PERFORMANCE_MEASURE
#ifndef __puma void JobServer::done()
/** {
* This is a predicate class for the remove_if operator on the thread m_d->accept_service.stop();
* list. The operator waits for timeout milliseconds to join each m_d->comm_service.stop();
* thread in the list. If the join was successful, the exited thread }
* is deallocated and removed from the list.
*/
struct timed_join_successful {
int timeout_ms;
timed_join_successful(int timeout)
: timeout_ms(timeout) { }
bool operator()(boost::thread* threadelement)
{
boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms);
if (threadelement->timed_join(timeout)) {
delete threadelement;
return true;
} else {
return false;
}
}
};
#endif
void JobServer::run() void JobServer::run()
{ {
struct sockaddr_in clientaddr; const auto ep = tcp::endpoint(tcp::v4(), m_port);
socklen_t clen = sizeof(clientaddr); auto acceptor = tcp::acceptor(m_d->accept_service, ep, true);
{
// implementation of server-client communication const auto local_ep = acceptor.local_endpoint();
int s; std::cout << "Listening on " << local_ep.address() << ":"
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) { << local_ep.port() << std::endl;
perror("socket");
// TODO: Log-level?
return;
} }
/* Enable address reuse */ spawn(m_d->accept_service, [this, &acceptor](yield_context yield) {
int on = 1; for (;;) {
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { tcp::socket socket(m_d->comm_service);
perror("setsockopt"); boost::system::error_code ec;
// TODO: Log-level? acceptor.async_accept(socket, yield[ec]);
return; if (ec) {
} std::cerr
<< "Error accept()ing a new connection "
/* IPv4, bind to all interfaces */ << ec.message() << std::endl;
struct sockaddr_in saddr = {0};
saddr.sin_family = AF_INET;
saddr.sin_port = htons(m_port);
saddr.sin_addr.s_addr = htons(INADDR_ANY);
/* bind to port */
if (::bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) {
perror("bind");
// TODO: Log-level?
return;
}
/* Listen with a backlog of maxThreads */
if (listen(s, m_maxThreads) == -1) {
perror("listen");
close(s);
// TODO: Log-level?
return;
}
cout << "JobServer listening ..." << endl;
// TODO: Log-level?
#ifndef __puma
boost::thread* th;
while (!m_finish) {
// Accept connection
int cs = SocketComm::timedAccept(s, (struct sockaddr*)&clientaddr, &clen, 100);
if (cs < 0) {
if (errno != EWOULDBLOCK) {
perror("poll/accept");
close(s);
// TODO: Log-level?
return;
} else {
continue; continue;
} }
}
bool creation_failed = false; spawn(m_d->comm_service,
do { [ socket = std::move(socket),
// Spawn a thread for further communication, this ](yield_context yield) mutable {
// and add this thread to a list threads CommThread coro(std::move(socket), *this);
// We can limit the generation of threads here. coro(yield);
if (m_threadlist.size() >= m_maxThreads || creation_failed) { });
// Run over list with a timed_join,
// removing finished threads.
do {
m_threadlist.remove_if(timed_join_successful(m_threadtimeout));
} while (m_threadlist.size() >= m_maxThreads);
} }
// Start new thread });
try {
th = new boost::thread(CommThread(cs, *this));
creation_failed = false;
} catch (boost::thread_resource_error e) {
cout << "failed to spawn thread, throttling ..." << endl;
creation_failed = true;
sleep(1);
}
} while (creation_failed);
m_threadlist.push_back(th); m_d->accept_service.run();
}
close(s);
// when all undone Jobs are distributed -> call a timed_join on all spawned
// TODO: interrupt threads that do not want to join..
while (m_threadlist.size() > 0)
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
#endif
} }
void CommThread::operator()() void CommThread::print_progress(const enum ProgressType type,
const uint32_t w_id, const uint32_t count)
{
using namespace std::chrono;
static system_clock::time_point last;
const auto now = system_clock::now();
if (last + seconds{1} > now) {
return;
}
last = now;
std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/"
<< std::setw(6) << m_js.m_runningJobs.Size() << "/"
<< std::setw(6) << m_js.m_doneJobs.Size() << " - ";
const char *sep;
if (type == ProgressType::Send) {
sep = " >";
} else if (type == ProgressType::Resend) {
sep = ">>";
} else {
sep = " <";
}
std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush;
}
void CommThread::operator()(yield_context yield)
{ {
// The communication thread implementation: // The communication thread implementation:
Minion minion;
FailControlMessage ctrlmsg; FailControlMessage ctrlmsg;
minion.setSocketDescriptor(m_sock);
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), ctrlmsg)) { if (!AsyncSocket::rcvMsg(m_socket, ctrlmsg, yield)) {
cout << "!![Server] failed to read complete message from client" << endl; cout << "!![Server] failed to read complete message from client" << endl;
close(m_sock);
return; return;
} }
@ -229,12 +325,12 @@ void CommThread::operator()()
ctrlmsg.Clear(); ctrlmsg.Clear();
ctrlmsg.set_command(FailControlMessage::DIE); ctrlmsg.set_command(FailControlMessage::DIE);
ctrlmsg.set_build_id(42); ctrlmsg.set_build_id(42);
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
break; break;
} }
// give minion something to do.. // give minion something to do..
m_job_size = ctrlmsg.job_size(); m_job_size = ctrlmsg.job_size();
sendPendingExperimentData(minion); sendPendingExperimentData(yield);
break; break;
case FailControlMessage::RESULT_FOLLOWS: case FailControlMessage::RESULT_FOLLOWS:
// ignore old client's results // ignore old client's results
@ -243,7 +339,7 @@ void CommThread::operator()()
break; break;
} }
// get results and put to done queue. // get results and put to done queue.
receiveExperimentResults(minion, ctrlmsg); receiveExperimentResults(ctrlmsg, yield);
break; break;
default: default:
// hm.. don't know what to do. please die. // hm.. don't know what to do. please die.
@ -252,13 +348,11 @@ void CommThread::operator()()
ctrlmsg.Clear(); ctrlmsg.Clear();
ctrlmsg.set_command(FailControlMessage::DIE); ctrlmsg.set_command(FailControlMessage::DIE);
ctrlmsg.set_build_id(42); ctrlmsg.set_build_id(42);
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
} }
close(m_sock);
} }
void CommThread::sendPendingExperimentData(Minion& minion) void CommThread::sendPendingExperimentData(yield_context yield)
{ {
uint32_t i; uint32_t i;
uint32_t workloadID; uint32_t workloadID;
@ -284,12 +378,12 @@ void CommThread::sendPendingExperimentData(Minion& minion)
if (exp.size() != 0) { if (exp.size() != 0) {
ctrlmsg.set_job_size(exp.size()); ctrlmsg.set_job_size(exp.size());
cout << " >>[" << ctrlmsg.workloadid(0) << "+" print_progress(ProgressType::Send, ctrlmsg.workloadid(0),
<< exp.size() << "] \r" << flush; exp.size());
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { if (AsyncSocket::sendMsg(m_socket, ctrlmsg, yield)) {
for (i = 0; i < ctrlmsg.job_size(); i++) { for (i = 0; i < ctrlmsg.job_size(); i++) {
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { if (AsyncSocket::sendMsg(m_socket, exp.front()->getMessage(), yield)) {
// delay insertion into m_runningJobs until here, as // delay insertion into m_runningJobs until here, as
// getMessage() won't work anymore if this job is re-sent, // getMessage() won't work anymore if this job is re-sent,
@ -314,6 +408,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
return; return;
} }
{ // Don't indent properly to reduce patch-noise.
#ifndef __puma #ifndef __puma
// Prevent receiveExperimentResults from modifying (or indirectly, via // Prevent receiveExperimentResults from modifying (or indirectly, via
// getDone and the campaign, deleting) jobs in the m_runningJobs queue. // getDone and the campaign, deleting) jobs in the m_runningJobs queue.
@ -337,33 +432,39 @@ void CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
ctrlmsg.add_workloadid(workloadID); // set workload id ctrlmsg.add_workloadid(workloadID); // set workload id
ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job ctrlmsg.set_job_size(1); // In 2nd priority the jobserver send only one job
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl; print_progress(ProgressType::Resend, workloadID, 1);
cout << ">>R[" << workloadID << "] \r" << flush;
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage());
}
} else if (m_js.noMoreExperiments() == false) { } else if (m_js.noMoreExperiments() == false) {
// Currently we have no workload (even the running-job-queue is empty!), but // Currently we have no workload (even the running-job-queue is empty!), but
// the campaign is not over yet. Minion can try again later. // the campaign is not over yet. Minion can try again later.
ctrlmsg.set_command(FailControlMessage::COME_AGAIN); ctrlmsg.set_command(FailControlMessage::COME_AGAIN);
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl; cout << "--[Server] No workload, come again..." << endl;
} else { } else {
// No more elements, and campaign is over. Minion can die. // No work & Camapaign is done. Go away.
ctrlmsg.set_command(FailControlMessage::DIE); ctrlmsg.set_command(FailControlMessage::DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl; }
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg); }
const bool success = AsyncSocket::sendMsg(m_socket, ctrlmsg, yield);
if (temp_exp != nullptr && success) {
AsyncSocket::sendMsg(m_socket, temp_exp->getMessage(), yield);
} }
} }
void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg) void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg,
yield_context yield)
{ {
int i; int i;
ExperimentData* exp = NULL; // Get exp* from running jobs ExperimentData* exp = NULL; // Get exp* from running jobs
if (ctrlmsg.workloadid_size() > 0) { if (ctrlmsg.workloadid_size() > 0) {
cout << " <<[" << ctrlmsg.workloadid(0) << "+" print_progress(ProgressType::Receive, ctrlmsg.workloadid(0),
<< ctrlmsg.workloadid_size() << "] \r" << flush; ctrlmsg.workloadid_size());
} }
// No yielding under locks ;)
std::vector<std::tuple<ExperimentData *, uint32_t>> msgs;
msgs.reserve(ctrlmsg.workloadid_size());
{ // Don't indent properly to reduce patch-noise.
#ifndef __puma #ifndef __puma
// Prevent re-sending jobs in sendPendingExperimentData: // Prevent re-sending jobs in sendPendingExperimentData:
// a) sendPendingExperimentData needs an intact job to serialize and send it. // a) sendPendingExperimentData needs an intact job to serialize and send it.
@ -374,23 +475,44 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex); boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
#endif #endif
for (i = 0; i < ctrlmsg.workloadid_size(); i++) { for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found const uint32_t id = ctrlmsg.workloadid(i);
// deserialize results, expect failures const bool success = m_js.m_runningJobs.remove(id, exp);
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage())) { msgs.emplace_back(success ? exp : nullptr, id);
m_js.m_runningJobs.insert(ctrlmsg.workloadid(i), exp);
} else {
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue
} }
}
/* Do I/O */
std::vector<std::tuple<ExperimentData *, uint32_t, bool>> recvd;
msgs.reserve(ctrlmsg.workloadid_size());
for (auto &&msg : msgs) {
auto &&exp = std::get<0>(msg);
if (exp != nullptr) {
const auto w_id = std::get<1>(msg);
const bool success = AsyncSocket::rcvMsg(
m_socket, exp->getMessage(), yield);
recvd.emplace_back(exp, w_id, success);
#ifdef SERVER_PERFORMANCE_MEASURE #ifdef SERVER_PERFORMANCE_MEASURE
++JobServer::m_DoneCount; ++JobServer::m_DoneCount;
#endif #endif
} else { } else {
// We can receive several results for the same workload id because // We can receive several results for the same workload id because
// we (may) distribute the (running) jobs to a *few* experiment-clients. // we (may) distribute the (running) jobs to a *few* experiment-clients.
cout << "[Server] Received another result for workload id [" ++m_js.m_d->redundant_results;
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl; AsyncSocket::dropMsg(m_socket, yield);
}
}
SocketComm::dropMsg(minion.getSocketDescriptor()); #ifndef __puma
boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
#endif
for (auto &&rcv : recvd) {
auto &&exp = std::get<0>(rcv);
const auto received = std::get<2>(rcv);
if (received) {
m_js.m_doneJobs.Enqueue(exp);
} else {
const auto w_id = std::get<1>(rcv);
m_js.m_runningJobs.insert(w_id, exp);
} }
} }

View File

@ -1,16 +1,18 @@
#ifndef __JOB_SERVER_H__ #ifndef __JOB_SERVER_H__
#define __JOB_SERVER_H__ #define __JOB_SERVER_H__
#include "Minion.hpp"
#include "util/SynchronizedQueue.hpp" #include "util/SynchronizedQueue.hpp"
#include "util/SynchronizedCounter.hpp" #include "util/SynchronizedCounter.hpp"
#include "util/SynchronizedMap.hpp" #include "util/SynchronizedMap.hpp"
#include "config/FailConfig.hpp" #include "config/FailConfig.hpp"
#include "comm/ExperimentData.hpp"
#include "comm/FailControlMessage.pb.h" #include "comm/FailControlMessage.pb.h"
#include "comm/SocketComm.hpp" #include "comm/SocketComm.hpp"
#include <list> #include <list>
#include <ctime> #include <ctime>
#include <memory>
#include <string>
#ifndef __puma #ifndef __puma
#include <boost/thread.hpp> #include <boost/thread.hpp>
@ -30,14 +32,13 @@ class CommThread;
*/ */
class JobServer { class JobServer {
private: private:
struct impl;
std::shared_ptr<impl> m_d;
//! The TCP Port number //! The TCP Port number
int m_port; const unsigned short m_port;
//! TODO nice termination concept //! TODO nice termination concept
bool m_finish; bool m_finish;
//! Campaign signaled last expirement data set
bool m_noMoreExps;
//! the maximal number of threads spawned for TCP communication
unsigned m_maxThreads;
//! the maximal timeout per communication thread //! the maximal timeout per communication thread
int m_threadtimeout; int m_threadtimeout;
//! list of spawned threads //! list of spawned threads
@ -82,18 +83,7 @@ private:
void sendWork(int sockfd); void sendWork(int sockfd);
public: public:
JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false), JobServer(const unsigned short port = SERVER_COMM_TCP_PORT);
m_maxThreads(128), m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
{
SocketComm::init();
m_runid = std::time(0);
#ifndef __puma
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
#ifdef SERVER_PERFORMANCE_MEASURE
m_measureThread = new boost::thread(&JobServer::measure, this);
#endif
#endif
}
~JobServer() ~JobServer()
{ {
done(); done();
@ -130,50 +120,13 @@ public:
* @return \c true if no more parameter sets available, \c false otherwise * @return \c true if no more parameter sets available, \c false otherwise
* @see setNoMoreExperiments * @see setNoMoreExperiments
*/ */
bool noMoreExperiments() const { return m_noMoreExps; } bool noMoreExperiments() const;
/** /**
* The Campaign Controller may signal that the jobserver can stop listening * The Campaign Controller may signal that the jobserver can stop listening
* for client connections. * for client connections.
*/ */
void done() { m_finish = true; } void done();
};
/**
* @class CommThread
* Implementation of the communication threads.
* This class implements the actual communication
* with the minions.
*/
class CommThread {
private:
int m_sock; //! Socket descriptor of the connection
uint32_t m_job_size;
JobServer& m_js; //! Calling jobserver
// FIXME: Concerns are not really separated yet ;)
/**
* Called after minion calls for work.
* Tries to deque a parameter set non blocking, and
* sends it back to the requesting minion.
* @param minion The minion asking for input
*/
void sendPendingExperimentData(Minion& minion);
/**
* Called after minion offers a result message.
* Evaluates the Workload ID and puts the corresponding
* job result into the result queue.
* @param minion The minion offering results
* @param workloadID The workload id of the result message
*/
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
public:
CommThread(int sockfd, JobServer& p)
: m_sock(sockfd), m_job_size(1), m_js(p) { }
/**
* The thread's entry point.
*/
void operator()();
}; };
} // end-of-namespace: fail } // end-of-namespace: fail

View File

@ -1,72 +0,0 @@
/**
* \file Minion.hpp
* \brief The representation of a minion.
*/
#ifndef __MINION_HPP__
#define __MINION_HPP__
#include <string>
#include "comm/ExperimentData.hpp"
namespace fail {
/**
* \class Minion
*
* Contains all informations about a minion.
*/
class Minion {
private:
std::string hostname;
bool isWorking;
ExperimentData* currentExperimentData;
int sockfd;
public:
Minion() : isWorking(false), currentExperimentData(0), sockfd(-1) { }
/**
* Sets the socket descriptor.
* @param sock the new socket descriptor (used internal)
*/
void setSocketDescriptor(int sock) { sockfd = sock; }
/**
* Retrives the socket descriptor.
* @return the socket descriptor
*/
int getSocketDescriptor() const { return (sockfd); }
/**
* Returns the hostname of the minion.
* @return the hostname
*/
const std::string& getHostname() { return (hostname); }
/**
* Sets the hostname of the minion.
* @param host the hostname
*/
void setHostname(const std::string& host) { hostname = host; }
/**
* Returns the current ExperimentData which the minion is working with.
* @return a pointer of the current ExperimentData
*/
ExperimentData* getCurrentExperimentData() { return currentExperimentData; }
/**
* Sets the current ExperimentData which the minion is working with.
* @param exp the current ExperimentData
*/
void setCurrentExperimentData(ExperimentData* exp) { currentExperimentData = exp; }
/**
* Returns the current state of the minion.
* @return the current state
*/
bool isBusy() { return (isWorking); }
/**
* Sets the current state of the minion
* @param state the current state
*/
void setBusy(bool state) { isWorking = state; }
};
} // end-of-namespace: fail
#endif // __MINION_HPP__