Fail* directories reorganized, Code-cleanup (-> coding-style), Typos+comments fixed.

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1321 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
adrian
2012-06-08 20:09:43 +00:00
parent d474a5b952
commit 2575604b41
866 changed files with 1848 additions and 1879 deletions

View File

@ -0,0 +1,8 @@
set(SRCS
CampaignManager.cc
JobServer.cc
)
add_library(cpn ${SRCS})
add_dependencies(cpn comm)

25
src/core/cpn/Campaign.hpp Normal file
View File

@ -0,0 +1,25 @@
#ifndef __CAMPAIGN_HPP__
#define __CAMPAIGN_HPP__
namespace fail {
/**
* \class Campaign
*
* Basic interface for user-defined campaigns. To create a new
* campaign, derive your own class from Campaign,
* define the run method, and add it to the CampaignManager.
*/
class Campaign {
public:
Campaign() { }
/**
* Defines the campaign.
* @return \c true if the campaign was successful, \c false otherwise
*/
virtual bool run() = 0;
};
} // end-of-namespace: fail
#endif // __CAMPAIGN_HPP__

View File

@ -0,0 +1,7 @@
#include "CampaignManager.hpp"
namespace fail {
CampaignManager campaignmanager;
} // end-of-namespace: fail

View File

@ -0,0 +1,77 @@
/**
* \brief The manager for an entire campaign
*/
#ifndef __CAMPAIGN_MANAGER_HPP__
#define __CAMPAIGN_MANAGER_HPP__
#include "sal/SALInst.hpp"
#include "comm/ExperimentData.hpp"
#include "JobServer.hpp"
#include "Campaign.hpp"
namespace fail {
/**
* \class CampaignManager
*
* The CampaignManager allows a user-campaign access to all constant
* simulator information and forwards single experiments to the JobServer.
*/
class CampaignManager {
private:
JobServer m_jobserver;
Campaign* m_currentCampaign;
public:
CampaignManager() { }
/**
* Executes a user campaign
*/
bool runCampaign(Campaign* c)
{
m_currentCampaign = c;
bool ret = c->run();
m_jobserver.done();
return ret;
}
/**
* Returns a const reference for acquiring constant simulator specific information.
* e.g., Registernames, to ease experiment data construction.
* The campaign description is not allowed to change the simulator
* state, as the actual simulation runs within another process (Minion)
* @return constant reference to the current simulator backend.
*/
SimulatorController const& getSimulator() const { return simulator; }
/**
* Add a experiment parameter set.
* The user campaign has to allocate the Parameter object,
* and deallocate it after result reception.
* A Parameter set includes space for results.
* @param exp A pointer to a ExperimentData set.
*/
void addParam(ExperimentData* exp) { m_jobserver.addParam(exp); }
/**
* A user campaign can request a single result (blocking) from the queue.
* @return Pointer to a parameter object with filled result data
* @see addParam()
*/
ExperimentData* getDone() { return m_jobserver.getDone(); }
/**
* Signal, that there will not come any further parameter sets.
*/
void noMoreParameters() { m_jobserver.setNoMoreExperiments(); }
/**
* User campaign has finished.
*/
void done() { m_jobserver.done(); }
/**
* Wait actively, until all experiments expired.
*/
// void waitForCompletion();
};
extern CampaignManager campaignmanager;
} // end-of-namespace: fail
#endif // __CAMPAIGN_MANAGER_HPP__

313
src/core/cpn/JobServer.cc Normal file
View File

