DatabaseCampaign: load completed pilots in memory

This change makes the DatabaseCampaign load all pilot_ids from the result
table in memory instead of LEFT JOINing them for each variant.  This vastly
improves campaign speed (possibly making commit 5567c59 superfluous) at the
cost of slightly increased startup time for half-completed (large)
campaigns.

By exploiting the generally continuous nature of pilot IDs and using a
boost::icl::interval_map, the additional memory requirements are
insignificant.

Change-Id: I1e744fb9ca33efea77a2a785cea3c94106f360df
This commit is contained in:
Horst Schirmeier
2014-04-08 14:55:55 +02:00
parent a8611d1ec0
commit cfd99fe3af
2 changed files with 80 additions and 32 deletions

View File

@ -79,6 +79,8 @@ bool DatabaseCampaign::run() {
boost::thread collect_thread(&DatabaseCampaign::collect_result_thread, this);
#endif
load_completed_pilots();
std::vector<Database::Variant> variants = db->get_variants(variant, benchmark);
for (std::vector<Database::Variant>::const_iterator it = variants.begin();
it != variants.end(); ++it) {
@ -126,36 +128,13 @@ void DatabaseCampaign::collect_result_thread() {
}
bool DatabaseCampaign::run_variant(Database::Variant variant) {
/* Copy pilot IDs of existing results to tmp table: otherwise, due to
* MyISAMs table-level locking, collect_result_thread() will block in
* INSERT (SHOW PROCESSLIST state "Waiting for table level lock") until the
* (streamed) pilot query finishes. As one pilot query follows after the
* other, collect_result_thread() may even starve until the memory for the
* JobServer's "done" queue runs out, resulting in a crash and the loss of
* all queued results. */
db->query("CREATE TEMPORARY TABLE IF NOT EXISTS result_ids (pilot_id INT NOT NULL PRIMARY KEY, count INT NOT NULL)");
db->query("TRUNCATE TABLE result_ids");
std::stringstream ss;
ss << "INSERT INTO result_ids "
<< "SELECT r.pilot_id, COUNT(*) FROM " << db_connect.result_table() << " r "
<< "JOIN fsppilot p ON r.pilot_id = p.id "
<< "WHERE p.fspmethod_id = " << fspmethod_id
<< " AND p.variant_id = " << variant.id
<< " GROUP BY r.pilot_id";
db->query(ss.str().c_str());
ss.str("");
/* Gather all unfinished jobs */
/* Gather jobs */
int experiment_count;
std::stringstream ss;
std::string sql_select = "SELECT p.id, p.fspmethod_id, p.variant_id, p.injection_instr, p.injection_instr_absolute, p.data_address, p.data_width ";
ss << " FROM fsppilot p "
<< " LEFT JOIN result_ids r ON r.pilot_id = p.id"
<< " WHERE p.fspmethod_id = " << fspmethod_id
<< " AND p.variant_id = " << variant.id
<< " AND (r.count"
<< " < " << expected_number_of_results(variant.variant, variant.benchmark)
<< " OR r.count IS NULL)"
<< " ORDER BY p.injection_instr";
<< " AND p.variant_id = " << variant.id;
std::string sql_body = ss.str();
/* Get the number of unfinished experiments */
@ -166,17 +145,23 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) {
MYSQL_RES *pilots = db->query_stream ((sql_select + sql_body).c_str());
log_send << "Found " << experiment_count << " unfinished experiments in database. ("
log_send << "Found " << experiment_count << " jobs in database. ("
<< variant.variant << "/" << variant.benchmark << ")" << std::endl;
unsigned sent_pilots = 0;
unsigned expected_results = expected_number_of_results(variant.variant, variant.benchmark);
unsigned sent_pilots = 0, skipped_pilots = 0;
while ((row = mysql_fetch_row(pilots)) != 0) {
unsigned pilot_id = strtoul(row[0], NULL, 10);
if (existing_results_for_pilot(pilot_id) == expected_results) {
skipped_pilots++;
continue;
}
unsigned injection_instr = strtoul(row[3], NULL, 10);
unsigned data_address = strtoul(row[5], NULL, 10);
unsigned data_width = strtoul(row[6], NULL, 10);
DatabaseCampaignMessage pilot;
pilot.set_pilot_id(pilot_id);
pilot.set_fspmethod_id(fspmethod_id);
@ -205,11 +190,63 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) {
return false;
}
log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl;
assert(experiment_count == sent_pilots && "ERROR: not all unfinished experiments pushed to queue");
log_send << "pushed " << sent_pilots << " pilots into the queue, skipped "
<< skipped_pilots << std::endl;
assert(experiment_count == sent_pilots + skipped_pilots &&
"ERROR: not all unfinished experiments pushed to queue");
mysql_free_result(pilots);
return true;
}
void DatabaseCampaign::load_completed_pilots()
{
// load list of partially or completely finished pilots
std::stringstream sql;
sql << "SELECT pilot_id, COUNT(*) FROM " << db_connect.result_table()
<< " GROUP BY pilot_id ";
MYSQL_RES *ids = db->query_stream(sql.str().c_str());
log_send << "loading completed pilot IDs ..." << std::endl;
MYSQL_ROW row;
unsigned rowcount = 0;
while ((row = mysql_fetch_row(ids)) != 0) {
unsigned pilot_id = strtoul(row[0], NULL, 10);
unsigned result_count = strtoul(row[1], NULL, 10);
#ifndef __puma
completed_pilots.add(
make_pair(
id_interval(pilot_id, pilot_id,
boost::icl::interval_bounds::closed()),
result_count));
#endif
if (((++rowcount) % 1000000) == 0) {
std::cerr << ".";
}
}
std::cerr << std::endl;
mysql_free_result(ids);
#ifndef __puma
log_send << "found "
<< completed_pilots.size() << " pilots ("
<< interval_count(completed_pilots) << " continuous ranges)" << std::endl;
/*
boost::icl::interval_map<unsigned, unsigned>::iterator it = completed_pilots.begin();
for (; it != completed_pilots.end(); ++it) {
std::cout << it->first << " : " << it->second << std::endl;
}
*/
#endif
}
unsigned DatabaseCampaign::existing_results_for_pilot(unsigned pilot_id)
{
#ifndef __puma
id_iterator it = completed_pilots.find(pilot_id);
if (it != completed_pilots.end()) {
return it->second;
}
#endif
return 0;
}

View File

@ -8,7 +8,9 @@
#include "comm/ExperimentData.hpp"
#include <google/protobuf/message.h>
#ifndef __puma
#include <boost/icl/interval_map.hpp>
#endif
namespace fail {
@ -27,6 +29,15 @@ class DatabaseCampaign : public Campaign {
int fspmethod_id; // !< Which fspmethod should be put out to the clients
void collect_result_thread();
void load_completed_pilots();
unsigned existing_results_for_pilot(unsigned pilot_id);
#ifndef __puma
typedef boost::icl::discrete_interval<unsigned>::type id_interval;
typedef boost::icl::interval_map<unsigned, unsigned>::type id_map;
typedef id_map::const_iterator id_iterator;
id_map completed_pilots; // !< map: Pilot IDs -> result count
#endif
public:
DatabaseCampaign() {};