JobServer: progress-report overhaul
The JobServer progress-report output now shows the total number of completed jobs instead of the (almost always zero) inbound queue fill level. Additionally, the current number of incoming results per second is shown, which also prepares for an ETA calculation in the following commit.

Change-Id: I6b71c45f44b9e6b9b17c059959a90068b51c165c
This commit is contained in:
@ -178,10 +178,6 @@ void JobServer::addParam(ExperimentData* exp)
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
volatile unsigned JobServer::m_DoneCount = 0;
|
||||
#endif
|
||||
|
||||
ExperimentData *JobServer::getDone()
|
||||
{
|
||||
#ifndef __puma
|
||||
@ -222,7 +218,7 @@ void JobServer::measure()
|
||||
unsigned counter = 0;
|
||||
|
||||
m_file << "time\tthroughput" << endl;
|
||||
unsigned diff = 0;
|
||||
uint64_t diff = 0;
|
||||
while (!m_finish) {
|
||||
// Format: 1st column (seconds)[TAB]2nd column (throughput)
|
||||
m_file << counter << "\t" << (m_DoneCount - diff) << endl;
|
||||
@ -282,13 +278,25 @@ void CommThread::print_progress(const enum ProgressType type,
|
||||
using namespace std::chrono;
|
||||
static system_clock::time_point last;
|
||||
const auto now = system_clock::now();
|
||||
if (last + seconds{1} > now) {
|
||||
const auto delay = milliseconds{500};
|
||||
|
||||
if (last + delay > now) {
|
||||
return;
|
||||
}
|
||||
last = now;
|
||||
std::cout << std::setw(6) << m_js.m_undoneJobs.Size() << "/"
|
||||
<< std::setw(6) << m_js.m_runningJobs.Size() << "/"
|
||||
<< std::setw(6) << m_js.m_doneJobs.Size() << " - ";
|
||||
|
||||
const auto rate_alpha = .1;
|
||||
static float rate = 0;
|
||||
static uint64_t donecount_last = 0;
|
||||
uint64_t donecount_cur = m_js.m_DoneCount; // assuming atomic here
|
||||
rate = rate_alpha *
|
||||
(donecount_cur - donecount_last) / duration<float>(now - last).count() +
|
||||
(1 - rate_alpha) * rate;
|
||||
|
||||
std::cout
|
||||
<< std::setw(6) << m_js.m_undoneJobs.Size() << " out/"
|
||||
<< std::setw(6) << m_js.m_runningJobs.Size() << " run/"
|
||||
<< std::setw(6) << donecount_cur << " tot/ ("
|
||||
<< std::setw(6) << std::setprecision(1) << std::fixed << rate << "/s) ";
|
||||
const char *sep;
|
||||
if (type == ProgressType::Send) {
|
||||
sep = " >";
|
||||
@ -297,7 +305,10 @@ void CommThread::print_progress(const enum ProgressType type,
|
||||
} else {
|
||||
sep = " <";
|
||||
}
|
||||
std::cout << sep << '[' << w_id << '+' << count << "]\r" << std::flush;
|
||||
std::cout << sep << '[' << w_id << '+' << count << "] \r" << std::flush;
|
||||
|
||||
last = now;
|
||||
donecount_last = donecount_cur;
|
||||
}
|
||||
|
||||
void CommThread::operator()(yield_context yield)
|
||||
@ -484,9 +495,7 @@ void CommThread::receiveExperimentResults(FailControlMessage &ctrlmsg,
|
||||
const bool success = AsyncSocket::rcvMsg(
|
||||
m_socket, exp->getMessage(), yield);
|
||||
recvd.emplace_back(exp, w_id, success);
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
++JobServer::m_DoneCount;
|
||||
#endif
|
||||
++m_js.m_DoneCount;
|
||||
} else {
|
||||
// We can receive several results for the same workload id because
|
||||
// we (may) distribute the (running) jobs to a *few* experiment-clients.
|
||||
|
||||
@ -51,8 +51,8 @@ private:
|
||||
//! unique server run ID
|
||||
uint64_t m_runid;
|
||||
|
||||
volatile uint64_t m_DoneCount = 0; //! the number of finished jobs
|
||||
#ifdef SERVER_PERFORMANCE_MEASURE
|
||||
static volatile unsigned m_DoneCount; //! the number of finished jobs
|
||||
#ifndef __puma
|
||||
boost::thread* m_measureThread; //! the performance measurement thread
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user