@ -0,0 +1,313 @@
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>
#include <string.h>
#include <arpa/inet.h>
#include "comm/msg/FailControlMessage.pb.h"
#include "comm/SocketComm.hpp"
#include "JobServer.hpp"
#include "Minion.hpp"
#ifndef __puma
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#endif
using namespace std;
namespace fail {
void JobServer::addParam(ExperimentData* exp)
{
#ifndef __puma
m_undoneJobs.Enqueue(exp);
#endif
}
#ifdef SERVER_PERFORMANCE_MEASURE
volatile unsigned JobServer::m_DoneCount = 0;
#endif
ExperimentData *JobServer::getDone()
{
// FIXME race condition, need to synchronize with
// sendPendingExperimentData() and receiveExperimentResults()
#ifndef __puma
if (m_undoneJobs.Size() == 0
&& noMoreExperiments()
&& m_runningJobs.Size() == 0
&& m_doneJobs.Size() == 0) {
// FRICKEL workaround
sleep(1);
ExperimentData *exp = NULL;
if (m_doneJobs.Dequeue_nb(exp)) {
return exp;
}
return 0;
}
return m_doneJobs.Dequeue();
#endif
}
#ifdef SERVER_PERFORMANCE_MEASURE
void JobServer::measure()
{
// TODO: Log-level?
cout << "\n[Server] Logging throughput in \"" << SERVER_PERF_LOG_PATH << "\"..." << endl;
ofstream m_file(SERVER_PERF_LOG_PATH, std::ios::trunc); // overwrite existing perf-logs
if (!m_file.is_open()) {
cerr << "[Server] Perf-logging has been enabled"
<< "but I was not able to write the log-file \""
<< SERVER_PERF_LOG_PATH << "\"." << endl;
exit(1);
}
unsigned counter = 0;
m_file << "time\tthroughput" << endl;
unsigned diff = 0;
while (!m_finish) {
// Format: 1st column (seconds)[TAB]2nd column (throughput)
m_file << counter << "\t" << (m_DoneCount - diff) << endl;
counter += SERVER_PERF_STEPPING_SEC;
diff = m_DoneCount;
sleep(SERVER_PERF_STEPPING_SEC);
}
// NOTE: Summing up the values written in the 2nd column does not
// necessarily yield the number of completed experiments/jobs
// (due to thread-scheduling behaviour -> not sync'd!)
}
#endif // SERVER_PERFORMANCE_MEASURE
#ifndef __puma
/**
* This is a predicate class for the remove_if operator on the thread
* list. The operator waits for timeout milliseconds to join each
* thread in the list. If the join was successful, the exited thread
* is deallocated and removed from the list.
*/
struct timed_join_successful {
int timeout_ms;
timed_join_successful(int timeout)
: timeout_ms(timeout) { }
bool operator()(boost::thread* threadelement)
{
boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms);
if (threadelement->timed_join(timeout)) {
delete threadelement;
return true;
} else {
return false;
}
}
};
#endif
void JobServer::run()
{
struct sockaddr_in clientaddr;
socklen_t clen = sizeof(clientaddr);
// implementation of server-client communication
int s;
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
// TODO: Log-level?
return;
}
/* Enable address reuse */
int on = 1;
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
perror("setsockopt");
// TODO: Log-level?
return;
}
/* IPv4, Port: 1111, accept all IP adresses */
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(m_port);
saddr.sin_addr.s_addr = htons(INADDR_ANY);
/* bind to port */
if (bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) {
perror("bind");
// TODO: Log-level?
return;
}
/* Listen with a backlog of maxThreads */
if (listen(s, m_maxThreads) == -1) {
perror("listen");
// TODO: Log-level?
return;
}
cout << "JobServer listening...." << endl;
// TODO: Log-level?
#ifndef __puma
boost::thread* th;
while(!m_finish){
// Accept connection
int cs = accept(s, (struct sockaddr*)&clientaddr, &clen);
if (cs == -1) {
perror("accept");
// TODO: Log-level?
return;
}
// Spawn a thread for further communication,
// and add this thread to a list threads
// We can limit the generation of threads here.
if (m_threadlist.size() < m_maxThreads) {
th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th);
} else {
// Run over list with a timed_join,
// removing finished threads.
do {
m_threadlist.remove_if(timed_join_successful(m_threadtimeout));
} while(m_threadlist.size() == m_maxThreads);
// Start new thread
th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th);
}
}
close(s);
// when all undone Jobs are distributed -> call a timed_join on all spawned
// TODO: interrupt threads that do not want to join..
while (m_threadlist.size() > 0)
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
#endif
}
void CommThread::operator()()
{
// The communication thread implementation:
Minion minion;
FailControlMessage ctrlmsg;
minion.setSocketDescriptor(m_sock);
if (!SocketComm::rcvMsg(minion.getSocketDescriptor(), ctrlmsg)) {
cout << "!![Server] failed to read complete message from client" << endl;
close(m_sock);
return;
}
switch (ctrlmsg.command()) {
case FailControlMessage_Command_NEED_WORK:
// give minion something to do..
sendPendingExperimentData(minion);
break;
case FailControlMessage_Command_RESULT_FOLLOWS:
// get results and put to done queue.
receiveExperimentResults(minion, ctrlmsg.workloadid());
break;
default:
// hm.. don't know what to do. please die.
cout << "!![Server] no idea what to do with command #"
<< ctrlmsg.command() << ", telling minion to die." << endl;
ctrlmsg.Clear();
ctrlmsg.set_command(FailControlMessage_Command_DIE);
ctrlmsg.set_build_id(42);
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
}
close(m_sock);
}
#ifndef __puma
boost::mutex CommThread::m_CommMutex;
#endif // __puma
void CommThread::sendPendingExperimentData(Minion& minion)
{
FailControlMessage ctrlmsg;
ctrlmsg.set_build_id(42);
ExperimentData * exp = 0;
if (m_js.m_undoneJobs.Dequeue_nb(exp) == true) {
// Got an element from queue, assign ID to workload and send to minion
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
exp->setWorkloadID(workloadID); // store ID for identification when receiving result
if (!m_js.m_runningJobs.insert(workloadID, exp)) {
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
cout << ">>[" << workloadID << "] " << flush;
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
return;
}
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif
if ((exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority
// (This picks one running job.)
// TODO: Improve selection of parameter set to be resent:
// - currently: Linear complexity!
// - pick entry at random?
// - retry counter for each job?
// Implement resend of running-parameter sets to improve campaign speed
// and to prevent result loss due to (unexpected) termination of experiment
// clients.
// (Note: Therefore we need to be aware of receiving multiple results for a
// single parameter-set, @see receiveExperimentResults.)
uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously)
// Resend the parameter-set.
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
cout << ">>R[" << workloadID << "] " << flush;
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::sendMsg(minion.getSocketDescriptor(), exp->getMessage());
} else if (m_js.noMoreExperiments() == false) {
// Currently we have no workload (even the running-job-queue is empty!), but
// the campaign is not over yet. Minion can try again later.
ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN);
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl;
} else {
// No more elements, and campaign is over. Minion can die.
ctrlmsg.set_command(FailControlMessage_Command_DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl;
SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg);
}
}
void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
{
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif
ExperimentData * exp; // Get exp* from running jobs
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
cout << "<<[" << workloadID << "] " << flush;
if (m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
SocketComm::rcvMsg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results.
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue..
#ifdef SERVER_PERFORMANCE_MEASURE
++JobServer::m_DoneCount;
#endif
} else {
// We can receive several results for the same workload id because
// we (may) distribute the (running) jobs to a *few* experiment-clients.
cout << "[Server] Received another result for workload id ["
<< workloadID << "] -- ignored." << endl;
// TODO: Any need for error-handling here?
}
}
} // end-of-namespace: fail

