ecos: split campaign into job producer and consumer

When we want to use a bounded job queue, job producer and job consumer must
run in different threads in order not to deadlock the campaign.  Ideally
this functionality moves to the CampaignManager later (including the
retrieval of existing experiment results and storing new results).

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1946 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
hsc
2012-11-20 15:02:01 +00:00
parent 127161ef5a
commit dfe4a5d4c0
2 changed files with 69 additions and 46 deletions

View File

@ -170,6 +170,11 @@ bool EcosKernelTestCampaign::run()
return false;
}
// collect results in parallel to avoid deadlock
#ifndef __puma
boost::thread collect_thread(&EcosKernelTestCampaign::collect_results, this);
#endif
for (int variant_nr = 0; variants[variant_nr]; ++variant_nr) {
char const *variant = variants[variant_nr];
for (int benchmark_nr = 0; benchmarks[benchmark_nr]; ++benchmark_nr) {
@ -323,52 +328,9 @@ bool EcosKernelTestCampaign::run()
<< endl;
// collect results
EcosKernelTestExperimentData *res;
while ((res = static_cast<EcosKernelTestExperimentData *>(campaignmanager.getDone()))) {
// sanity check
if (res->msg.result_size() != 8) {
m_log << "wtf, result_size = " << res->msg.result_size() << endl;
continue;
}
EcosKernelTestProtoMsg_Result const *prev_singleres = 0;
int first_bit = 0, bit_width = 0;
// one job contains 8 experiments
for (int idx = 0; idx < res->msg.result_size(); ++idx) {
EcosKernelTestProtoMsg_Result const *cur_singleres = &res->msg.result(idx);
if (!prev_singleres) {
prev_singleres = cur_singleres;
first_bit = cur_singleres->bit_offset();
bit_width = 1;
continue;
}
// compatible? merge.
if (cur_singleres->bit_offset() == first_bit + bit_width // neighbor?
&& prev_singleres->resulttype() == cur_singleres->resulttype()
&& prev_singleres->latest_ip() == cur_singleres->latest_ip()
&& prev_singleres->ecos_test_result() == cur_singleres->ecos_test_result()
&& prev_singleres->error_corrected() == cur_singleres->error_corrected()
&& prev_singleres->details() == cur_singleres->details()) {
bit_width++;
continue;
}
add_result(res->msg.variant(), res->msg.benchmark(), res->msg.instr1_offset(),
res->msg.instr2_offset(), res->msg.instr2_address(), res->msg.mem_addr(),
first_bit, bit_width, prev_singleres->resulttype(), prev_singleres->ecos_test_result(),
prev_singleres->latest_ip(), prev_singleres->error_corrected(), prev_singleres->details(),
res->msg.runtime() * bit_width / 8.0);
prev_singleres = cur_singleres;
first_bit = cur_singleres->bit_offset();
bit_width = 1;
}
add_result(res->msg.variant(), res->msg.benchmark(), res->msg.instr1_offset(),
res->msg.instr2_offset(), res->msg.instr2_address(), res->msg.mem_addr(),
first_bit, bit_width, prev_singleres->resulttype(), prev_singleres->ecos_test_result(),
prev_singleres->latest_ip(), prev_singleres->error_corrected(), prev_singleres->details(),
res->msg.runtime() * bit_width / 8.0);
delete res;
}
#ifndef __puma
collect_thread.join();
#endif
finalize_results();
m_log << "done." << endl;
@ -503,6 +465,9 @@ void EcosKernelTestCampaign::add_result(const std::string& variant, const std::s
int bitnr, int bit_width, int resulttype, int ecos_test_result, address_t latest_ip,
int error_corrected, const std::string& details, float runtime)
{
#ifndef __puma
boost::lock_guard<boost::mutex> guard(m_result_mutex);
#endif
resultstream << hex
<< variant << "\t"
<< benchmark << "\t"
@ -525,3 +490,53 @@ void EcosKernelTestCampaign::finalize_results()
{
resultstream.close();
}
void EcosKernelTestCampaign::collect_results()
{
EcosKernelTestExperimentData *res;
while ((res = static_cast<EcosKernelTestExperimentData *>(campaignmanager.getDone()))) {
// sanity check
if (res->msg.result_size() != 8) {
m_log << "wtf, result_size = " << res->msg.result_size() << endl;
continue;
}
EcosKernelTestProtoMsg_Result const *prev_singleres = 0;
int first_bit = 0, bit_width = 0;
// one job contains 8 experiments
for (int idx = 0; idx < res->msg.result_size(); ++idx) {
EcosKernelTestProtoMsg_Result const *cur_singleres = &res->msg.result(idx);
if (!prev_singleres) {
prev_singleres = cur_singleres;
first_bit = cur_singleres->bit_offset();
bit_width = 1;
continue;
}
// compatible? merge.
if (cur_singleres->bit_offset() == first_bit + bit_width // neighbor?
&& prev_singleres->resulttype() == cur_singleres->resulttype()
&& prev_singleres->latest_ip() == cur_singleres->latest_ip()
&& prev_singleres->ecos_test_result() == cur_singleres->ecos_test_result()
&& prev_singleres->error_corrected() == cur_singleres->error_corrected()
&& prev_singleres->details() == cur_singleres->details()) {
bit_width++;
continue;
}
add_result(res->msg.variant(), res->msg.benchmark(), res->msg.instr1_offset(),
res->msg.instr2_offset(), res->msg.instr2_address(), res->msg.mem_addr(),
first_bit, bit_width, prev_singleres->resulttype(), prev_singleres->ecos_test_result(),
prev_singleres->latest_ip(), prev_singleres->error_corrected(), prev_singleres->details(),
res->msg.runtime() * bit_width / 8.0);
prev_singleres = cur_singleres;
first_bit = cur_singleres->bit_offset();
bit_width = 1;
}
add_result(res->msg.variant(), res->msg.benchmark(), res->msg.instr1_offset(),
res->msg.instr2_offset(), res->msg.instr2_address(), res->msg.mem_addr(),
first_bit, bit_width, prev_singleres->resulttype(), prev_singleres->ecos_test_result(),
prev_singleres->latest_ip(), prev_singleres->error_corrected(), prev_singleres->details(),
res->msg.runtime() * bit_width / 8.0);
delete res;
}
}

View File

@ -3,6 +3,10 @@
#include <string>
#include <fstream>
#ifndef __puma
#include <boost/thread.hpp>
#endif
#include "cpn/Campaign.hpp"
#include "comm/ExperimentData.hpp"
#include "ecos_kernel_test.pb.h"
@ -32,10 +36,14 @@ class EcosKernelTestCampaign : public fail::Campaign {
int bitnr, int bit_width, int resulttype, int ecos_test_result, fail::address_t latest_ip,
int error_corrected, const std::string& details, float runtime);
void finalize_results();
void collect_results();
bool check_available(const std::string& variant, const std::string& benchmark, fail::address_t data_address, int instr2);
std::ofstream resultstream;
typedef std::map<std::pair<const std::string, const std::string>, std::map<fail::address_t, std::set<int> > > AvailableResultMap;
AvailableResultMap available_results;
#ifndef __puma
boost::mutex m_result_mutex;
#endif
public:
EcosKernelTestCampaign() : m_log("EcosKernelTest Campaign"),
count_exp(0), count_exp_jobs(0), count_known(0), count_known_jobs(0) {}