From cfd99fe3af85f29ff48eb33989b4814e6468f73f Mon Sep 17 00:00:00 2001 From: Horst Schirmeier Date: Tue, 8 Apr 2014 14:55:55 +0200 Subject: [PATCH] DatabaseCampaign: load completed pilots in memory This change makes the DatabaseCampaign load all pilot_ids from the result table in memory instead of LEFT JOINing them for each variant. This vastly improves campaign speed (possibly making commit 5567c59 superfluous) at the cost of slightly increased startup time for half-completed (large) campaigns. By exploiting the generally continuous nature of pilot IDs and using a boost::icl::interval_map, the additional memory requirements are insignificant. Change-Id: I1e744fb9ca33efea77a2a785cea3c94106f360df --- src/core/cpn/DatabaseCampaign.cc | 99 +++++++++++++++++++++---------- src/core/cpn/DatabaseCampaign.hpp | 13 +++- 2 files changed, 80 insertions(+), 32 deletions(-) diff --git a/src/core/cpn/DatabaseCampaign.cc b/src/core/cpn/DatabaseCampaign.cc index 209321da..bb5efb24 100644 --- a/src/core/cpn/DatabaseCampaign.cc +++ b/src/core/cpn/DatabaseCampaign.cc @@ -79,6 +79,8 @@ bool DatabaseCampaign::run() { boost::thread collect_thread(&DatabaseCampaign::collect_result_thread, this); #endif + load_completed_pilots(); + std::vector variants = db->get_variants(variant, benchmark); for (std::vector::const_iterator it = variants.begin(); it != variants.end(); ++it) { @@ -126,36 +128,13 @@ void DatabaseCampaign::collect_result_thread() { } bool DatabaseCampaign::run_variant(Database::Variant variant) { - /* Copy pilot IDs of existing results to tmp table: otherwise, due to - * MyISAMs table-level locking, collect_result_thread() will block in - * INSERT (SHOW PROCESSLIST state "Waiting for table level lock") until the - * (streamed) pilot query finishes. As one pilot query follows after the - * other, collect_result_thread() may even starve until the memory for the - * JobServer's "done" queue runs out, resulting in a crash and the loss of - * all queued results. */ - db->query("CREATE TEMPORARY TABLE IF NOT EXISTS result_ids (pilot_id INT NOT NULL PRIMARY KEY, count INT NOT NULL)"); - db->query("TRUNCATE TABLE result_ids"); - std::stringstream ss; - ss << "INSERT INTO result_ids " - << "SELECT r.pilot_id, COUNT(*) FROM " << db_connect.result_table() << " r " - << "JOIN fsppilot p ON r.pilot_id = p.id " - << "WHERE p.fspmethod_id = " << fspmethod_id - << " AND p.variant_id = " << variant.id - << " GROUP BY r.pilot_id"; - db->query(ss.str().c_str()); - ss.str(""); - - /* Gather all unfinished jobs */ + /* Gather jobs */ int experiment_count; + std::stringstream ss; std::string sql_select = "SELECT p.id, p.fspmethod_id, p.variant_id, p.injection_instr, p.injection_instr_absolute, p.data_address, p.data_width "; ss << " FROM fsppilot p " - << " LEFT JOIN result_ids r ON r.pilot_id = p.id" << " WHERE p.fspmethod_id = " << fspmethod_id - << " AND p.variant_id = " << variant.id - << " AND (r.count" - << " < " << expected_number_of_results(variant.variant, variant.benchmark) - << " OR r.count IS NULL)" - << " ORDER BY p.injection_instr"; + << " AND p.variant_id = " << variant.id; std::string sql_body = ss.str(); /* Get the number of unfinished experiments */ @@ -166,17 +145,23 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) { MYSQL_RES *pilots = db->query_stream ((sql_select + sql_body).c_str()); - log_send << "Found " << experiment_count << " unfinished experiments in database. (" + log_send << "Found " << experiment_count << " jobs in database. (" << variant.variant << "/" << variant.benchmark << ")" << std::endl; - unsigned sent_pilots = 0; + unsigned expected_results = expected_number_of_results(variant.variant, variant.benchmark); + + unsigned sent_pilots = 0, skipped_pilots = 0; while ((row = mysql_fetch_row(pilots)) != 0) { unsigned pilot_id = strtoul(row[0], NULL, 10); + if (existing_results_for_pilot(pilot_id) == expected_results) { + skipped_pilots++; + continue; + } + unsigned injection_instr = strtoul(row[3], NULL, 10); unsigned data_address = strtoul(row[5], NULL, 10); unsigned data_width = strtoul(row[6], NULL, 10); - DatabaseCampaignMessage pilot; pilot.set_pilot_id(pilot_id); pilot.set_fspmethod_id(fspmethod_id); @@ -205,11 +190,63 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) { return false; } - log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl; - assert(experiment_count == sent_pilots && "ERROR: not all unfinished experiments pushed to queue"); + log_send << "pushed " << sent_pilots << " pilots into the queue, skipped " + << skipped_pilots << std::endl; + assert(experiment_count == sent_pilots + skipped_pilots && + "ERROR: not all unfinished experiments pushed to queue"); mysql_free_result(pilots); return true; } + +void DatabaseCampaign::load_completed_pilots() +{ + // load list of partially or completely finished pilots + std::stringstream sql; + sql << "SELECT pilot_id, COUNT(*) FROM " << db_connect.result_table() + << " GROUP BY pilot_id "; + MYSQL_RES *ids = db->query_stream(sql.str().c_str()); + log_send << "loading completed pilot IDs ..." << std::endl; + MYSQL_ROW row; + unsigned rowcount = 0; + while ((row = mysql_fetch_row(ids)) != 0) { + unsigned pilot_id = strtoul(row[0], NULL, 10); + unsigned result_count = strtoul(row[1], NULL, 10); +#ifndef __puma + completed_pilots.add( + make_pair( + id_interval(pilot_id, pilot_id, + boost::icl::interval_bounds::closed()), + result_count)); +#endif + if (((++rowcount) % 1000000) == 0) { + std::cerr << "."; + } + } + std::cerr << std::endl; + mysql_free_result(ids); +#ifndef __puma + log_send << "found " + << completed_pilots.size() << " pilots (" + << interval_count(completed_pilots) << " continuous ranges)" << std::endl; +/* + boost::icl::interval_map::iterator it = completed_pilots.begin(); + for (; it != completed_pilots.end(); ++it) { + std::cout << it->first << " : " << it->second << std::endl; + } +*/ +#endif +} + +unsigned DatabaseCampaign::existing_results_for_pilot(unsigned pilot_id) +{ +#ifndef __puma + id_iterator it = completed_pilots.find(pilot_id); + if (it != completed_pilots.end()) { + return it->second; + } +#endif + return 0; +} diff --git a/src/core/cpn/DatabaseCampaign.hpp b/src/core/cpn/DatabaseCampaign.hpp index d3ec7e98..11f4a8f7 100644 --- a/src/core/cpn/DatabaseCampaign.hpp +++ b/src/core/cpn/DatabaseCampaign.hpp @@ -8,7 +8,9 @@ #include "comm/ExperimentData.hpp" #include - +#ifndef __puma +#include +#endif namespace fail { @@ -27,6 +29,15 @@ class DatabaseCampaign : public Campaign { int fspmethod_id; // !< Which fspmethod should be put out to the clients void collect_result_thread(); + void load_completed_pilots(); + unsigned existing_results_for_pilot(unsigned pilot_id); + +#ifndef __puma + typedef boost::icl::discrete_interval::type id_interval; + typedef boost::icl::interval_map::type id_map; + typedef id_map::const_iterator id_iterator; + id_map completed_pilots; // !< map: Pilot IDs -> result count +#endif public: DatabaseCampaign() {};