168
src/core/cpn/JobServer.hpp Normal file
View File

@ -0,0 +1,168 @@
#ifndef __JOB_SERVER_H__
#define __JOB_SERVER_H__
#include "Minion.hpp"
#include "util/SynchronizedQueue.hpp"
#include "util/SynchronizedCounter.hpp"
#include "util/SynchronizedMap.hpp"
#include "config/FailConfig.hpp"
#include <list>
#ifndef __puma
#include <boost/thread.hpp>
#endif
namespace fail {
class CommThread;
/**
* \class JobServer
* The server supplies the Minions with ExperimentData's and receives the result data.
*
* Manages the campaigns parameter distributions. The Campaign Controller can add
* experiment parameter sets, which the Jobserver will distribute to requesting
* clients. The campaign controller can wait for all results, or a timeout.
*/
class JobServer {
private:
//! The TCP Port number
int m_port;
//! TODO nice termination concept
bool m_finish;
//! Campaign signaled last expirement data set
bool m_noMoreExps;
//! the maximal number of threads spawned for TCP communication
unsigned m_maxThreads;
//! the maximal timeout per communication thread
int m_threadtimeout;
//! A of spawned threads
#ifndef __puma
typedef std::list<boost::thread*> Tthreadlist;
Tthreadlist m_threadlist;
boost::thread* m_serverThread;
#endif // puma
#ifdef SERVER_PERFORMANCE_MEASURE
static volatile unsigned m_DoneCount; //! the number of finished jobs
#ifndef __puma
boost::thread* m_measureThread; //! the performance measurement thread
#endif
#endif
//! Atomic counter for Workload IDs.
SynchronizedCounter m_counter;
//! Map of running jobs (referenced by Workload ID
SynchronizedMap<uint32_t, ExperimentData*> m_runningJobs;
//! List of undone jobs, here the campaigns jobs enter
SynchronizedQueue<ExperimentData*> m_undoneJobs;
//! List of finished experiment results.
SynchronizedQueue<ExperimentData*> m_doneJobs;
friend class CommThread; //!< CommThread is allowed access the job queues.
/**
* The actual startup of the Jobserver.
* Here we initalize the network socket
* and listen for connections.
*/
void run();
#ifdef SERVER_PERFORMANCE_MEASURE
void measure();
#endif
void sendWork(int sockfd);
public:
JobServer(int port = 1111) : m_port(port), m_finish(false), m_noMoreExps(false),
m_maxThreads(128), m_threadtimeout(0)
{
#ifndef __puma
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
#ifdef SERVER_PERFORMANCE_MEASURE
m_measureThread = new boost::thread(&JobServer::measure, this);
#endif
#endif
}
~JobServer()
{
#ifndef __puma
// Cleanup of m_serverThread, etc.
delete m_serverThread;
#ifdef SERVER_PERFORMANCE_MEASURE
delete m_measureThread;
#endif
#endif // __puma
}
/**
* Adds a new experiment data set to the job queue.
* @param data Pointer to an expoeriment data object
*/
void addParam(ExperimentData* data);
/**
* Retrieve an experiment result. Blocks if we currently have no results.
* Returns \c NULL if no results are to be expected, because no parameter
* sets were enqueued beforehand.
* @return pointer to experiment result data
*/
ExperimentData* getDone();
/**
* The Campaign controller must signalize, that there will be no
* more parameter sets. We need this, as we allow concurrent parameter
* generation and distribution.
*/
void setNoMoreExperiments() { m_noMoreExps = true; }
/**
* Checks whether there are no more experiment paremeter sets.
* @return \c true if no more parameter sets available, \c false otherwise
* @see setNoMoreExperiments
*/
bool noMoreExperiments() const { return m_noMoreExps; }
/**
* The Campaign Controller can signalize, that the jobserver can
* stop listening for client connections.
*/
void done() { m_finish = true; }
};
/**
* @class CommThread
* Implementation of the communication threads.
* This class implements the actual communication
* with the minions.
*/
class CommThread {
private:
int m_sock; //! Socket descriptor of the connection
JobServer& m_js; //! Calling jobserver
#ifndef __puma
static boost::mutex m_CommMutex; //! to synchronise the communication
#endif // __puma
// FIXME: Concerns are not really separated yet ;)
/**
* Called after minion calls for work.
* Tries to deque a parameter set non blocking, and
* sends it back to the requesting minion.
* @param minion The minion asking for input
*/
void sendPendingExperimentData(Minion& minion);
/**
* Called after minion offers a result message.
* Evaluates the Workload ID and puts the corresponding
* job result into the result queue.
* @param minion The minion offering results
* @param workloadID The workload id of the result message
*/
void receiveExperimentResults(Minion& minion, uint32_t workloadID);
public:
CommThread(int sockfd, JobServer& p)
: m_sock(sockfd), m_js(p) { }
/**
* The thread's entry point.
*/
void operator()();
};
} // end-of-namespace: fail
#endif //__JOB_SERVER_H__

