Code cleanup for commit 1963-1965
git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@2014 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -30,8 +30,8 @@ SET(SERVER_PERF_STEPPING_SEC "1" CACHE STRING "Stepping of performan
|
||||
SET(CLIENT_RAND_BACKOFF_TSTART "3" CACHE STRING "Lower limit of client's backoff phase in seconds")
|
||||
SET(CLIENT_RAND_BACKOFF_TEND "8" CACHE STRING "Upper limit of client's backoff phase in seconds")
|
||||
SET(CLIENT_RETRY_COUNT "3" CACHE STRING "Client's number of reconnect retries")
|
||||
SET(CLIENT_JOB_REQUEST_SEC "60" CACHE STRING "Determines how often the client asks for new jobs")
|
||||
SET(CLIENT_JOB_LIMIT_SEC "1000" CACHE STRING "How many jobs can a client ask for")
|
||||
SET(CLIENT_JOB_REQUEST_SEC "30" CACHE STRING "Time in seconds a client tries to get work for (to reduce client/server communication frequency)")
|
||||
SET(CLIENT_JOB_LIMIT "1000" CACHE STRING "How many jobs can a client ask for")
|
||||
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/FailConfig.hpp.in
|
||||
${CMAKE_CURRENT_BINARY_DIR}/FailConfig.hpp)
|
||||
|
||||
@ -240,7 +240,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
{
|
||||
uint32_t i;
|
||||
uint32_t workloadID;
|
||||
std::vector<ExperimentData*> exp;
|
||||
std::deque<ExperimentData*> exp;
|
||||
ExperimentData* temp_exp = 0;
|
||||
FailControlMessage ctrlmsg;
|
||||
|
||||
@ -248,7 +248,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
ctrlmsg.set_run_id(m_js.m_runid);
|
||||
ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS);
|
||||
|
||||
for(i = 0; i < m_job_size ; i++) {
|
||||
for (i = 0; i < m_job_size ; i++) {
|
||||
if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) {
|
||||
// Got an element from queue, assign ID to workload and send to minion
|
||||
workloadID = m_js.m_counter.increment(); // increment workload counter
|
||||
@ -261,7 +261,6 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
|
||||
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
|
||||
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
|
||||
sleep(10);
|
||||
}
|
||||
}
|
||||
if (exp.size() != 0) {
|
||||
@ -276,15 +275,15 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||
for (i = 0; i < ctrlmsg.job_size() ; i++) {
|
||||
if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
||||
exp.erase(exp.begin());
|
||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
||||
exp.pop_front();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
#ifndef __puma
|
||||
@ -329,7 +328,7 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
||||
}
|
||||
}
|
||||
|
||||
void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg)
|
||||
void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg)
|
||||
{
|
||||
int i;
|
||||
ExperimentData* exp = NULL; // Get exp* from running jobs
|
||||
|
||||
@ -158,7 +158,7 @@ private:
|
||||
* @param minion The minion offering results
|
||||
* @param workloadID The workload id of the result message
|
||||
*/
|
||||
void receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg);
|
||||
void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg);
|
||||
public:
|
||||
#ifndef __puma
|
||||
static boost::mutex m_CommMutex; //! to synchronise the communication
|
||||
|
||||
@ -86,18 +86,11 @@ bool JobClient::getParam(ExperimentData& exp)
|
||||
|
||||
FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp)
|
||||
{
|
||||
|
||||
FailControlMessage ctrlmsg;
|
||||
|
||||
//Are there other jobs for the experiment
|
||||
if (m_parameters.size() != 0) {
|
||||
exp.getMessage().CopyFrom(m_parameters.front()->getMessage());
|
||||
exp.setWorkloadID(m_parameters.front()->getWorkloadID());
|
||||
|
||||
delete &m_parameters.front()->getMessage();
|
||||
delete m_parameters.front();
|
||||
m_parameters.erase(m_parameters.begin());
|
||||
|
||||
return FailControlMessage::WORK_FOLLOWS;
|
||||
} else {
|
||||
FailControlMessage ctrlmsg;
|
||||
if (m_parameters.size() == 0) {
|
||||
|
||||
// Connection failed, minion can die
|
||||
if (!connectToServer()) {
|
||||
@ -134,6 +127,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
|
||||
if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) {
|
||||
// Failed to receive message? Retry.
|
||||
close(m_sockfd);
|
||||
delete temp_exp;
|
||||
return FailControlMessage::COME_AGAIN;
|
||||
}
|
||||
|
||||
@ -146,22 +140,26 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
close(m_sockfd);
|
||||
if (m_parameters.size() != 0) {
|
||||
//Take front from m_parameters and copy to exp.
|
||||
exp.getMessage().CopyFrom(m_parameters.front()->getMessage());
|
||||
exp.setWorkloadID(m_parameters.front()->getWorkloadID());
|
||||
//Delete front element of m_parameters
|
||||
delete &m_parameters.front()->getMessage();
|
||||
delete m_parameters.front();
|
||||
m_parameters.erase(m_parameters.begin());
|
||||
//start time measurement for throughput calculation
|
||||
m_job_runtime.startTimer();
|
||||
}
|
||||
|
||||
//start time measurement for throughput calculation
|
||||
m_job_runtime.startTimer();
|
||||
}
|
||||
|
||||
if (m_parameters.size() != 0) {
|
||||
exp.getMessage().CopyFrom(m_parameters.front()->getMessage());
|
||||
exp.setWorkloadID(m_parameters.front()->getWorkloadID());
|
||||
|
||||
delete &m_parameters.front()->getMessage();
|
||||
delete m_parameters.front();
|
||||
m_parameters.pop_front();
|
||||
|
||||
return FailControlMessage::WORK_FOLLOWS;
|
||||
} else {
|
||||
return ctrlmsg.command();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
bool JobClient::sendResult(ExperimentData& result)
|
||||
@ -171,29 +169,30 @@ bool JobClient::sendResult(ExperimentData& result)
|
||||
temp_exp->getMessage().CopyFrom(result.getMessage());
|
||||
temp_exp->setWorkloadID(result.getWorkloadID());
|
||||
|
||||
m_results.push_back( temp_exp );
|
||||
|
||||
if (m_parameters.size() != 0) {
|
||||
//If there are more jobs for the experiment store result
|
||||
m_results.push_back( temp_exp );
|
||||
|
||||
return true;
|
||||
} else {
|
||||
m_results.push_back( temp_exp );
|
||||
|
||||
//Stop time measurement and calculate new throughput
|
||||
m_job_runtime.stopTimer();
|
||||
m_job_throughput = CLIENT_JOB_REQUEST_SEC/((double)m_job_runtime/m_results.size());
|
||||
|
||||
if (m_job_throughput > CLIENT_JOB_LIMIT_SEC)
|
||||
if (m_job_throughput > CLIENT_JOB_LIMIT_SEC) {
|
||||
m_job_throughput = CLIENT_JOB_LIMIT_SEC;
|
||||
}
|
||||
|
||||
if (m_job_throughput < 1)
|
||||
if (m_job_throughput < 1) {
|
||||
m_job_throughput = 1;
|
||||
}
|
||||
|
||||
//Reset timer for new time measurement
|
||||
m_job_runtime.reset();
|
||||
|
||||
if (!connectToServer())
|
||||
if (!connectToServer()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//Send back results
|
||||
FailControlMessage ctrlmsg;
|
||||
@ -219,7 +218,7 @@ bool JobClient::sendResult(ExperimentData& result)
|
||||
SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage());
|
||||
delete &m_results.front()->getMessage();
|
||||
delete m_results.front();
|
||||
m_results.erase(m_results.begin());
|
||||
m_results.pop_front();
|
||||
}
|
||||
|
||||
// Close connection.
|
||||
|
||||
@ -31,8 +31,8 @@ private:
|
||||
|
||||
WallclockTimer m_job_runtime;
|
||||
int m_job_throughput;
|
||||
std::vector<ExperimentData*> m_parameters;
|
||||
std::vector<ExperimentData*> m_results;
|
||||
std::deque<ExperimentData*> m_parameters;
|
||||
std::deque<ExperimentData*> m_results;
|
||||
|
||||
bool connectToServer();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user