diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index b9450e4e..cfc25c28 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -41,21 +41,30 @@ boost::mutex CommThread::m_CommMutex; ExperimentData *JobServer::getDone() { #ifndef __puma - if (m_undoneJobs.Size() == 0 - && noMoreExperiments() - && m_runningJobs.Size() == 0 - && m_doneJobs.Size() == 0 - && m_inOutCounter.getValue() == 0) { - return 0; + ExperimentData *exp = m_doneJobs.Dequeue(); + if (exp) { + m_inOutCounter.decrement(); } - - ExperimentData *exp = NULL; - exp = m_doneJobs.Dequeue(); - m_inOutCounter.decrement(); return exp; #endif } +void JobServer::setNoMoreExperiments() +{ +#ifndef __puma + boost::unique_lock lock(CommThread::m_CommMutex); +#endif + // currently not really necessary, as we only non-blockingly dequeue: + m_undoneJobs.setIsFinished(); + + m_noMoreExps = true; + if (m_undoneJobs.Size() == 0 && + noMoreExperiments() && + m_runningJobs.Size() == 0) { + m_doneJobs.setIsFinished(); + } +} + #ifdef SERVER_PERFORMANCE_MEASURE void JobServer::measure() { @@ -361,6 +370,12 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct } } + // all results complete? + if (m_js.m_undoneJobs.Size() == 0 && + m_js.noMoreExperiments() && + m_js.m_runningJobs.Size() == 0) { + m_js.m_doneJobs.setIsFinished(); + } } } // end-of-namespace: fail diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 7c1bf521..89928cb0 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -118,7 +118,7 @@ public: * sets. We need this, as we allow concurrent parameter generation and * distribution. */ - void setNoMoreExperiments() { m_noMoreExps = true; } + void setNoMoreExperiments(); /** * Checks whether there are no more experiment parameter sets. * @return \c true if no more parameter sets available, \c false otherwise diff --git a/src/core/util/SynchronizedQueue.hpp b/src/core/util/SynchronizedQueue.hpp index 5a9a4047..45d52330 100644 --- a/src/core/util/SynchronizedQueue.hpp +++ b/src/core/util/SynchronizedQueue.hpp @@ -18,14 +18,15 @@ class SynchronizedQueue { // Adapted from: http://www.quantnet.com/cplusplus-mul private: std::queue m_queue; //!< Use STL queue to store data unsigned capacity; + bool finished; #ifndef __puma boost::mutex m_mutex; //!< The mutex to synchronise on boost::condition_variable m_cond; //!< The condition to wait for boost::condition_variable m_cond_capacity; //!< Another condition to wait for #endif public: - SynchronizedQueue() : capacity(0) {} - SynchronizedQueue(unsigned capacity) : capacity(capacity) {} + SynchronizedQueue() : capacity(0), finished(false) {} + SynchronizedQueue(unsigned capacity) : capacity(capacity), finished(false) {} int Size() { #ifndef __puma @@ -69,6 +70,10 @@ public: // again after the wait #ifndef __puma while (m_queue.size() == 0) { + if (finished) { + // default-constructed T, 0 for integral types + return T(); + } m_cond.wait(lock); } #endif @@ -116,6 +121,17 @@ public: return false; } } // Lock is automatically released here + + void setIsFinished(bool value = true) + { +#ifndef __puma + boost::unique_lock lock(m_mutex); +#endif + finished = value; +#ifndef __puma + m_cond.notify_all(); +#endif + } }; } // end-of-namespace: fail