Merge branch 'jobclientserver-fixes'

Horst Schirmeier
2014-01-22 13:07:59 +01:00
9 changed files with 146 additions and 43 deletions

View File

@@ -1,6 +1,7 @@
#include <string>
#include <errno.h>
#include <signal.h>
#include <poll.h>
#include "SocketComm.hpp"
@@ -63,6 +64,17 @@ char * SocketComm::getBuf(int sockfd, int *size)
return buf;
}
int SocketComm::timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
{
struct pollfd pfd = {sockfd, POLLIN, 0};
int ret = poll(&pfd, 1, timeout);
if (ret > 0) {
return accept(sockfd, addr, addrlen);
}
if (ret == 0) {
// poll(2) timed out: mimic a nonblocking accept(2)
errno = EWOULDBLOCK;
}
// on a poll(2) error, errno remains as set by poll(2)
return -1;
}
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
{
ssize_t ret;

View File

@@ -45,6 +45,16 @@ public:
*/
static bool dropMsg(int sockfd);
/**
* An accept() wrapper that can time out (implemented with poll(2))
* @param sockfd listening socket descriptor to accept connections from
* @param addr same as accept()'s
* @param addrlen same as accept()'s
* @param timeout timeout in milliseconds (see poll(2))
* \return -1 on failure (with errno set to EWOULDBLOCK if the timeout expired), otherwise a descriptor for the new socket connection
*/
static int timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout);
private:
static char * getBuf(int sockfd, int *size);
static ssize_t safe_write(int fd, const void *buf, size_t count);

View File

@@ -25,13 +25,14 @@ OPTION(CONFIG_FAST_BREAKPOINTS "Enable fast breakpoints (requires break
OPTION(CONFIG_FAST_WATCHPOINTS "Enable fast watchpoints (requires memory access events to be enabled)" OFF)
SET(SERVER_COMM_HOSTNAME "localhost" CACHE STRING "Job-server hostname or IP")
SET(SERVER_COMM_TCP_PORT "1111" CACHE STRING "Job-server TCP port")
SET(SERVER_OUT_QUEUE_SIZE "0" CACHE STRING "Queue size for outbound jobs (0 = unlimited)")
SET(SERVER_OUT_QUEUE_SIZE "10000" CACHE STRING "Queue size for outbound jobs (0 = unlimited)")
SET(SERVER_PERF_LOG_PATH "perf.log" CACHE STRING "A file name for storing the server's performance log (CSV)")
SET(SERVER_PERF_STEPPING_SEC "1" CACHE STRING "Stepping of performance measurements in seconds")
SET(CLIENT_RAND_BACKOFF_TSTART "3" CACHE STRING "Lower limit of client's backoff phase in seconds")
SET(CLIENT_RAND_BACKOFF_TEND "8" CACHE STRING "Upper limit of client's backoff phase in seconds")
SET(CLIENT_RETRY_COUNT "3" CACHE STRING "Client's number of reconnect retries")
SET(CLIENT_JOB_REQUEST_SEC "30" CACHE STRING "Time in seconds a client tries to get work for (to reduce client/server communication frequency)")
+SET(CLIENT_JOB_INITIAL "1" CACHE STRING "Initial number of jobs to request")
SET(CLIENT_JOB_LIMIT "1000" CACHE STRING "Maximum number of jobs a client can ask for")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/FailConfig.hpp.in

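For illustration (not part of the commit): the CLIENT_RAND_BACKOFF_* values feed the random reconnect backoff in JobClient::connectToServer(); a sketch of how such a backoff is plausibly derived from them (the helper is hypothetical, and assumes FailConfig.hpp carries the backoff defines like the other knobs above):

#include <cstdlib>
#include <unistd.h>
#include "FailConfig.hpp"

// sleep for a random duration in
// [CLIENT_RAND_BACKOFF_TSTART, CLIENT_RAND_BACKOFF_TEND] seconds;
// assumes srand() was seeded, as the JobClient constructor does
static void random_backoff()
{
    int span = CLIENT_RAND_BACKOFF_TEND - CLIENT_RAND_BACKOFF_TSTART + 1;
    sleep(CLIENT_RAND_BACKOFF_TSTART + rand() % span);
}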
View File

@@ -43,6 +43,7 @@
#define CLIENT_RETRY_COUNT @CLIENT_RETRY_COUNT@
#define CLIENT_JOB_REQUEST_SEC @CLIENT_JOB_REQUEST_SEC@
#define CLIENT_JOB_LIMIT @CLIENT_JOB_LIMIT@
#define CLIENT_JOB_INITIAL @CLIENT_JOB_INITIAL@
#define PROJECT_VERSION "@PROJECT_VERSION@"
#define FAIL_VERSION PROJECT_VERSION

View File

@@ -34,28 +34,33 @@ void JobServer::addParam(ExperimentData* exp)
volatile unsigned JobServer::m_DoneCount = 0;
#endif
-#ifndef __puma
-boost::mutex CommThread::m_CommMutex;
-#endif
ExperimentData *JobServer::getDone()
{
#ifndef __puma
-if (m_undoneJobs.Size() == 0
-&& noMoreExperiments()
-&& m_runningJobs.Size() == 0
-&& m_doneJobs.Size() == 0
-&& m_inOutCounter.getValue() == 0) {
-return 0;
-}
+ExperimentData *exp = m_doneJobs.Dequeue();
+if (exp) {
+m_inOutCounter.decrement();
+}
-ExperimentData *exp = NULL;
-exp = m_doneJobs.Dequeue();
-m_inOutCounter.decrement();
return exp;
#endif
}
+void JobServer::setNoMoreExperiments()
+{
+#ifndef __puma
+boost::unique_lock<boost::mutex> lock(m_CommMutex);
+#endif
+// currently not really necessary, as we only non-blockingly dequeue:
+m_undoneJobs.setIsFinished();
+m_noMoreExps = true;
+if (m_undoneJobs.Size() == 0 &&
+noMoreExperiments() &&
+m_runningJobs.Size() == 0) {
+m_doneJobs.setIsFinished();
+}
+}
#ifdef SERVER_PERFORMANCE_MEASURE
void JobServer::measure()
{
@@ -156,11 +161,15 @@ void JobServer::run()
boost::thread* th;
while (!m_finish){
// Accept connection
-int cs = accept(s, (struct sockaddr*)&clientaddr, &clen);
-if (cs == -1) {
-perror("accept");
-// TODO: Log-level?
-return;
+int cs = SocketComm::timedAccept(s, (struct sockaddr*)&clientaddr, &clen, 100);
+if (cs < 0) {
+if (errno != EWOULDBLOCK) {
+perror("poll/accept");
+// TODO: Log-level?
+return;
+} else {
+continue;
+}
}
// Spawn a thread for further communication,
// and add this thread to a list of threads
@@ -257,10 +266,6 @@ void CommThread::sendPendingExperimentData(Minion& minion)
} else {
break;
}
-if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
-cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
-}
}
if (exp.size() != 0) {
ctrlmsg.set_job_size(exp.size());
@@ -271,8 +276,22 @@ void CommThread::sendPendingExperimentData(Minion& minion)
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
for (i = 0; i < ctrlmsg.job_size(); i++) {
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
// delay insertion into m_runningJobs until here, as
// getMessage() won't work anymore if this job is re-sent,
// received, and deleted in the meantime
if (!m_js.m_runningJobs.insert(exp.front()->getWorkloadID(), exp.front())) {
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
exp.pop_front();
} else {
// add remaining jobs back to the queue
cout << "!![Server] failed to send scheduled " << exp.size() << " jobs" << endl;
while (exp.size()) {
m_js.m_undoneJobs.Enqueue(exp.front());
exp.pop_front();
}
break;
}
@@ -285,7 +304,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
// Prevent receiveExperimentResults from modifying (or indirectly, via
// getDone and the campaign, deleting) jobs in the m_runningJobs queue.
// (See details in receiveExperimentResults)
-boost::unique_lock<boost::mutex> lock(m_CommMutex);
+boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
#endif
if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
// (This picks one running job.)
@@ -338,7 +357,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
// by the campaign at any time.
// Additionally, receiving a result overwrites the job's contents. This
// already may cause breakage in sendPendingExperimentData (a).
-boost::unique_lock<boost::mutex> lock(m_CommMutex);
+boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex);
#endif
for (i = 0; i < ctrlmsg.workloadid_size(); i++) {
if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found
@@ -361,6 +380,12 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
}
}
// all results complete?
if (m_js.m_undoneJobs.Size() == 0 &&
m_js.noMoreExperiments() &&
m_js.m_runningJobs.Size() == 0) {
m_js.m_doneJobs.setIsFinished();
}
}
} // end-of-namespace: fail
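Net effect (illustration, not part of the commit): once the last result has been received, m_doneJobs.setIsFinished() makes getDone() return NULL instead of blocking forever, so a campaign can drain results in a plain loop; the include path and the evaluation step are assumptions:

#include "JobServer.hpp"

void drain_results(fail::JobServer &server)
{
    // getDone() blocks while results are outstanding and returns NULL
    // once m_doneJobs has been marked finished
    while (fail::ExperimentData *res = server.getDone()) {
        // evaluate(*res); // placeholder for campaign-specific evaluation
        delete res;
    }
}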

View File

@@ -66,6 +66,9 @@ private:
SynchronizedQueue<ExperimentData*> m_undoneJobs;
//! List of finished experiment results.
SynchronizedQueue<ExperimentData*> m_doneJobs;
#ifndef __puma
boost::mutex m_CommMutex; //!< to synchronise the communication
#endif // __puma
friend class CommThread; //!< CommThread is allowed to access the job queues.
/**
* The actual startup of the Jobserver.
@@ -93,10 +96,13 @@ public:
}
~JobServer()
{
done();
#ifndef __puma
// Cleanup of m_serverThread, etc.
m_serverThread->join();
delete m_serverThread;
#ifdef SERVER_PERFORMANCE_MEASURE
m_measureThread->join();
delete m_measureThread;
#endif
#endif // __puma
@@ -118,7 +124,7 @@ public:
* sets. We need this, as we allow concurrent parameter generation and
* distribution.
*/
-void setNoMoreExperiments() { m_noMoreExps = true; }
+void setNoMoreExperiments();
/**
* Checks whether there are no more experiment parameter sets.
* @return \c true if no more parameter sets available, \c false otherwise
@@ -162,9 +168,6 @@ private:
*/
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
public:
-#ifndef __puma
-static boost::mutex m_CommMutex; //! to synchronise the communication
-#endif // __puma
CommThread(int sockfd, JobServer& p)
: m_sock(sockfd), m_job_size(1), m_js(p) { }
/**

View File

@@ -6,10 +6,14 @@ using namespace std;
namespace fail {
JobClient::JobClient(const std::string& server, int port)
: m_server(server), m_server_port(port),
+m_server_runid(0), // server accepts this for virgin clients
+m_job_runtime_total(0),
+m_job_throughput(CLIENT_JOB_INITIAL), // will be corrected after measurement
+m_job_total(0),
+m_connect_failed(false)
{
SocketComm::init();
-m_server_port = port;
-m_server = server;
m_server_ent = gethostbyname(m_server.c_str());
cout << "JobServer: " << m_server.c_str() << endl;
if(m_server_ent == NULL) {
@@ -18,8 +22,6 @@ JobClient::JobClient(const std::string& server, int port)
exit(1);
}
srand(time(NULL)); // needed for random backoff (see connectToServer)
-m_server_runid = 0; // server accepts this for virgin clients
-m_job_throughput = 1; // client gets only one job at the first request
}
JobClient::~JobClient()
@@ -30,6 +32,11 @@ JobClient::~JobClient()
bool JobClient::connectToServer()
{
// don't retry server connects, to speed up shutdown at campaign end
if (m_connect_failed) {
return false;
}
int retries = CLIENT_RETRY_COUNT;
while (true) {
// Connect to server
@@ -67,6 +74,7 @@ bool JobClient::connectToServer()
cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT << " times); "
<< "I'll give it up!" << endl;
// TODO: Log-level?
m_connect_failed = true;
return false; // finally: unable to connect, give it up :-(
}
break; // connected! :-)
@@ -79,6 +87,11 @@ bool JobClient::connectToServer()
bool JobClient::getParam(ExperimentData& exp)
{
// die immediately if a previous connect already failed
if (m_connect_failed) {
return false;
}
while (1) { // Here we try to acquire a parameter set
switch (tryToGetExperimentData(exp)) {
// Jobserver will send the workload; params are set in \c exp
@@ -135,10 +148,16 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
ExperimentData* temp_exp = new ExperimentData(exp.getMessage().New());
if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) {
-// Failed to receive message? Retry.
+close(m_sockfd);
+// looks like we won't receive more jobs now, cleanup
delete &temp_exp->getMessage();
delete temp_exp;
-return FailControlMessage::COME_AGAIN;
+// did a previous loop iteration succeed?
+if (m_parameters.size() > 0) {
+break;
+} else {
+// nothing to do now, retry later
+return FailControlMessage::COME_AGAIN;
+}
}
temp_exp->setWorkloadID(ctrlmsg.workloadid(i)); //Store workload id of experiment data
@@ -188,10 +207,10 @@ bool JobClient::sendResult(ExperimentData& result)
m_job_runtime.reset();
m_job_runtime.startTimer();
m_job_total += m_results.size();
-sendResultsToServer();
+// tell caller whether we failed phoning home
+return sendResultsToServer();
}
// if there are more jobs for the experiment, store the result
return true;
} else {
//Stop time measurement and calculate new throughput
@@ -219,6 +238,14 @@ bool JobClient::sendResultsToServer()
{
if (m_results.size() != 0) {
if (!connectToServer()) {
// clear results, although we didn't get them to safety; otherwise,
// subsequent calls to sendResult() (and, at the latest, the destructor)
// would retry sending them, resulting in a long shutdown time
while (m_results.size()) {
delete &m_results.front()->getMessage();
delete m_results.front();
m_results.pop_front();
}
return false;
}
@@ -240,10 +267,16 @@ bool JobClient::sendResultsToServer()
cout << "]";
// TODO: Log-level?
-SocketComm::sendMsg(m_sockfd, ctrlmsg);
+if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) {
+close(m_sockfd);
+return false;
+}
for (i = 0; i < ctrlmsg.job_size() ; i++) {
-SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage());
+if (!SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage())) {
+close(m_sockfd);
+return false;
+}
delete &m_results.front()->getMessage();
delete m_results.front();
m_results.pop_front();
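Client-side consequence (illustration, not part of the commit): sendResult() now reports whether phoning home worked, and getParam() fails fast after a failed connect, so the usual experiment loop terminates cleanly at campaign end; the include path and run_experiment() are placeholders:

#include "JobClient.hpp"

void experiment_loop(fail::JobClient &client, fail::ExperimentData &data)
{
    // getParam() returns false once the server is gone or out of work
    while (client.getParam(data)) {
        // run_experiment(data); // placeholder for the actual experiment
        if (!client.sendResult(data)) {
            break; // could not phone home: stop instead of retrying forever
        }
    }
}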

View File

@@ -37,6 +37,8 @@ private:
std::deque<ExperimentData*> m_parameters;
std::deque<ExperimentData*> m_results;
bool m_connect_failed;
bool connectToServer();
bool sendResultsToServer();
FailControlMessage_Command tryToGetExperimentData(ExperimentData& exp);

View File

@@ -18,14 +18,15 @@ class SynchronizedQueue { // Adapted from: http://www.quantnet.com/cplusplus-mul
private:
std::queue<T> m_queue; //!< Use STL queue to store data
unsigned capacity;
+bool finished;
#ifndef __puma
boost::mutex m_mutex; //!< The mutex to synchronise on
boost::condition_variable m_cond; //!< The condition to wait for
boost::condition_variable m_cond_capacity; //!< Condition to wait on while the queue is full
#endif
public:
-SynchronizedQueue() : capacity(0) {}
-SynchronizedQueue(unsigned capacity) : capacity(capacity) {}
+SynchronizedQueue() : capacity(0), finished(false) {}
+SynchronizedQueue(unsigned capacity) : capacity(capacity), finished(false) {}
int Size()
{
#ifndef __puma
@@ -69,6 +70,10 @@ public:
// again after the wait
#ifndef __puma
while (m_queue.size() == 0) {
if (finished) {
// default-constructed T, 0 for integral types
return T();
}
m_cond.wait(lock);
}
#endif
@@ -116,6 +121,17 @@ public:
return false;
}
} // Lock is automatically released here
void setIsFinished(bool value = true)
{
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_mutex);
#endif
finished = value;
#ifndef __puma
m_cond.notify_all();
#endif
}
};
} // end-of-namespace: fail
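A self-contained sketch (not part of the commit) of the new shutdown behaviour, assuming this header is available as SynchronizedQueue.hpp:

#include <iostream>
#include <boost/thread.hpp>
#include "SynchronizedQueue.hpp"

static fail::SynchronizedQueue<int*> queue;

static void consumer()
{
    // Dequeue() first drains any remaining elements; once the queue is
    // empty *and* marked finished, it returns T() (NULL for pointers)
    while (int *job = queue.Dequeue()) {
        std::cout << "got job " << *job << std::endl;
        delete job;
    }
}

int main()
{
    boost::thread th(consumer);
    queue.Enqueue(new int(42));
    queue.setIsFinished(); // wake the consumer up for shutdown
    th.join();
    return 0;
}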