71
src/core/cpn/Minion.hpp Normal file
View File

@ -0,0 +1,71 @@
/**
* \brief The representation of a minion.
*/
#ifndef __MINION_HPP__
#define __MINION_HPP__
#include <string>
#include "comm/ExperimentData.hpp"
namespace fail {
/**
* \class Minion
*
* Contains all informations about a minion.
*/
class Minion {
private:
std::string hostname;
bool isWorking;
ExperimentData* currentExperimentData;
int sockfd;
public:
Minion() : isWorking(false), currentExperimentData(0), sockfd(-1) { }
/**
* Sets the socket descriptor.
* @param sock the new socket descriptor (used internal)
*/
void setSocketDescriptor(int sock) { sockfd = sock; }
/**
* Retrives the socket descriptor.
* @return the socket descriptor
*/
int getSocketDescriptor() const { return (sockfd); }
/**
* Returns the hostname of the minion.
* @return the hostname
*/
const std::string& getHostname() { return (hostname); }
/**
* Sets the hostname of the minion.
* @param host the hostname
*/
void setHostname(const std::string& host) { hostname = host; }
/**
* Returns the current ExperimentData which the minion is working with.
* @return a pointer of the current ExperimentData
*/
ExperimentData* getCurrentExperimentData() { return currentExperimentData; }
/**
* Sets the current ExperimentData which the minion is working with.
* @param exp the current ExperimentData
*/
void setCurrentExperimentData(ExperimentData* exp) { currentExperimentData = exp; }
/**
* Returns the current state of the minion.
* @return the current state
*/
bool isBusy() { return (isWorking); }
/**
* Sets the current state of the minion
* @param state the current state
*/
void setBusy(bool state) { isWorking = state; }
};
} // end-of-namespace: fail
#endif // __MINION_HPP__