From 00f809231f9ecbefa08d765879b11c42878299d7 Mon Sep 17 00:00:00 2001 From: hellwig Date: Wed, 23 Jan 2013 14:22:05 +0000 Subject: [PATCH] Code cleanup for commit 1963-1965 git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@2014 8c4709b5-6ec9-48aa-a5cd-a96041d1645a --- src/core/config/CMakeLists.txt | 4 +- src/core/cpn/JobServer.cc | 13 ++- src/core/cpn/JobServer.hpp | 2 +- src/core/efw/JobClient.cc | 61 ++++++------ src/core/efw/JobClient.hpp | 4 +- src/experiments/weather-monitor/experiment.cc | 99 ++++++++++--------- 6 files changed, 92 insertions(+), 91 deletions(-) diff --git a/src/core/config/CMakeLists.txt b/src/core/config/CMakeLists.txt index 11bb8807..d91da3e9 100644 --- a/src/core/config/CMakeLists.txt +++ b/src/core/config/CMakeLists.txt @@ -30,8 +30,8 @@ SET(SERVER_PERF_STEPPING_SEC "1" CACHE STRING "Stepping of performan SET(CLIENT_RAND_BACKOFF_TSTART "3" CACHE STRING "Lower limit of client's backoff phase in seconds") SET(CLIENT_RAND_BACKOFF_TEND "8" CACHE STRING "Upper limit of client's backoff phase in seconds") SET(CLIENT_RETRY_COUNT "3" CACHE STRING "Client's number of reconnect retries") -SET(CLIENT_JOB_REQUEST_SEC "60" CACHE STRING "Determines how often the client asks for new jobs") -SET(CLIENT_JOB_LIMIT_SEC "1000" CACHE STRING "How many jobs can a client ask for") +SET(CLIENT_JOB_REQUEST_SEC "30" CACHE STRING "Time in seconds a client tries to get work for (to reduce client/server communication frequency)") +SET(CLIENT_JOB_LIMIT "1000" CACHE STRING "How many jobs can a client ask for") configure_file(${CMAKE_CURRENT_SOURCE_DIR}/FailConfig.hpp.in ${CMAKE_CURRENT_BINARY_DIR}/FailConfig.hpp) diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index c669023c..20365220 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -240,7 +240,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) { uint32_t i; uint32_t workloadID; - std::vector exp; + std::deque exp; ExperimentData* temp_exp = 0; FailControlMessage ctrlmsg; @@ -248,7 +248,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) ctrlmsg.set_run_id(m_js.m_runid); ctrlmsg.set_command(FailControlMessage::WORK_FOLLOWS); - for(i = 0; i < m_job_size ; i++) { + for (i = 0; i < m_job_size ; i++) { if (m_js.m_undoneJobs.Dequeue_nb(temp_exp) == true) { // Got an element from queue, assign ID to workload and send to minion workloadID = m_js.m_counter.increment(); // increment workload counter @@ -261,7 +261,6 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - sleep(10); } } if (exp.size() != 0) { @@ -276,15 +275,15 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { for (i = 0; i < ctrlmsg.job_size() ; i++) { - if(SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { - exp.erase(exp.begin()); + if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + exp.pop_front(); } else { break; } } - return; } + return; } #ifndef __puma @@ -329,7 +328,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) } } -void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg) +void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg) { int i; ExperimentData* exp = NULL; // Get exp* from running jobs diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 1f6ca999..79aff47e 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -158,7 +158,7 @@ private: * @param minion The minion offering results * @param workloadID The workload id of the result message */ - void receiveExperimentResults(Minion& minion, FailControlMessage ctrlmsg); + void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg); public: #ifndef __puma static boost::mutex m_CommMutex; //! to synchronise the communication diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index f0daebbc..6cb76cf2 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -86,18 +86,11 @@ bool JobClient::getParam(ExperimentData& exp) FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp) { + + FailControlMessage ctrlmsg; + //Are there other jobs for the experiment - if (m_parameters.size() != 0) { - exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); - exp.setWorkloadID(m_parameters.front()->getWorkloadID()); - - delete &m_parameters.front()->getMessage(); - delete m_parameters.front(); - m_parameters.erase(m_parameters.begin()); - - return FailControlMessage::WORK_FOLLOWS; - } else { - FailControlMessage ctrlmsg; + if (m_parameters.size() == 0) { // Connection failed, minion can die if (!connectToServer()) { @@ -134,6 +127,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) { // Failed to receive message? Retry. close(m_sockfd); + delete temp_exp; return FailControlMessage::COME_AGAIN; } @@ -146,22 +140,26 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp default: break; } - close(m_sockfd); - if (m_parameters.size() != 0) { - //Take front from m_parameters and copy to exp. - exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); - exp.setWorkloadID(m_parameters.front()->getWorkloadID()); - //Delete front element of m_parameters - delete &m_parameters.front()->getMessage(); - delete m_parameters.front(); - m_parameters.erase(m_parameters.begin()); - //start time measurement for throughput calculation - m_job_runtime.startTimer(); - } + //start time measurement for throughput calculation + m_job_runtime.startTimer(); + } + + if (m_parameters.size() != 0) { + exp.getMessage().CopyFrom(m_parameters.front()->getMessage()); + exp.setWorkloadID(m_parameters.front()->getWorkloadID()); + + delete &m_parameters.front()->getMessage(); + delete m_parameters.front(); + m_parameters.pop_front(); + + return FailControlMessage::WORK_FOLLOWS; + } else { return ctrlmsg.command(); } + + } bool JobClient::sendResult(ExperimentData& result) @@ -171,29 +169,30 @@ bool JobClient::sendResult(ExperimentData& result) temp_exp->getMessage().CopyFrom(result.getMessage()); temp_exp->setWorkloadID(result.getWorkloadID()); + m_results.push_back( temp_exp ); + if (m_parameters.size() != 0) { //If there are more jobs for the experiment store result - m_results.push_back( temp_exp ); - return true; } else { - m_results.push_back( temp_exp ); - //Stop time measurement and calculate new throughput m_job_runtime.stopTimer(); m_job_throughput = CLIENT_JOB_REQUEST_SEC/((double)m_job_runtime/m_results.size()); - if (m_job_throughput > CLIENT_JOB_LIMIT_SEC) + if (m_job_throughput > CLIENT_JOB_LIMIT_SEC) { m_job_throughput = CLIENT_JOB_LIMIT_SEC; + } - if (m_job_throughput < 1) + if (m_job_throughput < 1) { m_job_throughput = 1; + } //Reset timer for new time measurement m_job_runtime.reset(); - if (!connectToServer()) + if (!connectToServer()) { return false; + } //Send back results FailControlMessage ctrlmsg; @@ -219,7 +218,7 @@ bool JobClient::sendResult(ExperimentData& result) SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage()); delete &m_results.front()->getMessage(); delete m_results.front(); - m_results.erase(m_results.begin()); + m_results.pop_front(); } // Close connection. diff --git a/src/core/efw/JobClient.hpp b/src/core/efw/JobClient.hpp index 5a4806a8..9e4155ee 100644 --- a/src/core/efw/JobClient.hpp +++ b/src/core/efw/JobClient.hpp @@ -31,8 +31,8 @@ private: WallclockTimer m_job_runtime; int m_job_throughput; - std::vector m_parameters; - std::vector m_results; + std::deque m_parameters; + std::deque m_results; bool connectToServer(); diff --git a/src/experiments/weather-monitor/experiment.cc b/src/experiments/weather-monitor/experiment.cc index 72b17ffc..1c9de7fa 100644 --- a/src/experiments/weather-monitor/experiment.cc +++ b/src/experiments/weather-monitor/experiment.cc @@ -40,7 +40,7 @@ bool WeatherMonitorExperiment::run() log << "startup" << endl; -#if 1 +#if 0 // STEP 0: record memory map with vptr addresses GuestListener g; while (true) { @@ -94,7 +94,7 @@ bool WeatherMonitorExperiment::run() // -> campaign-ready traces with identical lengths bp.setWatchInstructionPointer(ANY_ADDR); bp.setCounter(WEATHER_NUMINSTR_TRACING); -#endif +#endif simulator.addListener(&bp); BPSingleListener ev_count(ANY_ADDR); simulator.addListener(&ev_count); @@ -136,10 +136,10 @@ bool WeatherMonitorExperiment::run() log << dec << "experiment finished after " << instr_counter << " instructions, seeing wait_end " << WEATHER_NUMITER_AFTER << " times" << endl; -#elif 0 +#elif 1 // STEP 3: The actual experiment. #if !LOCAL - for (int i = 0; i < 50; ++i) { // only do 50 sequential experiments, to prevent swapping + for (int i = 0; i < 5000; ++i) { // only do 50 sequential experiments, to prevent swapping // 50 exp ~ 0.5GB RAM usage per instance (linearly increasing) #endif @@ -159,20 +159,20 @@ bool WeatherMonitorExperiment::run() param.msg.set_mem_addr(0x00103bdc); #endif - int id = param.getWorkloadID(); - int instr_offset = param.msg.instr_offset(); - int mem_addr = param.msg.mem_addr(); + //int id = param.getWorkloadID(); + //int instr_offset = param.msg.instr_offset(); + //int mem_addr = param.msg.mem_addr(); // for each job we're actually doing *8* experiments (one for each bit) for (int bit_offset = 0; bit_offset < 8; ++bit_offset) { // 8 results in one job WeathermonitorProtoMsg_Result *result = param.msg.add_result(); - result->set_bit_offset(bit_offset); - log << dec << "job " << id << " instr " << instr_offset + result->set_bit_offset(bit_offset); //!!!!!!!!! + /*log << dec << "job " << id << " instr " << instr_offset << " mem " << mem_addr << "+" << bit_offset << endl; log << "restoring state" << endl; - simulator.restore(statename); + simulator.restore(statename);*/ // XXX debug /* @@ -184,21 +184,21 @@ bool WeatherMonitorExperiment::run() */ // this marks THE END - BPSingleListener ev_end(ANY_ADDR); + /*BPSingleListener ev_end(ANY_ADDR); ev_end.setCounter(WEATHER_NUMINSTR_TRACING + WEATHER_NUMINSTR_AFTER); - simulator.addListener(&ev_end); + simulator.addListener(&ev_end);*/ // count loop iterations by counting wait_begin() calls // FIXME would be nice to have a callback API for this as this needs to // be done "in parallel" - BPSingleListener ev_wait_begin(WEATHER_FUNC_WAIT_BEGIN); + /*BPSingleListener ev_wait_begin(WEATHER_FUNC_WAIT_BEGIN); simulator.addListener(&ev_wait_begin); - int count_loop_iter_before = 0; + int count_loop_iter_before = 0;*/ // no need to wait if offset is 0 - if (instr_offset > 0) { + //if (instr_offset > 0) { // XXX could be improved with intermediate states (reducing runtime until injection) - bp.setWatchInstructionPointer(ANY_ADDR); + /*bp.setWatchInstructionPointer(ANY_ADDR); bp.setCounter(instr_offset); simulator.addListener(&bp); @@ -207,18 +207,18 @@ bool WeatherMonitorExperiment::run() ++count_loop_iter_before; simulator.addListener(&ev_wait_begin); } - } + }*/ // --- fault injection --- - MemoryManager& mm = simulator.getMemoryManager(); + /*MemoryManager& mm = simulator.getMemoryManager(); byte_t data = mm.getByte(mem_addr); byte_t newdata = data ^ (1 << bit_offset); mm.setByte(mem_addr, newdata); // note at what IP we did it int32_t injection_ip = simulator.getRegisterManager().getInstructionPointer(); - param.msg.set_injection_ip(injection_ip); - result->set_iter_before_fi(count_loop_iter_before); - log << "fault injected @ ip " << injection_ip + param.msg.set_injection_ip(injection_ip);*/ + result->set_iter_before_fi(0); //!!!!!!!!!!!!!!!!! + /*log << "fault injected @ ip " << injection_ip << " 0x" << hex << ((int)data) << " -> 0x" << ((int)newdata) << endl; // sanity check if (param.msg.has_instr_address() && @@ -226,13 +226,13 @@ bool WeatherMonitorExperiment::run() stringstream ss; ss << "SANITY CHECK FAILED: " << injection_ip << " != " << param.msg.instr_address(); - log << ss.str() << endl; - result->set_resulttype(result->UNKNOWN); - result->set_latest_ip(injection_ip); - result->set_details(ss.str()); - result->set_iter_after_fi(0); + log << ss.str() << endl;*/ + result->set_resulttype(result->UNKNOWN); //!!!!!!!!!!!!!!!! + result->set_latest_ip(42); //!!!!!!!!!!!!!!!! + result->set_details("test"); //!!!!!!!!!!!!!!!! + result->set_iter_after_fi(0); //!!!!!!!!!!!!!!!! - simulator.clearListeners(); + //simulator.clearListeners(); continue; } @@ -250,7 +250,7 @@ bool WeatherMonitorExperiment::run() // - (XXX "sane" display?) // catch traps as "extraordinary" ending - TrapListener ev_trap(ANY_TRAP); + /*TrapListener ev_trap(ANY_TRAP); simulator.addListener(&ev_trap); // jump outside text segment BPRangeListener ev_below_text(ANY_ADDR, WEATHER_TEXT_START - 1); @@ -263,7 +263,7 @@ bool WeatherMonitorExperiment::run() // timeout (e.g., stuck in a HLT instruction) // 10000us = 500000 instructions TimerListener ev_timeout(10000); - simulator.addListener(&ev_timeout); + simulator.addListener(&ev_timeout); */ #if LOCAL && 0 // XXX debug @@ -275,57 +275,60 @@ bool WeatherMonitorExperiment::run() simulator.addFlow(&tp); #endif - BaseListener* ev; + /*BaseListener* ev; // count loop iterations int count_loop_iter_after = 0; while ((ev = simulator.resume()) == &ev_wait_begin) { ++count_loop_iter_after; simulator.addListener(&ev_wait_begin); - } - result->set_iter_after_fi(count_loop_iter_after); + }*/ + //result->set_iter_after_fi(42); //!!!!!!!!!!!! // record latest IP regardless of result - result->set_latest_ip(simulator.getRegisterManager().getInstructionPointer()); + //result->set_latest_ip(0x42); //!!!!!!!!!! - if (ev == &ev_end) { + //result->set_resulttype(result->FINISHED); + + /*if (ev == &ev_end) { log << "Result FINISHED (" << dec << count_loop_iter_before << "+" << count_loop_iter_after << ")" << endl; - result->set_resulttype(result->FINISHED); + result->set_resulttype(result->FINISHED); //!!!!!!!!! } else if (ev == &ev_timeout) { log << "Result TIMEOUT (" << dec << count_loop_iter_before << "+" << count_loop_iter_after << ")" << endl; - result->set_resulttype(result->TIMEOUT); + result->set_resulttype(result->TIMEOUT); //!!!!!!!!! } else if (ev == &ev_below_text || ev == &ev_beyond_text) { log << "Result OUTSIDE" << endl; - result->set_resulttype(result->OUTSIDE); + result->set_resulttype(result->OUTSIDE); //!!!!!!!!! } else if (ev == &ev_trap) { log << dec << "Result TRAP #" << ev_trap.getTriggerNumber() << endl; - result->set_resulttype(result->TRAP); + result->set_resulttype(result->TRAP); //!!!!!!!!! stringstream ss; ss << ev_trap.getTriggerNumber(); - result->set_details(ss.str()); + result->set_details(ss.str()); //!!!!!!!! } else if (ev == &ev_detected) { log << dec << "Result DETECTED" << endl; - result->set_resulttype(result->DETECTED); + result->set_resulttype(result->DETECTED); //!!!!!!!! } else { log << "Result WTF?" << endl; - result->set_resulttype(result->UNKNOWN); + result->set_resulttype(result->UNKNOWN); //!!!!!!!! stringstream ss; - ss << "eventid " << ev->getId() << " EIP " << simulator.getRegisterManager().getInstructionPointer(); - result->set_details(ss.str()); - } - } + ss << "eventid " << /*ev->getId() << " EIP " << simulator.getRegisterManager().getInstructionPointer();*/ + //result->set_details(ss.str()); //!!!!!!! + + //result->set_details("test"); + // sanity check: do we have exactly 8 results? - if (param.msg.result_size() != 8) { + /*if (param.msg.result_size() != 8) { log << "WTF? param.msg.result_size() != 8" << endl; - } else { + } else {*/ #if !LOCAL m_jc.sendResult(param); #endif - } + //} #if !LOCAL }