Fixed whitespaces.
git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@2067 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -40,7 +40,7 @@ boost::mutex CommThread::m_CommMutex;
|
||||
|
||||
ExperimentData *JobServer::getDone()
|
||||
{
|
||||
|
||||
|
||||
#ifndef __puma
|
||||
if (m_undoneJobs.Size() == 0
|
||||
&& noMoreExperiments()
|
||||
@ -49,7 +49,7 @@ ExperimentData *JobServer::getDone()
|
||||
&& m_inOutCounter.getValue() == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
ExperimentData *exp = NULL;
|
||||
exp = m_doneJobs.Dequeue();
|
||||
m_inOutCounter.decrement();
|
||||
@ -115,7 +115,7 @@ void JobServer::run()
|
||||
{
|
||||
struct sockaddr_in clientaddr;
|
||||
socklen_t clen = sizeof(clientaddr);
|
||||
|
||||
|
||||
// implementation of server-client communication
|
||||
int s;
|
||||
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
|
||||
@ -131,20 +131,20 @@ void JobServer::run()
|
||||
// TODO: Log-level?
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/* IPv4, bind to all interfaces */
|
||||
struct sockaddr_in saddr;
|
||||
saddr.sin_family = AF_INET;
|
||||
saddr.sin_port = htons(m_port);
|
||||
saddr.sin_addr.s_addr = htons(INADDR_ANY);
|
||||
|
||||
|
||||
/* bind to port */
|
||||
if (::bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) {
|
||||
perror("bind");
|
||||
// TODO: Log-level?
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/* Listen with a backlog of maxThreads */
|
||||
if (listen(s, m_maxThreads) == -1) {
|
||||
perror("listen");
|
||||
@ -156,7 +156,7 @@ void JobServer::run()
|
||||
#ifndef __puma
|
||||
boost::thread* th;
|
||||
while (!m_finish){
|
||||
// Accept connection
|
||||
// Accept connection
|
||||
int cs = accept(s, (struct sockaddr*)&clientaddr, &clen);
|
||||
if (cs == -1) {
|
||||
perror("accept");
|
||||
@ -173,7 +173,7 @@ void JobServer::run()
|
||||
m_threadlist.remove_if(timed_join_successful(m_threadtimeout));
|
||||
} while (m_threadlist.size() == m_maxThreads);
|
||||
}
|
||||
// Start new thread
|
||||
// Start new thread
|
||||
th = new boost::thread(CommThread(cs, *this));
|
||||
m_threadlist.push_back(th);
|
||||
}
|
||||
@ -243,13 +243,13 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
std::deque<ExperimentData*> exp;
|
||||
ExperimentData* temp_exp = 0;
|
||||
FailControlMessage ctrlmsg;
|
||||
|
||||
|
||||
ctrlmsg.set_build_id(42);
|
||||
ctrlmsg.set_run_id(m_js.m_runid);
|
||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||
|
||||
for (i = 0; i < m_job_size ; i++) {
|
||||
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
|
||||
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
|
||||
// Got an element from queue, assign ID to workload and send to minion
|
||||
workloadID = m_js.m_counter.increment(); // increment workload counter
|
||||
temp_exp->setWorkloadID(workloadID); // store ID for identification when receiving result
|
||||
@ -258,21 +258,21 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
|
||||
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
|
||||
}
|
||||
}
|
||||
if (exp.size() != 0) {
|
||||
ctrlmsg.set_job_size(exp.size());
|
||||
|
||||
|
||||
cout << " >>[";
|
||||
for ( i = 0; i < exp.size() ; i++) {
|
||||
cout << " "<< ctrlmsg.workloadid(i) <<" ";
|
||||
}
|
||||
cout << "] " << flush;
|
||||
|
||||
|
||||
|
||||
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
for (i = 0; i < ctrlmsg.job_size() ; i++) {
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
||||
@ -280,7 +280,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
return;
|
||||
@ -314,7 +314,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
SocketComm::sendMsg(minion.getSocketDescriptor(), temp_exp->getMessage());
|
||||
}
|
||||
} else if (m_js.noMoreExperiments() == false) {
|
||||
} else if (m_js.noMoreExperiments() == false) {
|
||||
// Currently we have no workload (even the running-job-queue is empty!), but
|
||||
// the campaign is not over yet. Minion can try again later.
|
||||
ctrlmsg.set_command(FailControlMessage::COME_AGAIN);
|
||||
@ -362,7 +362,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
|
||||
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
||||
cout << "[Server] Received another result for workload id ["
|
||||
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl;
|
||||
|
||||
|
||||
// TODO: Any need for error-handling here?
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,21 +18,21 @@
|
||||
namespace fail {
|
||||
|
||||
class CommThread;
|
||||
|
||||
|
||||
/**
|
||||
* \class JobServer
|
||||
* The server supplies the Minions with ExperimentData's and receives the result data.
|
||||
*
|
||||
*
|
||||
* Manages the campaigns parameter distributions. The Campaign Controller can add
|
||||
* experiment parameter sets, which the Jobserver will distribute to requesting
|
||||
* clients. The campaign controller can wait for all results, or a timeout.
|
||||
*/
|
||||
class JobServer {
|
||||
private:
|
||||
//! The TCP Port number
|
||||
//! The TCP Port number
|
||||
int m_port;
|
||||
//! TODO nice termination concept
|
||||
bool m_finish;
|
||||
//! TODO nice termination concept
|
||||
bool m_finish;
|
||||
//! Campaign signaled last expirement data set
|
||||
bool m_noMoreExps;
|
||||
//! the maximal number of threads spawned for TCP communication
|
||||
@ -43,7 +43,7 @@ private:
|
||||
#ifndef __puma
|
||||
typedef std::list<boost::thread*> Tthreadlist;
|
||||
Tthreadlist m_threadlist;
|
||||
|
||||
|
||||
boost::thread* m_serverThread;
|
||||
#endif // puma
|
||||
|
||||
@ -77,13 +77,13 @@ private:
|
||||
#endif
|
||||
void sendWork(int sockfd);
|
||||
|
||||
public:
|
||||
public:
|
||||
JobServer(int port = SERVER_COMM_TCP_PORT) : m_port(port), m_finish(false), m_noMoreExps(false),
|
||||
m_maxThreads(128), m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
|
||||
{
|
||||
{
|
||||
m_runid = std::time(0);
|
||||
#ifndef __puma
|
||||
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
|
||||
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
m_measureThread = new boost::thread(&JobServer::measure, this);
|
||||
#endif
|
||||
@ -123,8 +123,8 @@ public:
|
||||
* @see setNoMoreExperiments
|
||||
*/
|
||||
bool noMoreExperiments() const { return m_noMoreExps; }
|
||||
|
||||
/**
|
||||
|
||||
/**
|
||||
* The Campaign Controller can signalize, that the jobserver can
|
||||
* stop listening for client connections.
|
||||
*/
|
||||
@ -134,7 +134,7 @@ public:
|
||||
/**
|
||||
* @class CommThread
|
||||
* Implementation of the communication threads.
|
||||
* This class implements the actual communication
|
||||
* This class implements the actual communication
|
||||
* with the minions.
|
||||
*/
|
||||
class CommThread {
|
||||
@ -154,10 +154,10 @@ private:
|
||||
/**
|
||||
* Called after minion offers a result message.
|
||||
* Evaluates the Workload ID and puts the corresponding
|
||||
* job result into the result queue.
|
||||
* job result into the result queue.
|
||||
* @param minion The minion offering results
|
||||
* @param workloadID The workload id of the result message
|
||||
*/
|
||||
*/
|
||||
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
|
||||
public:
|
||||
#ifndef __puma
|
||||
|
||||
Reference in New Issue
Block a user