diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index a521de5b..1c0a94de 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -178,10 +178,6 @@ void JobServer::addParam(ExperimentData* exp) #endif } -#ifdef SERVER_PERFORMANCE_MEASURE -volatile unsigned JobServer::m_DoneCount = 0; -#endif - ExperimentData *JobServer::getDone() { #ifndef __puma @@ -222,7 +218,7 @@ void JobServer::measure() unsigned counter = 0; m_file << "time\tthroughput" << endl; - unsigned diff = 0; + uint64_t diff = 0; while (!m_finish) { // Format: 1st column (seconds)[TAB]2nd column (throughput) m_file << counter << "\t" << (m_DoneCount - diff) << endl; @@ -282,13 +278,25 @@ void CommThread::print_progress(const enum ProgressType type, using namespace std::chrono; static system_clock::time_point last; const auto now = system_clock::now(); - if (last + seconds{1} > now) { + const auto delay = milliseconds{500}; + + if (last + delay > now) { return; } - last = now; - std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/" - << std::setw(6) << m_js.m_runningJobs.Size() << "/" - << std::setw(6) << m_js.m_doneJobs.Size() << " - "; + + const auto rate_alpha = .1; + static float rate = 0; + static uint64_t donecount_last = 0; + uint64_t donecount_cur = m_js.m_DoneCount; // assuming atomic here + rate = rate_alpha * + (donecount_cur - donecount_last) / duration(now - last).count() + + (1 - rate_alpha) * rate; + + std::cout + << std::setw(6) << m_js.m_undoneJobs.Size() << " out/" + << std::setw(6) << m_js.m_runningJobs.Size() << " run/" + << std::setw(6) << donecount_cur << " tot/ (" + << std::setw(6) << std::setprecision(1) << std::fixed << rate << "/s) "; const char *sep; if (type == ProgressType::Send) { sep = " >"; @@ -297,7 +305,10 @@ void CommThread::print_progress(const enum ProgressType type, } else { sep = " <"; } - std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush; + std::cout << sep << '[' << w_id << '+' << count << "] \r" << std::flush; + + last = now; + donecount_last = donecount_cur; } void CommThread::operator()(yield_context yield) @@ -484,9 +495,7 @@ void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg, const bool success = AsyncSocket::rcvMsg( m_socket, exp->getMessage(), yield); recvd.emplace_back(exp, w_id, success); - #ifdef SERVER_PERFORMANCE_MEASURE - ++JobServer::m_DoneCount; - #endif + ++m_js.m_DoneCount; } else { // We can receive several results for the same workload id because // we (may) distribute the (running) jobs to a *few* experiment-clients. diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 8704dae6..b5bcf3a1 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -51,8 +51,8 @@ private: //! unique server run ID uint64_t m_runid; + volatile uint64_t m_DoneCount = 0; //! the number of finished jobs #ifdef SERVER_PERFORMANCE_MEASURE - static volatile unsigned m_DoneCount; //! the number of finished jobs #ifndef __puma boost::thread* m_measureThread; //! the performance measurement thread #endif