From f89794329c5c32bb6136caf9bad753d1ba76ec41 Mon Sep 17 00:00:00 2001
From: Horst Schirmeier
Date: Tue, 31 Jul 2018 12:33:17 +0200
Subject: [PATCH] JobServer: progress-report overhaul

The JobServer progress-report output now shows the total number of
completed jobs instead of the (almost always zero) inbound queue fill
level. Additionally, the current number of incoming results per second
is shown, which also prepares for an ETA calculation in the following
commit.

Change-Id: I6b71c45f44b9e6b9b17c059959a90068b51c165c
---
 src/core/cpn/JobServer.cc  | 37 +++++++++++++++++++++++--------------
 src/core/cpn/JobServer.hpp |  2 +-
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc
index a521de5b..1c0a94de 100644
--- a/src/core/cpn/JobServer.cc
+++ b/src/core/cpn/JobServer.cc
@@ -178,10 +178,6 @@ void JobServer::addParam(ExperimentData* exp)
 #endif
 }
 
-#ifdef SERVER_PERFORMANCE_MEASURE
-volatile unsigned JobServer::m_DoneCount = 0;
-#endif
-
 ExperimentData *JobServer::getDone()
 {
 #ifndef __puma
@@ -222,7 +218,7 @@ void JobServer::measure()
 	unsigned counter = 0;
 
 	m_file << "time\tthroughput" << endl;
-	unsigned diff = 0;
+	uint64_t diff = 0;
 	while (!m_finish) {
 		// Format: 1st column (seconds)[TAB]2nd column (throughput)
 		m_file << counter << "\t" << (m_DoneCount - diff) << endl;
@@ -282,13 +278,25 @@ void CommThread::print_progress(const enum ProgressType type,
 	using namespace std::chrono;
 	static system_clock::time_point last;
 	const auto now = system_clock::now();
-	if (last + seconds{1} > now) {
+	const auto delay = milliseconds{500};
+
+	if (last + delay > now) {
 		return;
 	}
-	last = now;
-	std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/"
-		<< std::setw(6) << m_js.m_runningJobs.Size() << "/"
-		<< std::setw(6) << m_js.m_doneJobs.Size() << " - ";
+
+	const auto rate_alpha = .1;
+	static float rate = 0;
+	static uint64_t donecount_last = 0;
+	uint64_t donecount_cur = m_js.m_DoneCount; // assuming atomic here
+	rate = rate_alpha *
+		(donecount_cur - donecount_last) / duration<float>(now - last).count()
+		+ (1 - rate_alpha) * rate;
+
+	std::cout
+		<< std::setw(6) << m_js.m_undoneJobs.Size() << " out/"
+		<< std::setw(6) << m_js.m_runningJobs.Size() << " run/"
+		<< std::setw(6) << donecount_cur << " tot/ ("
+		<< std::setw(6) << std::setprecision(1) << std::fixed << rate << "/s) ";
 	const char *sep;
 	if (type == ProgressType::Send) {
 		sep = " >";
@@ -297,7 +305,10 @@ void CommThread::print_progress(const enum ProgressType type,
 	} else {
 		sep = " <";
 	}
-	std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush;
+	std::cout << sep << '[' << w_id << '+' << count << "] \r" << std::flush;
+
+	last = now;
+	donecount_last = donecount_cur;
 }
 
 void CommThread::operator()(yield_context yield)
@@ -484,9 +495,7 @@ void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg,
 			const bool success = AsyncSocket::rcvMsg(
 				m_socket, exp->getMessage(), yield);
 			recvd.emplace_back(exp, w_id, success);
-			#ifdef SERVER_PERFORMANCE_MEASURE
-			++JobServer::m_DoneCount;
-			#endif
+			++m_js.m_DoneCount;
 		} else {
 			// We can receive several results for the same workload id because
 			// we (may) distribute the (running) jobs to a *few* experiment-clients.
diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp
index 8704dae6..b5bcf3a1 100644
--- a/src/core/cpn/JobServer.hpp
+++ b/src/core/cpn/JobServer.hpp
@@ -51,8 +51,8 @@ private:
 	//! unique server run ID
 	uint64_t m_runid;
 
+	volatile uint64_t m_DoneCount = 0; //! the number of finished jobs
 #ifdef SERVER_PERFORMANCE_MEASURE
-	static volatile unsigned m_DoneCount; //! the number of finished jobs
 #ifndef __puma
 	boost::thread* m_measureThread; //! the performance measurement thread
 #endif