another directory rename: failstar -> fail

"failstar" sounds like a name for a cruise liner from the 80s.  As "*" isn't a
desirable part of directory names, just name the whole thing "fail/", the core
parts being stored in "fail/core/".

Additionally fixing two build system dependency issues:
 - missing jobserver -> protomessages dependency
 - broken bochs -> fail dependency (add_custom_target DEPENDS only allows plain
   file dependencies ... cmake for the win)


git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@956 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
hsc
2012-03-08 19:43:02 +00:00
commit b70b6fb43a
921 changed files with 473161 additions and 0 deletions

View File

@ -0,0 +1,8 @@
set(SRCS
JobClient.cc
JobServer.cc
SocketComm.cc
)
add_subdirectory(messagedefs)
add_library(jobserver ${SRCS})
add_dependencies(jobserver protomessages)

101
core/jobserver/JobClient.cc Normal file
View File

@ -0,0 +1,101 @@
#include "JobClient.hpp"
namespace fi {
JobClient::JobClient(std::string server, int port)
{
m_server_port = port;
m_server = server;
m_server_ent = gethostbyname(m_server.c_str());
if(m_server_ent == NULL){
perror("Cannot resolve host.");
exit(1);
}
}
bool JobClient::connectToServer(){
// Connect to server
struct sockaddr_in serv_addr;
m_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(m_sockfd < 0){
perror("socket");
exit(0);
}
/* Enable address reuse */
int on = 1;
setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
memcpy(&serv_addr.sin_addr.s_addr, m_server_ent->h_addr, m_server_ent->h_length);
serv_addr.sin_port = htons(m_server_port);
if (connect(m_sockfd, (sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("connect()");
return false;
}
return true;
}
bool JobClient::getParam(ExperimentData& exp){
while(1) { //!< Here we try to acquire a parameter set
switch(tryToGetExperimentData(exp)){
//!< Jobserver will sent workload, params are set in \c exp
case FailControlMessage_Command_WORK_FOLLOWS: return true;
//!< Nothing to do right now, but maybe later
case FailControlMessage_Command_COME_AGAIN:
sleep(1);
continue;
default:
return false;
}
}
}
FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp)
{
//!< Connection failed, minion can die
if( !connectToServer() ) return FailControlMessage_Command_DIE;
// Retrieve ExperimentData
FailControlMessage ctrlmsg;
ctrlmsg.set_command(FailControlMessage_Command_NEED_WORK);
ctrlmsg.set_build_id(42);
SocketComm::send_msg(m_sockfd, ctrlmsg);
ctrlmsg.Clear();
SocketComm::rcv_msg(m_sockfd, ctrlmsg);
switch(ctrlmsg.command()){
case FailControlMessage_Command_WORK_FOLLOWS:
SocketComm::rcv_msg(m_sockfd, exp.getMessage());
exp.setWorkloadID(ctrlmsg.workloadid()); // Store workload id of experiment data
break;
case FailControlMessage_Command_COME_AGAIN:
break;
default:
break;
}
close(m_sockfd);
return ctrlmsg.command();
}
bool JobClient::sendResult(ExperimentData& result)
{
if (!connectToServer() ) return false;
// Send back results
FailControlMessage ctrlmsg;
ctrlmsg.set_command(FailControlMessage_Command_RESULT_FOLLOWS);
ctrlmsg.set_build_id(42);
ctrlmsg.set_workloadid(result.getWorkloadID());
cout << "Sending back result [" << std::dec << result.getWorkloadID() << "]" << endl;
SocketComm::send_msg(m_sockfd, ctrlmsg);
SocketComm::send_msg(m_sockfd, result.getMessage());
// close connection.
close(m_sockfd);
return true;
}
}

View File

@ -0,0 +1,63 @@
/**
* \brief The Minion's JobClient requests ExperimentData and returns results.
*
* \author Martin Hoffmann
*/
#ifndef __JOB_CLIENT_H__
#define __JOB_CLIENT_H__
#include "SocketComm.hpp"
#include <string>
#include "controller/ExperimentData.hpp"
#include "jobserver/messagedefs/FailControlMessage.pb.h"
namespace fi {
/**
* \class JobClient
*
* \brief Manages communication with JobServer
*
*/
class JobClient {
std::string m_server;
int m_server_port;
struct hostent* m_server_ent;
int m_sockfd;
bool connectToServer();
FailControlMessage_Command tryToGetExperimentData(ExperimentData& exp);
public:
JobClient(std::string server = "localhost", int port = 1111);
/**
* Receive experiment data set from (remote) JobServer
* The caller (experiment developer) is responsible for
* allocating his ExperimentData object.
*
* @param exp Reference to a ExperimentData object allocated by the caller!
* @result \c true if parameter have been received and put into \c exp, \c false else.
*/
bool getParam(ExperimentData& exp);
/**
* Send back experiment result to the (remote) JobServer
* The caller (experiment developer) is responsible for
* destroying his ExperimentData object afterwards.
*
* @param exp Reference to the ExperimentData holding result values
* @return \c true Result successfully sent, \c else.
*/
bool sendResult(ExperimentData& result);
};
}
#endif

220
core/jobserver/JobServer.cc Normal file
View File

@ -0,0 +1,220 @@
// Author: Martin Hoffmann, Richard Hellwig
// Date: 07.10.11
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
#include <iostream>
#include "JobServer.hpp"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>
#include <string.h>
#include <arpa/inet.h>
#include "jobserver/messagedefs/FailControlMessage.pb.h"
#include "SocketComm.hpp"
#include "controller/Minion.hpp"
#ifndef __puma
#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#endif
using namespace std;
namespace fi {
void JobServer::addParam(ExperimentData* exp){
#ifndef __puma
m_undoneJobs.Enqueue(exp);
#endif
}
ExperimentData *JobServer::getDone()
{
// FIXME race condition, need to synchronize with
// sendPendingExperimentData() and receiveExperimentResults()
#ifndef __puma
if (m_undoneJobs.Size() == 0
&& noMoreExperiments()
&& m_runningJobs.Size() == 0
&& m_doneJobs.Size() == 0) {
// FRICKEL workaround
sleep(1);
ExperimentData *exp;
if (m_doneJobs.Dequeue_nb(exp)) {
return exp;
}
return 0;
}
return m_doneJobs.Dequeue();
#endif
}
#ifndef __puma
/**
* This is a predicate class for the remove_if operator
* on the thread list. The operator waits for
* timeout milliseconds to join each thread in the list.
* If the join was successful, the exited thread is deallocated
* and removed from the list.
*/
struct timed_join_successful {
int timeout_ms;
timed_join_successful(int timeout) : timeout_ms(timeout){};
bool operator()( boost::thread * threadelement ){
boost::posix_time::time_duration timeout = boost::posix_time::milliseconds(timeout_ms);
if(threadelement->timed_join(timeout)){
delete threadelement;
return true;
}else{
return false;
}
}
};
#endif
void JobServer::run(){
struct sockaddr_in clientaddr;
socklen_t clen = sizeof(clientaddr);
// implementation of server-client communication
int s = socket(AF_INET, SOCK_STREAM, 0);
/* Enable address reuse */
int on = 1;
setsockopt( s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
/* IPv4, Port: 1111, accept all IP adresses */
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(m_port);
saddr.sin_addr.s_addr = htons(INADDR_ANY);
/* bind to port */
bind(s, (struct sockaddr*) &saddr, sizeof(saddr));
/* Listen with a backlog of maxThreads */
listen(s, m_maxThreads);
cout << "JobServer listening...." << endl;
#ifndef __puma
boost::thread* th;
while(!m_finish){
// Accept connection
int cs = accept(s, (struct sockaddr*) &clientaddr, &clen);
// Spawn a thread for further communication,
// and add this thread to a list threads
// We can limit the generation of threads here.
if(m_threadlist.size() < m_maxThreads){
th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th);
}else{
/// Run over list with a timed_join,
/// removing finished threads.
do {
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
} while(m_threadlist.size() == m_maxThreads);
/// Start new thread
th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th);
}
}
close(s);
// when all undone Jobs are distributed -> call a timed_join on all spawned
// TODO: interrupt threads that do not want to join..
while(m_threadlist.size() > 0){
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
}
#endif
}
/// Communication thread implementation
void CommThread::operator()()
{
Minion minion;
FailControlMessage ctrlmsg;
minion.setSocketDescriptor(m_sock);
if (!SocketComm::rcv_msg(minion.getSocketDescriptor(), ctrlmsg)) {
cout << "!![Server] failed to read complete message from client" << endl;
close(m_sock);
return;
}
switch (ctrlmsg.command()) {
case FailControlMessage_Command_NEED_WORK:
// give minion something to do..
sendPendingExperimentData(minion);
break;
case FailControlMessage_Command_RESULT_FOLLOWS:
// get results and put to done queue.
receiveExperimentResults(minion, ctrlmsg.workloadid());
break;
default:
// hm.. don't know what to do. please die.
cout << "!![Server] no idea what to do with command #"
<< ctrlmsg.command() << ", telling minion to die." << endl;
ctrlmsg.Clear();
ctrlmsg.set_command(FailControlMessage_Command_DIE);
ctrlmsg.set_build_id(42);
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
}
close(m_sock);
}
bool CommThread::sendPendingExperimentData(Minion& minion)
{
FailControlMessage ctrlmsg;
ctrlmsg.set_build_id(42);
ExperimentData * exp = 0;
if(m_js.m_undoneJobs.Dequeue_nb(exp) == true){
/// Got an element from queue, assign ID to workload and send to minion
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
exp->setWorkloadID(workloadID); // store ID for identification when receiving result
if(!m_js.m_runningJobs.insert(workloadID, exp)){
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
cout << ">>[" << workloadID << "] " << flush;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage());
}else if( m_js.noMoreExperiments() == false ){
/// Currently we have no workload, but the campaign is not over yet. Minion can try again later
ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN);
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl;
}else{
/// No more elements, and campaign is over. Minion can die.
ctrlmsg.set_command(FailControlMessage_Command_DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
}
return true;
}
bool CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
{
ExperimentData * exp; // Get exp* from running jobs
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
cout << "<<[" << workloadID << "] " << flush;
if( m_js.m_runningJobs.remove(workloadID, exp) ){ /// ExperimentData* found
SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); /// deserialize results.
m_js.m_doneJobs.Enqueue(exp); /// Put results in done queue..
return true;
}else{
cout << "!![Server] workload id not found in running jobs map :( [" << workloadID << "]" << endl;
return false;
}
}
};

