Properly deal with clients that previously talked to another campaign server
A campaign server now sends every client a unique run ID (the UNIX timestamp of the moment the server was started). This allows the server to ignore results from "old" clients that previously talked to a different server instance, and to tell those clients to terminate.

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1677 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -12,5 +12,8 @@ message FailControlMessage {
|
|||||||
|
|
||||||
required Command command = 1;
|
required Command command = 1;
|
||||||
optional uint32 workloadID = 2;
|
optional uint32 workloadID = 2;
|
||||||
required uint64 build_id = 3; // identifying the client/server build (e.g., build time in unixtime format)
|
// identifying the client/server build (e.g., build time in unixtime format)
|
||||||
|
required uint64 build_id = 3;
|
||||||
|
// campaign server run ID: prevents old clients talking to new servers
|
||||||
|
optional uint64 run_id = 4;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -199,10 +199,23 @@ void CommThread::operator()()
|
|||||||
|
|
||||||
switch (ctrlmsg.command()) {
|
switch (ctrlmsg.command()) {
|
||||||
case FailControlMessage_Command_NEED_WORK:
|
case FailControlMessage_Command_NEED_WORK:
|
||||||
|
// let old clients die
|
||||||
|
if (!ctrlmsg.has_run_id() || (ctrlmsg.run_id() != 0 && ctrlmsg.run_id() != m_js.m_runid)) {
|
||||||
|
cout << "!![Server] telling old client to die" << endl;
|
||||||
|
ctrlmsg.Clear();
|
||||||
|
ctrlmsg.set_command(FailControlMessage_Command_DIE);
|
||||||
|
ctrlmsg.set_build_id(42);
|
||||||
|
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
|
||||||
|
}
|
||||||
// give minion something to do..
|
// give minion something to do..
|
||||||
sendPendingExperimentData(minion);
|
sendPendingExperimentData(minion);
|
||||||
break;
|
break;
|
||||||
case FailControlMessage_Command_RESULT_FOLLOWS:
|
case FailControlMessage_Command_RESULT_FOLLOWS:
|
||||||
|
// ignore old client's results
|
||||||
|
if (!ctrlmsg.has_run_id() || (ctrlmsg.run_id() != 0 && ctrlmsg.run_id() != m_js.m_runid)) {
|
||||||
|
cout << "!![Server] ignoring old client's results" << endl;
|
||||||
|
break;
|
||||||
|
}
|
||||||
// get results and put to done queue.
|
// get results and put to done queue.
|
||||||
receiveExperimentResults(minion, ctrlmsg.workloadid());
|
receiveExperimentResults(minion, ctrlmsg.workloadid());
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -8,6 +8,7 @@
|
|||||||
#include "config/FailConfig.hpp"
|
#include "config/FailConfig.hpp"
|
||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
|
#include <ctime>
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
#include <boost/thread.hpp>
|
#include <boost/thread.hpp>
|
||||||
@ -37,7 +38,7 @@ private:
|
|||||||
unsigned m_maxThreads;
|
unsigned m_maxThreads;
|
||||||
//! the maximal timeout per communication thread
|
//! the maximal timeout per communication thread
|
||||||
int m_threadtimeout;
|
int m_threadtimeout;
|
||||||
//! A of spawned threads
|
//! list of spawned threads
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
typedef std::list<boost::thread*> Tthreadlist;
|
typedef std::list<boost::thread*> Tthreadlist;
|
||||||
Tthreadlist m_threadlist;
|
Tthreadlist m_threadlist;
|
||||||
@ -45,6 +46,9 @@ private:
|
|||||||
boost::thread* m_serverThread;
|
boost::thread* m_serverThread;
|
||||||
#endif // puma
|
#endif // puma
|
||||||
|
|
||||||
|
//! unique server run ID
|
||||||
|
uint64_t m_runid;
|
||||||
|
|
||||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||||
static volatile unsigned m_DoneCount; //! the number of finished jobs
|
static volatile unsigned m_DoneCount; //! the number of finished jobs
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
@ -75,6 +79,7 @@ public:
|
|||||||
JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false),
|
JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false),
|
||||||
m_maxThreads(128), m_threadtimeout(0)
|
m_maxThreads(128), m_threadtimeout(0)
|
||||||
{
|
{
|
||||||
|
m_runid = std::time(0);
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
|
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
|
||||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||||
|
|||||||
@ -15,6 +15,7 @@ JobClient::JobClient(const std::string& server, int port)
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
srand(time(NULL)); // needed for random backoff (see connectToServer)
|
srand(time(NULL)); // needed for random backoff (see connectToServer)
|
||||||
|
m_server_runid = 0; // server accepts this for virgin clients
|
||||||
}
|
}
|
||||||
|
|
||||||
bool JobClient::connectToServer()
|
bool JobClient::connectToServer()
|
||||||
@ -91,11 +92,15 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
|
|||||||
FailControlMessage ctrlmsg;
|
FailControlMessage ctrlmsg;
|
||||||
ctrlmsg.set_command(FailControlMessage_Command_NEED_WORK);
|
ctrlmsg.set_command(FailControlMessage_Command_NEED_WORK);
|
||||||
ctrlmsg.set_build_id(42);
|
ctrlmsg.set_build_id(42);
|
||||||
|
ctrlmsg.set_run_id(m_server_runid);
|
||||||
|
|
||||||
SocketComm::sendMsg(m_sockfd, ctrlmsg);
|
SocketComm::sendMsg(m_sockfd, ctrlmsg);
|
||||||
ctrlmsg.Clear();
|
ctrlmsg.Clear();
|
||||||
SocketComm::rcvMsg(m_sockfd, ctrlmsg);
|
SocketComm::rcvMsg(m_sockfd, ctrlmsg);
|
||||||
|
|
||||||
|
// now we know the current run ID
|
||||||
|
m_server_runid = ctrlmsg.run_id();
|
||||||
|
|
||||||
switch (ctrlmsg.command()) {
|
switch (ctrlmsg.command()) {
|
||||||
case FailControlMessage_Command_WORK_FOLLOWS:
|
case FailControlMessage_Command_WORK_FOLLOWS:
|
||||||
SocketComm::rcvMsg(m_sockfd, exp.getMessage());
|
SocketComm::rcvMsg(m_sockfd, exp.getMessage());
|
||||||
@ -119,6 +124,7 @@ bool JobClient::sendResult(ExperimentData& result)
|
|||||||
FailControlMessage ctrlmsg;
|
FailControlMessage ctrlmsg;
|
||||||
ctrlmsg.set_command(FailControlMessage_Command_RESULT_FOLLOWS);
|
ctrlmsg.set_command(FailControlMessage_Command_RESULT_FOLLOWS);
|
||||||
ctrlmsg.set_build_id(42);
|
ctrlmsg.set_build_id(42);
|
||||||
|
ctrlmsg.set_run_id(m_server_runid);
|
||||||
ctrlmsg.set_workloadid(result.getWorkloadID());
|
ctrlmsg.set_workloadid(result.getWorkloadID());
|
||||||
cout << "[Client] Sending back result [" << std::dec << result.getWorkloadID() << "]..." << endl;
|
cout << "[Client] Sending back result [" << std::dec << result.getWorkloadID() << "]..." << endl;
|
||||||
// TODO: Log-level?
|
// TODO: Log-level?
|
||||||
|
|||||||
@ -26,6 +26,7 @@ private:
|
|||||||
int m_server_port;
|
int m_server_port;
|
||||||
struct hostent* m_server_ent;
|
struct hostent* m_server_ent;
|
||||||
int m_sockfd;
|
int m_sockfd;
|
||||||
|
uint64_t m_server_runid;
|
||||||
|
|
||||||
bool connectToServer();
|
bool connectToServer();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user