DatabaseCampaign: abstract campain for interaction with MySQL Database

The DatabaseCampaign interacts with the MySQL tables that are created
by the import-trace and prune-trace tools. It does offer all
unfinished experiment pilots from the database to the
fail-clients. Those clients send back a (by the experiment) defined
protobuf message as a result. The custom protobuf message does have to
need the form:

   import "DatabaseCampaignMessage.proto";

   message ExperimentMsg {
       required DatabaseCampaignMessage fsppilot = 1;

       repeated group Result = 2 {
          // custom fields
          required int32 bitoffset = 1;
          optional int32 result = 2;
       }
   }

The DatabaseCampaignMessage is the pilot identifier from the
database. For each of the repeated result entries a row in a table is
allocated. The structure of this table is constructed (by protobuf
reflection) from the description of the message. Each field in the
Result group becomes a column in the result table. For the given
example it would be:

    CREATE TABLE result_ExperimentMessage(
           pilot_id INT,
           bitoffset INT NOT NULL,
           result INT,
           PRIMARY_KEY(pilot_id)
    )

Change-Id: I28fb5488e739d4098b823b42426c5760331027f8
This commit is contained in:
Christian Dietrich
2013-03-25 16:04:05 +01:00
parent 59e5fd3169
commit f18cddc63c
25 changed files with 1161 additions and 119 deletions

View File

@ -1,6 +1,7 @@
set(SRCS
CampaignManager.cc
JobServer.cc
DatabaseCampaign.cc
)
add_library(fail-cpn ${SRCS})

View File

@ -0,0 +1,147 @@
#include "DatabaseCampaign.hpp"
#include "cpn/CampaignManager.hpp"
#include "util/CommandLine.hpp"
#include "util/Logger.hpp"
#include "util/Database.hpp"
#include "comm/ExperimentData.hpp"
#ifndef __puma
#include <boost/thread.hpp>
#endif
using namespace fail;
static Logger log_recv("DatabaseCampaign::recv");
static Logger log_send("DatabaseCampaign");
bool DatabaseCampaign::run() {
CommandLine &cmd = CommandLine::Inst();
cmd.addOption("", "", Arg::None, "USAGE: fail-server [options...]\n\n");
CommandLine::option_handle HELP = cmd.addOption("h", "help", Arg::None, "-h,--help\t Print usage and exit");
Database::cmdline_setup();
/* Give the implementation the chance to add stuff to the command
line interface */
if (!cb_commandline_init()) return false;
CommandLine::option_handle VARIANT = cmd.addOption("v", "variant", Arg::Required,
"-v/--variant\t Variant label (default: \"none\")");
CommandLine::option_handle BENCHMARK = cmd.addOption("b", "benchmark", Arg::Required,
"-b/--benchmark\t Benchmark label (default: \"none\")\n");
CommandLine::option_handle PRUNER = cmd.addOption("p", "prune-method", Arg::Required,
"-p/--prune-method\t Which import method to use (default: basic)");
if(!cmd.parse()) {
log_send << "Error parsing arguments." << std::endl;
exit(-1);
}
if (cmd[HELP]) {
cmd.printUsage();
exit(0);
}
std::string variant, benchmark, pruner;
if (cmd[VARIANT].count() > 0)
variant = std::string(cmd[VARIANT].first()->arg);
else
variant = "none";
if (cmd[BENCHMARK].count() > 0)
benchmark = std::string(cmd[BENCHMARK].first()->arg);
else
benchmark = "none";
if (cmd[PRUNER].count() > 0)
pruner = std::string(cmd[PRUNER].first()->arg);
else
pruner = "basic";
db = Database::cmdline_connect();
variant_id = db->get_variant_id(variant, benchmark);
log_send << "Variant to use " << variant << "/" << benchmark << " (ID: " << variant_id << ")" << std::endl;
fspmethod_id = db->get_fspmethod_id(pruner);
log_send << "Pruner to use " << pruner << " (ID: " << fspmethod_id << ")" << std::endl;
/* Set up the adapter that maps the results into the MySQL
Database */
db_connect.set_database_handle(db);
const google::protobuf::Descriptor *desc = cb_result_message();
db_connect.create_table(desc);
// collect results in parallel to avoid deadlock
#ifndef __puma
boost::thread collect_thread(&DatabaseCampaign::collect_result_thread, this);
#endif
/* Gather all unfinished jobs */
int experiment_count;
std::string sql_select = "SELECT pilot_id, fspgroup.fspmethod_id, fspgroup.variant_id, fspgroup.instr2, fspgroup.data_address ";
std::stringstream ss;
ss << " FROM fspgroup INNER JOIN fsppilot ON fsppilot.id = fspgroup.pilot_id "
<< " WHERE known_outcome = 0 "
<< " AND fspgroup.fspmethod_id = " << fspmethod_id
<< " AND fspgroup.variant_id = " << variant_id
// << " AND fsppilot.data_address = 1346688"
<< " AND (SELECT COUNT(*) FROM " + db_connect.result_table() + " as r WHERE r.pilot_id = fspgroup.pilot_id) = 0"
<< " ORDER BY fspgroup.instr2";
std::string sql_body = ss.str();
/* Get the number of unfinished experiments */
MYSQL_RES *count = db->query(("SELECT COUNT(*) " + sql_body).c_str(), true);
MYSQL_ROW row = mysql_fetch_row(count);
experiment_count = atoi(row[0]);
MYSQL_RES *pilots = db->query_stream ((sql_select + sql_body).c_str());
log_send << "Found " << experiment_count << " unfinished experiments in database." << std::endl;
sent_pilots = 0;
while ((row = mysql_fetch_row(pilots)) != 0) {
unsigned pilot_id = atoi(row[0]);
unsigned instr2 = atoi(row[3]);
unsigned data_address = atoi(row[4]);
DatabaseCampaignMessage pilot;
pilot.set_pilot_id(pilot_id);
pilot.set_fspmethod_id(fspmethod_id);
pilot.set_variant_id(variant_id);
pilot.set_instr2(instr2);
pilot.set_data_address(data_address);
this->cb_send_pilot(pilot);
if ((++sent_pilots) % 1000 == 0) {
log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl;
}
}
log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl;
log_send << "wait for the clients to complete" << std::endl;
campaignmanager.noMoreParameters();
#ifndef __puma
collect_thread.join();
#endif
return true;
}
void DatabaseCampaign::collect_result_thread() {
log_recv << "Started result receive thread" << std::endl;
ExperimentData *res;
while ((res = static_cast<ExperimentData *>(campaignmanager.getDone()))) {
db_connect.insert_row(&res->getMessage());
delete res;
}
}