View File

@ -0,0 +1,159 @@
/**
* \brief The JobServer supplies the Minions with ExperimentData's and receives the result data.
*
* \author Martin Hoffmann, Richard Hellwig
*/
#ifndef __JOB_SERVER_H__
#define __JOB_SERVER_H__
#include "controller/Minion.hpp"
#include "util/SynchronizedQueue.hpp"
#include "util/SynchronizedCounter.hpp"
#include "util/SynchronizedMap.hpp"
#include <list>
#ifndef __puma
#include <boost/thread.hpp>
#endif
namespace fi {
class CommThread;
/**
* \class JobServer
* Manages the campaigns parameter distributions.
* The Campaign Controller can add experiment parameter sets,
* which the Jobserver will distribute to requesting clients.
* The campaign controller can wait for all results, or a timeout.
*/
class JobServer
{
//! The TCP Port number
int m_port;
//! TODO nice termination concept
bool m_finish;
//! Campaign signaled last expirement data set
bool m_noMoreExps;
//! the maximal number of threads spawned for TCP communication
unsigned m_maxThreads;
//! the maximal timeout per communication thread
int m_threadtimeout;
//! A of spawned threads
#ifndef __puma
typedef std::list<boost::thread*> Tthreadlist;
Tthreadlist m_threadlist;
boost::thread* m_serverThread;
#endif // puma
//! Atomic counter for Workload IDs.
SynchronizedCounter m_counter;
//! Map of running jobs (referenced by Workload ID
SynchronizedMap<uint32_t, ExperimentData*> m_runningJobs;
//! List of undone jobs, here the campaigns jobs enter
SynchronizedQueue<ExperimentData*> m_undoneJobs;
//! List of finished experiment results.
SynchronizedQueue<ExperimentData*> m_doneJobs;
friend class CommThread; //!< CommThread is allowed access the job queues.
public:
JobServer(int port = 1111) : m_port(port), m_finish(false), m_noMoreExps(false), m_maxThreads(32), m_threadtimeout(500) {
#ifndef __puma
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
#endif
};
~JobServer() {}
private:
/**
* The actual startup of the Jobserver.
* Here we initalize the network socket
* and listen for connections.
*/
void run();
void sendWork(int sockfd);
public:
/**
* Adds a new experiment data set to the job queue.
* @param data Pointer to an expoeriment data object
*/
void addParam(ExperimentData* data);
/**
* Retrieve an experiment result.
*
* Blocks if we currently have no results. Returns NULL if no results
* are to be expected, because no parameter sets were enqueued
* beforehand.
*/
ExperimentData* getDone();
/**
* The Campaign controller must signalize, that
* there will be no more parameter sets.
* We need this, as we allow concurrent parameter
* generation and distribution.
*/
void setNoMoreExperiments(){ m_noMoreExps = true; };
/**
* corresponding getter-method @see setNoMoreExperiments
*/
bool noMoreExperiments()const { return m_noMoreExps; };
/**
* The Campaign Controller can signalize, that
* the jobserver can stop listening for client
* connections.
*/
void done() { m_finish = true; };
};
/**
* @class CommThread
* Implementation of the communication threads.
* This class implements the actual communication
* with the minions.
*/
class CommThread {
int m_sock; //! Socket descriptor of the connection
JobServer& m_js; //! Calling jobserver
public:
CommThread(int sockfd, JobServer& p) : m_sock(sockfd), m_js(p) {};
/**
* The thread's entry point
*/
void operator() ();
private:
/// FIXME concerns are not really separated yet ;)
/**
* Called after minion calls for work.
* Tries to deque a parameter set non blocking, and
* sends it back to the requesting minion.
* @param minion The minion asking for input
* @return FIXME return value not evaluated yet.
*/
bool sendPendingExperimentData(Minion& minion);
/**
* Called after minion offers a result message.
* Evaluates the Workload ID and puts the corresponding
* job result into the result queue.
* @param minion The minion offering results
* @param workloadID The workload id of the result message
* @return \c true if Worload ID could be mapped, \c false if not
*/
bool receiveExperimentResults(Minion& minion, uint32_t workloadID);
};
};
#endif //__JOB_SERVER_H__

