jobserver: synchronization cleanup

This change cleans up in/out queue synchronization in the job server.
End-of-jobs conditions are now properly signaled through the
SynchronizedQueue, allowing to resume and abort blocked readers when
no more input is expected.

Change-Id: I3eaf37115ccf8c5b5afe3d971c7109cd62b68906
This commit is contained in:
Horst Schirmeier
2013-04-25 16:29:42 +02:00
parent 5ac108ea4b
commit 8505ddbb04
3 changed files with 44 additions and 13 deletions

View File

@ -41,21 +41,30 @@ boost::mutex CommThread::m_CommMutex;
ExperimentData *JobServer::getDone()
{
#ifndef __puma
if (m_undoneJobs.Size() == 0
&& noMoreExperiments()
&& m_runningJobs.Size() == 0
&& m_doneJobs.Size() == 0
&& m_inOutCounter.getValue() == 0) {
return 0;
ExperimentData *exp = m_doneJobs.Dequeue();
if (exp) {
m_inOutCounter.decrement();
}
ExperimentData *exp = NULL;
exp = m_doneJobs.Dequeue();
m_inOutCounter.decrement();
return exp;
#endif
}
void JobServer::setNoMoreExperiments()
{
#ifndef __puma
boost::unique_lock<boost::mutex> lock(CommThread::m_CommMutex);
#endif
// currently not really necessary, as we only non-blockingly dequeue:
m_undoneJobs.setIsFinished();
m_noMoreExps = true;
if (m_undoneJobs.Size() == 0 &&
noMoreExperiments() &&
m_runningJobs.Size() == 0) {
m_doneJobs.setIsFinished();
}
}
#ifdef SERVER_PERFORMANCE_MEASURE
void JobServer::measure()
{
@ -361,6 +370,12 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
}
}
// all results complete?
if (m_js.m_undoneJobs.Size() == 0 &&
m_js.noMoreExperiments() &&
m_js.m_runningJobs.Size() == 0) {
m_js.m_doneJobs.setIsFinished();
}
}
} // end-of-namespace: fail

View File

@ -118,7 +118,7 @@ public:
* sets. We need this, as we allow concurrent parameter generation and
* distribution.
*/
void setNoMoreExperiments() { m_noMoreExps = true; }
void setNoMoreExperiments();
/**
* Checks whether there are no more experiment parameter sets.
* @return \c true if no more parameter sets available, \c false otherwise

View File

@ -18,14 +18,15 @@ class SynchronizedQueue { // Adapted from: http://www.quantnet.com/cplusplus-mul
private:
std::queue<T> m_queue; //!< Use STL queue to store data
unsigned capacity;
bool finished;
#ifndef __puma
boost::mutex m_mutex; //!< The mutex to synchronise on
boost::condition_variable m_cond; //!< The condition to wait for
boost::condition_variable m_cond_capacity; //!< Another condition to wait for
#endif
public:
SynchronizedQueue() : capacity(0) {}
SynchronizedQueue(unsigned capacity) : capacity(capacity) {}
SynchronizedQueue() : capacity(0), finished(false) {}
SynchronizedQueue(unsigned capacity) : capacity(capacity), finished(false) {}
int Size()
{
#ifndef __puma
@ -69,6 +70,10 @@ public:
// again after the wait
#ifndef __puma
while (m_queue.size() == 0) {
if (finished) {
// default-constructed T, 0 for integral types
return T();
}
m_cond.wait(lock);
}
#endif
@ -116,6 +121,17 @@ public:
return false;
}
} // Lock is automatically released here
void setIsFinished(bool value = true)
{
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_mutex);
#endif
finished = value;
#ifndef __puma
m_cond.notify_all();
#endif
}
};
} // end-of-namespace: fail