View File

@ -0,0 +1,66 @@
#ifndef __CPN_DATABASE_CAMPAIGN_H__
#define __CPN_DATABASE_CAMPAIGN_H__
#include "util/Database.hpp"
#include "util/DatabaseProtobufAdapter.hpp"
#include "comm/DatabaseCampaignMessage.pb.h"
#include "Campaign.hpp"
#include "comm/ExperimentData.hpp"
#include <google/protobuf/message.h>
namespace fail {
/**
* \class Campaign
*
* Interface for a generic database driven campaign. It uses the
* results from mysql tables that are genrated by the import-trace and
* prune-trace.
*/
class DatabaseCampaign : public Campaign {
Database *db; // !< The database connection object
DatabaseProtobufAdapter db_connect;
int variant_id; // !< Which variant do we work on (from CMDLINE)
int fspmethod_id; // !< Which fspmethod should be put out to the clients
void collect_result_thread();
int sent_pilots;
public:
DatabaseCampaign() {};
/**
* Defines the campaign. In the DatabaseCampaign the database
* connection is done
* @return \c true if the campaign was successful, \c false otherwise
*/
virtual bool run();
/**
* Callback function that can be used to add command line options
* to the campaign
*/
virtual bool cb_commandline_init() { return true; }
/**
* Callback to the campagin to get the result message descriptor
*/
virtual const google::protobuf::Descriptor * cb_result_message() = 0;
/**
* Callback that gets a DatabaseExperimentData instance, that is
* filled with a concrete experiment pilot from the database. The
* application should wrap the DatabaseCampaignMessage pilot into
* a custom message and give it to the campainmanager.
*/
virtual void cb_send_pilot(DatabaseCampaignMessage pilot) = 0;
};
}
#endif