View File

@ -0,0 +1,49 @@
#include "SocketComm.hpp"
namespace fi {
bool SocketComm::send_msg(int sockfd, google::protobuf::Message& msg)
{
#ifdef USE_SIZE_PREFIX
int size = htonl(msg.ByteSize());
if (write(sockfd, &size, sizeof(size)) != sizeof(size)) {
return false;
}
std::string buf;
msg.SerializeToString(&buf);
if (write(sockfd, buf.c_str(), buf.size()) != (ssize_t) buf.size()) {
return false;
}
#else
char c = 0;
if (!msg.SerializeToFileDescriptor(sockfd) || write(sockfd, &c, 1) != 1) {
return false;
}
#endif
return true;
}
bool SocketComm::rcv_msg(int sockfd, google::protobuf::Message& msg)
{
#ifdef USE_SIZE_PREFIX
int size;
// FIXME: read() may need to be called multiple times until all data was read
if (read(sockfd, &size, sizeof(size)) != sizeof(size)) {
return false;
}
size = ntohl(size);
char *buf = new char[size];
// FIXME: read() may need to be called multiple times until all data was read
if (read(sockfd, buf, size) != size) {
delete[] buf;
return false;
}
std::string st(buf, size);
delete[] buf;
msg.ParseFromString(st);
return true;
#else
return msg.ParseFromFileDescriptor(sockfd);
#endif
}
}

