Error handling for Socket-related function calls added (JobServer).

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@988 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
adrian
2012-03-12 17:09:46 +00:00
parent 2a42b48417
commit d8547aeac8

View File

@ -84,11 +84,18 @@ void JobServer::run(){
socklen_t clen = sizeof(clientaddr); socklen_t clen = sizeof(clientaddr);
// implementation of server-client communication // implementation of server-client communication
int s = socket(AF_INET, SOCK_STREAM, 0); int s;
if((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
perror("socket");
return;
}
/* Enable address reuse */ /* Enable address reuse */
int on = 1; int on = 1;
setsockopt( s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); if(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
perror("setsockopt");
return;
}
/* IPv4, Port: 1111, accept all IP adresses */ /* IPv4, Port: 1111, accept all IP adresses */
struct sockaddr_in saddr; struct sockaddr_in saddr;
@ -96,17 +103,27 @@ void JobServer::run(){
saddr.sin_port = htons(m_port); saddr.sin_port = htons(m_port);
saddr.sin_addr.s_addr = htons(INADDR_ANY); saddr.sin_addr.s_addr = htons(INADDR_ANY);
/* bind to port */ /* bind to port */
bind(s, (struct sockaddr*) &saddr, sizeof(saddr)); if(bind(s, (struct sockaddr*) &saddr, sizeof(saddr)) == -1) {
perror("bind");
return;
}
/* Listen with a backlog of maxThreads */ /* Listen with a backlog of maxThreads */
listen(s, m_maxThreads); if(listen(s, m_maxThreads) == -1) {
perror("listen");
return;
}
cout << "JobServer listening...." << endl; cout << "JobServer listening...." << endl;
#ifndef __puma #ifndef __puma
boost::thread* th; boost::thread* th;
while(!m_finish){ while(!m_finish){
// Accept connection // Accept connection
int cs = accept(s, (struct sockaddr*) &clientaddr, &clen); int cs = accept(s, (struct sockaddr*)&clientaddr, &clen);
if(cs == -1) {
perror("accept");
return;
}
// Spawn a thread for further communication, // Spawn a thread for further communication,
// and add this thread to a list threads // and add this thread to a list threads
// We can limit the generation of threads here. // We can limit the generation of threads here.
@ -114,12 +131,12 @@ void JobServer::run(){
th = new boost::thread(CommThread(cs, *this)); th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th); m_threadlist.push_back(th);
}else{ }else{
/// Run over list with a timed_join, // Run over list with a timed_join,
/// removing finished threads. // removing finished threads.
do { do {
m_threadlist.remove_if( timed_join_successful(m_threadtimeout) ); m_threadlist.remove_if( timed_join_successful(m_threadtimeout) );
} while(m_threadlist.size() == m_maxThreads); } while(m_threadlist.size() == m_maxThreads);
/// Start new thread // Start new thread
th = new boost::thread(CommThread(cs, *this)); th = new boost::thread(CommThread(cs, *this));
m_threadlist.push_back(th); m_threadlist.push_back(th);
} }
@ -175,7 +192,7 @@ bool CommThread::sendPendingExperimentData(Minion& minion)
ctrlmsg.set_build_id(42); ctrlmsg.set_build_id(42);
ExperimentData * exp = 0; ExperimentData * exp = 0;
if(m_js.m_undoneJobs.Dequeue_nb(exp) == true){ if(m_js.m_undoneJobs.Dequeue_nb(exp) == true){
/// Got an element from queue, assign ID to workload and send to minion // Got an element from queue, assign ID to workload and send to minion
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
exp->setWorkloadID(workloadID); // store ID for identification when receiving result exp->setWorkloadID(workloadID); // store ID for identification when receiving result
if(!m_js.m_runningJobs.insert(workloadID, exp)){ if(!m_js.m_runningJobs.insert(workloadID, exp)){
@ -188,12 +205,12 @@ bool CommThread::sendPendingExperimentData(Minion& minion)
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage()); SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage());
}else if( m_js.noMoreExperiments() == false ){ }else if( m_js.noMoreExperiments() == false ){
/// Currently we have no workload, but the campaign is not over yet. Minion can try again later // Currently we have no workload, but the campaign is not over yet. Minion can try again later
ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN); ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN);
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl; cout << "--[Server] No workload, come again..." << endl;
}else{ }else{
/// No more elements, and campaign is over. Minion can die. // No more elements, and campaign is over. Minion can die.
ctrlmsg.set_command(FailControlMessage_Command_DIE); ctrlmsg.set_command(FailControlMessage_Command_DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl; cout << "--[Server] No workload, and no campaign, please die." << endl;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg); SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);