diff --git a/src/core/cpn/DatabaseCampaign.cc b/src/core/cpn/DatabaseCampaign.cc index 209321da..bb5efb24 100644 --- a/src/core/cpn/DatabaseCampaign.cc +++ b/src/core/cpn/DatabaseCampaign.cc @@ -79,6 +79,8 @@ bool DatabaseCampaign::run() { boost::thread collect_thread(&DatabaseCampaign::collect_result_thread, this); #endif + load_completed_pilots(); + std::vector variants = db->get_variants(variant, benchmark); for (std::vector::const_iterator it = variants.begin(); it != variants.end(); ++it) { @@ -126,36 +128,13 @@ void DatabaseCampaign::collect_result_thread() { } bool DatabaseCampaign::run_variant(Database::Variant variant) { - /* Copy pilot IDs of existing results to tmp table: otherwise, due to - * MyISAMs table-level locking, collect_result_thread() will block in - * INSERT (SHOW PROCESSLIST state "Waiting for table level lock") until the - * (streamed) pilot query finishes. As one pilot query follows after the - * other, collect_result_thread() may even starve until the memory for the - * JobServer's "done" queue runs out, resulting in a crash and the loss of - * all queued results. */ - db->query("CREATE TEMPORARY TABLE IF NOT EXISTS result_ids (pilot_id INT NOT NULL PRIMARY KEY, count INT NOT NULL)"); - db->query("TRUNCATE TABLE result_ids"); - std::stringstream ss; - ss << "INSERT INTO result_ids " - << "SELECT r.pilot_id, COUNT(*) FROM " << db_connect.result_table() << " r " - << "JOIN fsppilot p ON r.pilot_id = p.id " - << "WHERE p.fspmethod_id = " << fspmethod_id - << " AND p.variant_id = " << variant.id - << " GROUP BY r.pilot_id"; - db->query(ss.str().c_str()); - ss.str(""); - - /* Gather all unfinished jobs */ + /* Gather jobs */ int experiment_count; + std::stringstream ss; std::string sql_select = "SELECT p.id, p.fspmethod_id, p.variant_id, p.injection_instr, p.injection_instr_absolute, p.data_address, p.data_width "; ss << " FROM fsppilot p " - << " LEFT JOIN result_ids r ON r.pilot_id = p.id" << " WHERE p.fspmethod_id = " << fspmethod_id - << " AND p.variant_id = " << variant.id - << " AND (r.count" - << " < " << expected_number_of_results(variant.variant, variant.benchmark) - << " OR r.count IS NULL)" - << " ORDER BY p.injection_instr"; + << " AND p.variant_id = " << variant.id; std::string sql_body = ss.str(); /* Get the number of unfinished experiments */ @@ -166,17 +145,23 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) { MYSQL_RES *pilots = db->query_stream ((sql_select + sql_body).c_str()); - log_send << "Found " << experiment_count << " unfinished experiments in database. (" + log_send << "Found " << experiment_count << " jobs in database. (" << variant.variant << "/" << variant.benchmark << ")" << std::endl; - unsigned sent_pilots = 0; + unsigned expected_results = expected_number_of_results(variant.variant, variant.benchmark); + + unsigned sent_pilots = 0, skipped_pilots = 0; while ((row = mysql_fetch_row(pilots)) != 0) { unsigned pilot_id = strtoul(row[0], NULL, 10); + if (existing_results_for_pilot(pilot_id) == expected_results) { + skipped_pilots++; + continue; + } + unsigned injection_instr = strtoul(row[3], NULL, 10); unsigned data_address = strtoul(row[5], NULL, 10); unsigned data_width = strtoul(row[6], NULL, 10); - DatabaseCampaignMessage pilot; pilot.set_pilot_id(pilot_id); pilot.set_fspmethod_id(fspmethod_id); @@ -205,11 +190,63 @@ bool DatabaseCampaign::run_variant(Database::Variant variant) { return false; } - log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl; - assert(experiment_count == sent_pilots && "ERROR: not all unfinished experiments pushed to queue"); + log_send << "pushed " << sent_pilots << " pilots into the queue, skipped " + << skipped_pilots << std::endl; + assert(experiment_count == sent_pilots + skipped_pilots && + "ERROR: not all unfinished experiments pushed to queue"); mysql_free_result(pilots); return true; } + +void DatabaseCampaign::load_completed_pilots() +{ + // load list of partially or completely finished pilots + std::stringstream sql; + sql << "SELECT pilot_id, COUNT(*) FROM " << db_connect.result_table() + << " GROUP BY pilot_id "; + MYSQL_RES *ids = db->query_stream(sql.str().c_str()); + log_send << "loading completed pilot IDs ..." << std::endl; + MYSQL_ROW row; + unsigned rowcount = 0; + while ((row = mysql_fetch_row(ids)) != 0) { + unsigned pilot_id = strtoul(row[0], NULL, 10); + unsigned result_count = strtoul(row[1], NULL, 10); +#ifndef __puma + completed_pilots.add( + make_pair( + id_interval(pilot_id, pilot_id, + boost::icl::interval_bounds::closed()), + result_count)); +#endif + if (((++rowcount) % 1000000) == 0) { + std::cerr << "."; + } + } + std::cerr << std::endl; + mysql_free_result(ids); +#ifndef __puma + log_send << "found " + << completed_pilots.size() << " pilots (" + << interval_count(completed_pilots) << " continuous ranges)" << std::endl; +/* + boost::icl::interval_map::iterator it = completed_pilots.begin(); + for (; it != completed_pilots.end(); ++it) { + std::cout << it->first << " : " << it->second << std::endl; + } +*/ +#endif +} + +unsigned DatabaseCampaign::existing_results_for_pilot(unsigned pilot_id) +{ +#ifndef __puma + id_iterator it = completed_pilots.find(pilot_id); + if (it != completed_pilots.end()) { + return it->second; + } +#endif + return 0; +} diff --git a/src/core/cpn/DatabaseCampaign.hpp b/src/core/cpn/DatabaseCampaign.hpp index d3ec7e98..11f4a8f7 100644 --- a/src/core/cpn/DatabaseCampaign.hpp +++ b/src/core/cpn/DatabaseCampaign.hpp @@ -8,7 +8,9 @@ #include "comm/ExperimentData.hpp" #include - +#ifndef __puma +#include +#endif namespace fail { @@ -27,6 +29,15 @@ class DatabaseCampaign : public Campaign { int fspmethod_id; // !< Which fspmethod should be put out to the clients void collect_result_thread(); + void load_completed_pilots(); + unsigned existing_results_for_pilot(unsigned pilot_id); + +#ifndef __puma + typedef boost::icl::discrete_interval::type id_interval; + typedef boost::icl::interval_map::type id_map; + typedef id_map::const_iterator id_iterator; + id_map completed_pilots; // !< map: Pilot IDs -> result count +#endif public: DatabaseCampaign() {};