View File

@ -0,0 +1,44 @@
/**
* \brief Socket based communictaion
*
* \author Horst Schirmeier, Martin Hoffmann
*/
#ifndef __SOCKETCOMM_HPP__
#define __SOCKETCOMM_HPP__
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <iostream>
#include <fstream>
#include <google/protobuf/message.h>
#define USE_SIZE_PREFIX
namespace fi {
class SocketComm {
public:
/**
* Send Protobuf-generated message
* @param sockfd open socket descriptor to write to
* @param Msg Reference to Protobuf generated message type
* \return false if message sending failed
*/
static bool send_msg(int sockfd, google::protobuf::Message& msg);
/**
* Receive Protobuf-generated message
* @param sockfd open socket descriptor to write to
* @param Msg Reference to Protobuf generated message type
* \return false if message reception failed
*/
static bool rcv_msg(int sockfd, google::protobuf::Message& msg);
};
}
#endif

View File

@ -0,0 +1,15 @@
## Setup desired protobuf descriptions HERE ##
set(MY_PROTOS
FailControlMessage.proto
)
#### PROTOBUFS ####
find_package(Protobuf REQUIRED)
include_directories(${PROTOBUF_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${MY_PROTOS} )
## Build library
add_library(protomessages ${PROTO_SRCS} ${PROTO_HDRS} )

View File

@ -0,0 +1,16 @@
message FailControlMessage {
enum Command {
// Minions may send these:
NEED_WORK = 0; // server replies with WORK_FOLLOWS or DIE
RESULT_FOLLOWS = 1; // followed by experiment-specific ExperimentData message (holding both original parameters and experiment result)
// JobServer may send these:
WORK_FOLLOWS = 5; // followed by experiment-specific ExperimentData message
COME_AGAIN = 6; // no experiment-specific ExperimentData at the moment, but Campaign is not over yet
DIE = 7; // tells the client to terminate
}
required Command command = 1;
optional uint32 workloadID = 2;
required uint64 build_id = 3; // identifying the client/server build (e.g., build time in unixtime format)
}

View File

@ -0,0 +1,2 @@
#!/bin/bash
protoc --cpp_out=. FailControlMessage.proto