From f18cddc63c575ea8082f31f35d6e6d984e93d000 Mon Sep 17 00:00:00 2001 From: Christian Dietrich Date: Mon, 25 Mar 2013 16:04:05 +0100 Subject: [PATCH] DatabaseCampaign: abstract campain for interaction with MySQL Database The DatabaseCampaign interacts with the MySQL tables that are created by the import-trace and prune-trace tools. It does offer all unfinished experiment pilots from the database to the fail-clients. Those clients send back a (by the experiment) defined protobuf message as a result. The custom protobuf message does have to need the form: import "DatabaseCampaignMessage.proto"; message ExperimentMsg { required DatabaseCampaignMessage fsppilot = 1; repeated group Result = 2 { // custom fields required int32 bitoffset = 1; optional int32 result = 2; } } The DatabaseCampaignMessage is the pilot identifier from the database. For each of the repeated result entries a row in a table is allocated. The structure of this table is constructed (by protobuf reflection) from the description of the message. Each field in the Result group becomes a column in the result table. For the given example it would be: CREATE TABLE result_ExperimentMessage( pilot_id INT, bitoffset INT NOT NULL, result INT, PRIMARY_KEY(pilot_id) ) Change-Id: I28fb5488e739d4098b823b42426c5760331027f8 --- src/CMakeLists.txt | 8 + src/core/comm/CMakeLists.txt | 6 +- src/core/comm/DatabaseCampaignMessage.proto | 13 + src/core/comm/ExperimentData.hpp | 1 + .../comm/TracePlugin.proto} | 0 src/core/cpn/CMakeLists.txt | 1 + src/core/cpn/DatabaseCampaign.cc | 147 ++++++ src/core/cpn/DatabaseCampaign.hpp | 66 +++ src/core/util/CMakeLists.txt | 3 + src/core/util/Database.cc | 54 +- src/core/util/Database.hpp | 85 +++- src/core/util/DatabaseProtobufAdapter.cc | 465 ++++++++++++++++++ src/core/util/DatabaseProtobufAdapter.hpp | 206 ++++++++ src/core/util/StringJoiner.hpp | 69 +++ src/plugins/tracing/CMakeLists.txt | 14 +- src/plugins/tracing/TracingPlugin.hpp | 2 +- tools/import-trace/BasicImporter.cc | 10 +- tools/import-trace/CMakeLists.txt | 14 +- tools/import-trace/DCiAOKernelImporter.cc | 12 +- tools/import-trace/Importer.cc | 12 +- tools/import-trace/Importer.hpp | 4 +- tools/import-trace/main.cc | 26 +- tools/prune-trace/BasicPruner.cc | 36 +- tools/prune-trace/Pruner.cc | 9 +- tools/prune-trace/main.cc | 17 +- 25 files changed, 1161 insertions(+), 119 deletions(-) create mode 100644 src/core/comm/DatabaseCampaignMessage.proto rename src/{plugins/tracing/trace.proto => core/comm/TracePlugin.proto} (100%) create mode 100644 src/core/cpn/DatabaseCampaign.cc create mode 100644 src/core/cpn/DatabaseCampaign.hpp create mode 100644 src/core/util/DatabaseProtobufAdapter.cc create mode 100644 src/core/util/DatabaseProtobufAdapter.hpp create mode 100644 src/core/util/StringJoiner.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bbc85a11..ff878659 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,14 @@ ### Setup search paths for headers ## include_directories(${CMAKE_CURRENT_BINARY_DIR}/core) +include_directories(${CMAKE_CURRENT_BINARY_DIR}/core/comm) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/core) + +# We need to control all the protoc import paths to ensure, that all +# protoc imports refer to the same root path. Otherwise the generated +# protoc headers are not comptabile. +SET(PROTOBUF_GENERATE_CPP_APPEND_PATH FALSE) +SET(PROTOBUF_IMPORT_DIRS "/usr/include;${CMAKE_CURRENT_SOURCE_DIR}/core/comm") + # Note: CMAKE_CURRENT_BINARY_DIR is needed to find "FailConfig.hpp", which # is generated by CMake from config/FailConfig.hpp.in and stored in # your build-dir. (The same goes for "FailControlMessage.pb.h", etc.) diff --git a/src/core/comm/CMakeLists.txt b/src/core/comm/CMakeLists.txt index 159679ce..8f353689 100644 --- a/src/core/comm/CMakeLists.txt +++ b/src/core/comm/CMakeLists.txt @@ -5,8 +5,10 @@ set(SRCS ) ## Setup desired protobuf descriptions HERE ## -set(MY_PROTOS +set(PROTOS FailControlMessage.proto + DatabaseCampaignMessage.proto + TracePlugin.proto ) #### PROTOBUFS #### @@ -14,7 +16,7 @@ find_package(Protobuf REQUIRED) include_directories(${PROTOBUF_INCLUDE_DIRS}) include_directories(${CMAKE_CURRENT_BINARY_DIR}) -PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${MY_PROTOS}) +PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${PROTOS}) add_library(fail-comm ${SRCS} ${PROTO_SRCS} ${PROTO_HDRS}) target_link_libraries(fail-comm ${PROTOBUF_LIBRARY}) diff --git a/src/core/comm/DatabaseCampaignMessage.proto b/src/core/comm/DatabaseCampaignMessage.proto new file mode 100644 index 00000000..f2c80455 --- /dev/null +++ b/src/core/comm/DatabaseCampaignMessage.proto @@ -0,0 +1,13 @@ +import "google/protobuf/descriptor.proto"; +extend google.protobuf.FieldOptions { + optional bool sql_primary_key = 32382 [ default = false]; + optional bool sql_ignore = 32383 [ default = false]; +} + +message DatabaseCampaignMessage { + required int32 pilot_id = 1 [(sql_primary_key) = true]; + required int32 variant_id = 2 [(sql_ignore) = true]; + required int32 fspmethod_id = 3 [(sql_ignore) = true]; + required int32 instr2 = 4 [(sql_ignore) = true]; + required int32 data_address = 5 [(sql_ignore) = true]; +} \ No newline at end of file diff --git a/src/core/comm/ExperimentData.hpp b/src/core/comm/ExperimentData.hpp index dc1a6ef3..a526e92b 100644 --- a/src/core/comm/ExperimentData.hpp +++ b/src/core/comm/ExperimentData.hpp @@ -24,6 +24,7 @@ public: ExperimentData(google::protobuf::Message* m) : msg(m) , m_workloadID(0) { } google::protobuf::Message& getMessage() { return *msg; } + void setMessage(google::protobuf::Message *msg) { this->msg = msg; } uint32_t getWorkloadID() const { return m_workloadID; }; void setWorkloadID(uint32_t id) { m_workloadID = id; } /** diff --git a/src/plugins/tracing/trace.proto b/src/core/comm/TracePlugin.proto similarity index 100% rename from src/plugins/tracing/trace.proto rename to src/core/comm/TracePlugin.proto diff --git a/src/core/cpn/CMakeLists.txt b/src/core/cpn/CMakeLists.txt index 777aa7aa..81ac82cf 100644 --- a/src/core/cpn/CMakeLists.txt +++ b/src/core/cpn/CMakeLists.txt @@ -1,6 +1,7 @@ set(SRCS CampaignManager.cc JobServer.cc + DatabaseCampaign.cc ) add_library(fail-cpn ${SRCS}) diff --git a/src/core/cpn/DatabaseCampaign.cc b/src/core/cpn/DatabaseCampaign.cc new file mode 100644 index 00000000..64f22794 --- /dev/null +++ b/src/core/cpn/DatabaseCampaign.cc @@ -0,0 +1,147 @@ +#include "DatabaseCampaign.hpp" +#include "cpn/CampaignManager.hpp" +#include "util/CommandLine.hpp" +#include "util/Logger.hpp" +#include "util/Database.hpp" +#include "comm/ExperimentData.hpp" + + +#ifndef __puma +#include +#endif + + +using namespace fail; + +static Logger log_recv("DatabaseCampaign::recv"); +static Logger log_send("DatabaseCampaign"); + +bool DatabaseCampaign::run() { + CommandLine &cmd = CommandLine::Inst(); + + cmd.addOption("", "", Arg::None, "USAGE: fail-server [options...]\n\n"); + CommandLine::option_handle HELP = cmd.addOption("h", "help", Arg::None, "-h,--help\t Print usage and exit"); + + Database::cmdline_setup(); + + /* Give the implementation the chance to add stuff to the command + line interface */ + if (!cb_commandline_init()) return false; + + CommandLine::option_handle VARIANT = cmd.addOption("v", "variant", Arg::Required, + "-v/--variant\t Variant label (default: \"none\")"); + CommandLine::option_handle BENCHMARK = cmd.addOption("b", "benchmark", Arg::Required, + "-b/--benchmark\t Benchmark label (default: \"none\")\n"); + CommandLine::option_handle PRUNER = cmd.addOption("p", "prune-method", Arg::Required, + "-p/--prune-method\t Which import method to use (default: basic)"); + + if(!cmd.parse()) { + log_send << "Error parsing arguments." << std::endl; + exit(-1); + } + + if (cmd[HELP]) { + cmd.printUsage(); + exit(0); + } + + std::string variant, benchmark, pruner; + + if (cmd[VARIANT].count() > 0) + variant = std::string(cmd[VARIANT].first()->arg); + else + variant = "none"; + + if (cmd[BENCHMARK].count() > 0) + benchmark = std::string(cmd[BENCHMARK].first()->arg); + else + benchmark = "none"; + + if (cmd[PRUNER].count() > 0) + pruner = std::string(cmd[PRUNER].first()->arg); + else + pruner = "basic"; + + db = Database::cmdline_connect(); + variant_id = db->get_variant_id(variant, benchmark); + log_send << "Variant to use " << variant << "/" << benchmark << " (ID: " << variant_id << ")" << std::endl; + fspmethod_id = db->get_fspmethod_id(pruner); + log_send << "Pruner to use " << pruner << " (ID: " << fspmethod_id << ")" << std::endl; + + /* Set up the adapter that maps the results into the MySQL + Database */ + db_connect.set_database_handle(db); + + const google::protobuf::Descriptor *desc = cb_result_message(); + db_connect.create_table(desc); + + // collect results in parallel to avoid deadlock +#ifndef __puma + boost::thread collect_thread(&DatabaseCampaign::collect_result_thread, this); +#endif + + /* Gather all unfinished jobs */ + int experiment_count; + std::string sql_select = "SELECT pilot_id, fspgroup.fspmethod_id, fspgroup.variant_id, fspgroup.instr2, fspgroup.data_address "; + std::stringstream ss; + ss << " FROM fspgroup INNER JOIN fsppilot ON fsppilot.id = fspgroup.pilot_id " + << " WHERE known_outcome = 0 " + << " AND fspgroup.fspmethod_id = " << fspmethod_id + << " AND fspgroup.variant_id = " << variant_id + // << " AND fsppilot.data_address = 1346688" + << " AND (SELECT COUNT(*) FROM " + db_connect.result_table() + " as r WHERE r.pilot_id = fspgroup.pilot_id) = 0" + << " ORDER BY fspgroup.instr2"; + std::string sql_body = ss.str(); + + /* Get the number of unfinished experiments */ + MYSQL_RES *count = db->query(("SELECT COUNT(*) " + sql_body).c_str(), true); + MYSQL_ROW row = mysql_fetch_row(count); + experiment_count = atoi(row[0]); + + + MYSQL_RES *pilots = db->query_stream ((sql_select + sql_body).c_str()); + + log_send << "Found " << experiment_count << " unfinished experiments in database." << std::endl; + + sent_pilots = 0; + while ((row = mysql_fetch_row(pilots)) != 0) { + unsigned pilot_id = atoi(row[0]); + unsigned instr2 = atoi(row[3]); + unsigned data_address = atoi(row[4]); + + DatabaseCampaignMessage pilot; + pilot.set_pilot_id(pilot_id); + pilot.set_fspmethod_id(fspmethod_id); + pilot.set_variant_id(variant_id); + pilot.set_instr2(instr2); + pilot.set_data_address(data_address); + + this->cb_send_pilot(pilot); + + if ((++sent_pilots) % 1000 == 0) { + log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl; + } + } + + log_send << "pushed " << sent_pilots << " pilots into the queue" << std::endl; + log_send << "wait for the clients to complete" << std::endl; + + campaignmanager.noMoreParameters(); + +#ifndef __puma + collect_thread.join(); +#endif + return true; +} + +void DatabaseCampaign::collect_result_thread() { + log_recv << "Started result receive thread" << std::endl; + + ExperimentData *res; + + while ((res = static_cast(campaignmanager.getDone()))) { + db_connect.insert_row(&res->getMessage()); + delete res; + } +} + diff --git a/src/core/cpn/DatabaseCampaign.hpp b/src/core/cpn/DatabaseCampaign.hpp new file mode 100644 index 00000000..b4f40e2e --- /dev/null +++ b/src/core/cpn/DatabaseCampaign.hpp @@ -0,0 +1,66 @@ +#ifndef __CPN_DATABASE_CAMPAIGN_H__ +#define __CPN_DATABASE_CAMPAIGN_H__ + +#include "util/Database.hpp" +#include "util/DatabaseProtobufAdapter.hpp" +#include "comm/DatabaseCampaignMessage.pb.h" +#include "Campaign.hpp" +#include "comm/ExperimentData.hpp" +#include + + + +namespace fail { + +/** + * \class Campaign + * + * Interface for a generic database driven campaign. It uses the + * results from mysql tables that are genrated by the import-trace and + * prune-trace. + */ + +class DatabaseCampaign : public Campaign { + Database *db; // !< The database connection object + DatabaseProtobufAdapter db_connect; + + int variant_id; // !< Which variant do we work on (from CMDLINE) + int fspmethod_id; // !< Which fspmethod should be put out to the clients + + void collect_result_thread(); + + int sent_pilots; + +public: + DatabaseCampaign() {}; + + /** + * Defines the campaign. In the DatabaseCampaign the database + * connection is done + * @return \c true if the campaign was successful, \c false otherwise + */ + virtual bool run(); + + /** + * Callback function that can be used to add command line options + * to the campaign + */ + virtual bool cb_commandline_init() { return true; } + + /** + * Callback to the campagin to get the result message descriptor + */ + virtual const google::protobuf::Descriptor * cb_result_message() = 0; + + /** + * Callback that gets a DatabaseExperimentData instance, that is + * filled with a concrete experiment pilot from the database. The + * application should wrap the DatabaseCampaignMessage pilot into + * a custom message and give it to the campainmanager. + */ + virtual void cb_send_pilot(DatabaseCampaignMessage pilot) = 0; +}; + +} + +#endif diff --git a/src/core/util/CMakeLists.txt b/src/core/util/CMakeLists.txt index 4bc5f216..bfaf0765 100644 --- a/src/core/util/CMakeLists.txt +++ b/src/core/util/CMakeLists.txt @@ -5,6 +5,8 @@ set(SRCS ElfReader.hpp Database.hpp Database.cc + DatabaseProtobufAdapter.hpp + DatabaseProtobufAdapter.cc Demangler.hpp Demangler.cc Disassembler.hpp @@ -57,4 +59,5 @@ endif() mark_as_advanced(FAIL_OBJDUMP) add_library(fail-util ${SRCS}) +add_dependencies(fail-util fail-comm) target_link_libraries(fail-util ${PROTOBUF_LIBRARY} ${Boost_LIBRARIES} ${LIB_IBERTY} ) diff --git a/src/core/util/Database.cc b/src/core/util/Database.cc index f9f53af4..d7a98754 100644 --- a/src/core/util/Database.cc +++ b/src/core/util/Database.cc @@ -3,7 +3,8 @@ #include "Database.hpp" #include "util/CommandLine.hpp" #include "util/Logger.hpp" -static fail::Logger log("Database", true); + +static fail::Logger LOG("Database", true); using namespace fail; @@ -14,14 +15,17 @@ Database::Database(const std::string &username, const std::string &host, const s if (!mysql_real_connect(handle, host.c_str(), username.c_str(), 0, database.c_str(), 0, 0, 0)) { - log << "cannot connect to MySQL server: " << mysql_error(handle) << std::endl; + LOG << "cannot connect to MySQL server: " << mysql_error(handle) << std::endl; exit(-1); } - log << "opened MYSQL connection" << std::endl; + LOG << "opened MYSQL connection" << std::endl; } -MYSQL_RES* Database::query(char const *query, bool get_result) -{ +MYSQL_RES* Database::query(char const *query, bool get_result) { +#ifndef __puma + boost::lock_guard guard(m_handle_lock); +#endif + if (mysql_query(handle, query)) { std::cerr << "query '" << query << "' failed: " << mysql_error(handle) << std::endl; return 0; @@ -43,6 +47,26 @@ MYSQL_RES* Database::query(char const *query, bool get_result) return (MYSQL_RES *) 1; // Invalid PTR!!! } +MYSQL_RES* Database::query_stream(char const *query) +{ +#ifndef __puma + boost::lock_guard guard(m_handle_lock); +#endif + + if (mysql_query(handle, query)) { + std::cerr << "query '" << query << "' failed: " << mysql_error(handle) << std::endl; + return 0; + } + + MYSQL_RES *res = mysql_use_result(handle); + if (!res && mysql_errno(handle)) { + std::cerr << "mysql_use_result for query '" << query << "' failed: " << mysql_error(handle) << std::endl; + return 0; + } + + return res; +} + my_ulonglong Database::affected_rows() { @@ -53,10 +77,10 @@ my_ulonglong Database::affected_rows() int Database::get_variant_id(const std::string &variant, const std::string &benchmark) { if (!query("CREATE TABLE IF NOT EXISTS variant (" - " id int(11) NOT NULL AUTO_INCREMENT," - " variant varchar(255) NOT NULL," - " benchmark varchar(255) NOT NULL," - " PRIMARY KEY (id)," + " id int(11) NOT NULL AUTO_INCREMENT," + " variant varchar(255) NOT NULL," + " benchmark varchar(255) NOT NULL," + " PRIMARY KEY (id)," "UNIQUE KEY variant (variant,benchmark))")) { return 0; } @@ -85,9 +109,9 @@ int Database::get_variant_id(const std::string &variant, const std::string &benc int Database::get_fspmethod_id(const std::string &method) { if (!query("CREATE TABLE IF NOT EXISTS fspmethod (" - " id int(11) NOT NULL AUTO_INCREMENT," - " method varchar(255) NOT NULL," - " PRIMARY KEY (id), UNIQUE KEY method (method))")) { + " id int(11) NOT NULL AUTO_INCREMENT," + " method varchar(255) NOT NULL," + " PRIMARY KEY (id), UNIQUE KEY method (method))")) { return 0; } @@ -120,11 +144,11 @@ void Database::cmdline_setup() { CommandLine &cmd = CommandLine::Inst(); DATABASE = cmd.addOption("d", "database", Arg::Required, - "-d/--database\t MYSQL Database (default: taken from ~/.my.cnf)"); + "-d/--database\t MYSQL Database (default: taken from ~/.my.cnf)"); HOSTNAME = cmd.addOption("H", "hostname", Arg::Required, - "-h/--hostname\t MYSQL Hostname (default: taken from ~/.my.cnf)"); + "-h/--hostname\t MYSQL Hostname (default: taken from ~/.my.cnf)"); USERNAME = cmd.addOption("u", "username", Arg::Required, - "-u/--username\t MYSQL Username (default: taken from ~/.my.cnf, or your current user)"); + "-u/--username\t MYSQL Username (default: taken from ~/.my.cnf, or your current user)"); } Database * Database::cmdline_connect() { diff --git a/src/core/util/Database.hpp b/src/core/util/Database.hpp index e11ed2a4..96a99f58 100644 --- a/src/core/util/Database.hpp +++ b/src/core/util/Database.hpp @@ -1,31 +1,86 @@ #ifndef __UTIL_DATABASE_H__ #define __UTIL_DATABASE_H__ +#ifndef __puma +#include +#endif + #include #include #include namespace fail { -class Database { - MYSQL *handle; - MYSQL_RES *last_result; -public: - Database(const std::string &username, const std::string &host, const std::string &database); - ~Database() { mysql_close(handle); } + /** \class Database + * + * Database abstraction layer that handles the database connection + * parameters, the database connection and provides different + * database access methods + */ + class Database { + MYSQL *handle; // !< The MySQL Database handle + MYSQL_RES *last_result; // !< Used for mysql_result_free +#ifndef __puma + boost::mutex m_handle_lock; +#endif - int get_variant_id(const std::string &variant, const std::string &benchmark); - int get_fspmethod_id(const std::string &method); + public: + /** + * Constructor that connects instantly to the database + */ + Database(const std::string &username, const std::string &host, const std::string &database); + ~Database() { mysql_close(handle); } + + /** + * Get the variant id for a specific variant/benchmark pair, + * if it isn't defined in the database (variant table), it is + * inserted + */ + int get_variant_id(const std::string &variant, const std::string &benchmark); + + /** + * Get the fault space pruning method id for a specific + * pruning method, if it isn't defined in the database + * (fspmethod table), it is inserted. + */ + int get_fspmethod_id(const std::string &method); - MYSQL * getHandle() const { return handle; } - MYSQL_RES *query(char const *query, bool get_result = false); - my_ulonglong affected_rows(); + /** + * Get the raw mysql database handle + */ + MYSQL * getHandle() const { return handle; } + /** + * Do a small database query. If get_result is set to false + * (MYSQL_RES *)0 or (MYSQL_RES *)1 is given back to indicate + * the result of the query. If the result should be fetched + * a pointer is returned. This pointer is valid until the next + * call to this->query(stmt, true) + */ + MYSQL_RES *query(char const *query, bool get_result = false); + /** + * Similar to Database::query, but this should be used for big + * queries. The result is not copied instantly from the + * database server, but a partial result is returned. Which + * behaves as a normal MYSQL_RES pointer. + */ + MYSQL_RES *query_stream(char const *query); - // Interface to the command line parser - static void cmdline_setup(); - static Database * cmdline_connect(); -}; + /** + * How many rows were affected by the last query + */ + my_ulonglong affected_rows(); + + /** + * Interface to the util/CommandLine.hpp interface. In you + * application you first call cmdline_setup(), which adds + * different command line options. The cmdline_connect() + * function then parses the commandline and connects to the + * database. + */ + static void cmdline_setup(); + static Database * cmdline_connect(); + }; } diff --git a/src/core/util/DatabaseProtobufAdapter.cc b/src/core/util/DatabaseProtobufAdapter.cc new file mode 100644 index 00000000..0ba75672 --- /dev/null +++ b/src/core/util/DatabaseProtobufAdapter.cc @@ -0,0 +1,465 @@ +#include +#include +#include +#include "DatabaseProtobufAdapter.hpp" +#include "util/Logger.hpp" +#include "util/StringJoiner.hpp" +//FIXME +#include "../experiments/dciao-kernelstructs/dciao_kernel.pb.h" + + +static fail::Logger LOG("DatabaseProtobufAdapter", true); + + +using namespace fail; +using namespace google::protobuf; + +DatabaseProtobufAdapter::TypeBridge::TypeBridge(const FieldDescriptor *desc) + : desc(desc) { + /* We get the value of the field option extension in_primary_key + to determine which fields should be added additionally to the + primary key, besides pilot_id */ + if (desc) { // top-level type bridge has no desc pointer + const FieldOptions& field_options = desc->options(); + this->primary_key = field_options.GetExtension(sql_primary_key); + } +} + + +std::string DatabaseProtobufAdapter::TypeBridge_enum::sql_type() { + /* enum types are mapped onto SQL enum types, therefore we have to + gather all the stringified names for the enum, values */ + const google::protobuf::EnumDescriptor *e = desc->enum_type(); + std::stringstream ss; + ss << "ENUM ("; + for (int i = 0; i < e->value_count(); i++) { + ss << "'" << e->value(i)->name() << "'"; + if (i != e->value_count() - 1) + ss << ", "; + } + ss << ")"; + return ss.str(); +} + +std::string DatabaseProtobufAdapter::TypeBridge_message::sql_create_stmt() { + /* The create statement for a message TypeBridge just propagates + the call onto the enclosed types (including inner + TypeBridge_message objects */ + std::stringstream ss; + for (std::vector::iterator it = types.begin(); + it != types.end(); ++it) { + TypeBridge *bridge = *it; + ss << bridge->sql_create_stmt(); + if (it+1 != types.end()) + ss << ", "; + } + return ss.str(); +} + + +void DatabaseProtobufAdapter::TypeBridge_int64::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_LONGLONG; + bind->is_unsigned = 0; + buffer = ref->GetInt64(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_uint64::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_LONGLONG; + bind->is_unsigned = 1; + buffer = ref->GetUInt64(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_int32::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_LONG; + bind->is_unsigned = 0; + buffer = ref->GetInt32(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_uint32::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_LONG; + bind->is_unsigned = 1; + buffer = ref->GetUInt32(*msg, desc); + bind->buffer = &buffer; +} +void DatabaseProtobufAdapter::TypeBridge_float::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_FLOAT; + buffer = ref->GetFloat(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_double::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_DOUBLE; + buffer = ref->GetDouble(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_bool::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_TINY; + bind->is_unsigned = 1; + buffer = ref->GetBool(*msg, desc); + bind->buffer = &buffer; +} + +void DatabaseProtobufAdapter::TypeBridge_string::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_STRING; + bind->buffer = (void *) ref->GetString(*msg, desc).c_str(); + bind->buffer_length = ref->GetString(*msg, desc).length(); +} +void DatabaseProtobufAdapter::TypeBridge_enum::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + /* Handle the NULL case */ + if (insert_null(bind, msg)) return; + + bind->buffer_type = MYSQL_TYPE_STRING; + bind->buffer = (void *) ref->GetEnum(*msg, desc)->name().c_str(); + bind->buffer_length = ref->GetEnum(*msg, desc)->name().length(); +} + +void DatabaseProtobufAdapter::TypeBridge_message::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + std::stringstream ss; + for (std::vector::iterator it = types.begin(); + it != types.end(); ++it) { + TypeBridge *bridge = *it; + TypeBridge_message *msg_bridge = dynamic_cast (bridge); + if (msg_bridge != 0) { + const Message *inner_msg = 0; + if (msg_bridge->desc->is_repeated()) { + std::vector &selector = *top_level_msg()->selector; + inner_msg = &ref->GetRepeatedMessage(*msg, msg_bridge->desc, selector[nesting_level+1]); + } else { + inner_msg = &ref->GetMessage(*msg, msg_bridge->desc); + } + msg_bridge->bind(bind, inner_msg); + /* Increment bind pointer */ + bind = &bind[msg_bridge->field_count]; + } else { + /* Bind the plain field and increment the binding pointer */ + bridge->bind(bind, msg); + bind ++; + } + } +} + +void DatabaseProtobufAdapter::TypeBridge_repeated::bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + size_t count = ref->FieldSize(*msg,desc); + if (count == 0) { + bind->buffer_type = MYSQL_TYPE_NULL; + return; + } + + if (buffer) delete[] buffer; + buffer = new char[count * inner->element_size()]; + assert (inner->element_size() > 0); + + char *p = buffer; + for (unsigned i = 0; i < count; i++) { + inner->copy_to(msg, i, p); + p += inner->element_size(); + } + + bind->buffer_type = MYSQL_TYPE_BLOB; + bind->buffer = buffer; + bind->buffer_length = count * inner->element_size(); +} + +void DatabaseProtobufAdapter::TypeBridge_int32::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(int32_t *) p = ref->GetRepeatedInt32(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_uint32::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(uint32_t *) p = ref->GetRepeatedUInt32(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_int64::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(int64_t *) p = ref->GetRepeatedInt64(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_uint64::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(uint64_t *) p = ref->GetRepeatedUInt64(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_double::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(double *) p = ref->GetRepeatedDouble(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_float::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(float *) p = ref->GetRepeatedFloat(*msg, desc, i); +} +void DatabaseProtobufAdapter::TypeBridge_bool::copy_to(const google::protobuf::Message *msg, int i, void *p) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + *(char *) p = ref->GetRepeatedBool(*msg, desc, i) ? '1' : '0'; +} + +void DatabaseProtobufAdapter::error_create_table() { + std::cerr << "ERROR: Cannot create the result table from message description" << std::endl; + std::cerr << " The form only from of messages is: [required DatabaseCampaignMessage, repeated group X {}]" << std::endl; + exit(-1); +} + +int DatabaseProtobufAdapter::TypeBridge_message::gatherTypes(StringJoiner &insert_stmt, StringJoiner &primary_key) { + /* Clear old state */ + repeated_message_stack.clear(); + repeated_message_stack.push_back(this); + for (std::vector::iterator it = types.begin(); it != types.end(); ++it) + delete *it; + types.clear(); + + size_t count = msg_type->field_count(); + field_count = 0; + + for (unsigned i = 0; i < count; i++) { + const FieldDescriptor *field = msg_type->field(i); + assert(field != 0); + + TypeBridge *bridge = 0; + TypeBridge_message *msg_bridge; + bool can_be_repeated = true; // default value + + // For repeated messages + TypeBridge_message *top_level_msg; + + const FieldOptions& field_options = field->options(); + if (field_options.GetExtension(sql_ignore)) { + // Field should be ignored + continue; + } + + switch (field->cpp_type()) { + case FieldDescriptor::CPPTYPE_INT32: + bridge = new TypeBridge_int32(field); + break; + case FieldDescriptor::CPPTYPE_UINT32: + bridge = new TypeBridge_uint32(field); + break; + case FieldDescriptor::CPPTYPE_INT64: + bridge = new TypeBridge_int64(field); + break; + case FieldDescriptor::CPPTYPE_UINT64: + bridge = new TypeBridge_uint64(field); + break; + case FieldDescriptor::CPPTYPE_DOUBLE: + bridge = new TypeBridge_double(field); + break; + case FieldDescriptor::CPPTYPE_FLOAT: + bridge = new TypeBridge_float(field); + break; + case FieldDescriptor::CPPTYPE_BOOL: + bridge = new TypeBridge_bool(field); + break; + case FieldDescriptor::CPPTYPE_ENUM: + can_be_repeated = false; + bridge = new TypeBridge_enum(field); + break; + case FieldDescriptor::CPPTYPE_STRING: + can_be_repeated = false; + bridge = new TypeBridge_string(field); + break; + case FieldDescriptor::CPPTYPE_MESSAGE: + if (field->is_repeated()) { + top_level_msg = this->top_level_msg(); + /* Here we check wether we are on the repeated path + from the root, when we are a repeated entry is ok. + */ + if (!(this == top_level_msg->repeated_message_stack.back())) { + LOG << "Cannot handle repeated inner message in two different paths: " << field->name() << std::endl; + exit(0); + } + } + + if (field->is_optional()) { + LOG << "Cannot handle optional inner message: " << field->name() << std::endl; + exit(-1); + } + + msg_bridge = new TypeBridge_message(field, field->message_type(), this); + bridge = msg_bridge; + types.push_back(bridge); + if (field->is_repeated()) { + while (1) { + TypeBridge_message *back = top_level_msg->repeated_message_stack.back(); + if (back == msg_bridge) + break; + TypeBridge_message *p = msg_bridge; + while (back != 0 && p->parent != back && p->parent != 0) { + p = p->parent; + } + top_level_msg->repeated_message_stack.push_back(p); + } + } + + field_count += msg_bridge->gatherTypes(insert_stmt, primary_key); + + /* Do not the normal field adding process */ + continue; + + default: + std::cerr << "unsupported field: " << field->name() << std::endl; + exit(-1); + break; + } + + if (field->is_repeated() && ! can_be_repeated) { + LOG << "Cannot handle repeated field: " << field->name() << std::endl; + exit(-1); + } + if (field->is_repeated()) { + bridge = new TypeBridge_repeated(field, bridge); + } + + types.push_back(bridge); + field_count ++; + insert_stmt.push_back(field->name()); + if (bridge->primary_key) + primary_key.push_back(field->name()); + } + + return field_count; +} + + +void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) { + assert (toplevel_desc != 0); + + std::stringstream create_table_stmt, insert_stmt; + StringJoiner insert_join, primary_join, question_marks; + + result_table_name = "result_" + toplevel_desc->name(); + + /* Fill our top level dummy type bridge with the fields from the + example message */ + top_level_msg.msg_type = toplevel_desc; + int fields = top_level_msg.gatherTypes(insert_join, primary_join); + for (int i = 0; i < fields; i++) + question_marks.push_back("?"); + + create_table_stmt << "CREATE TABLE IF NOT EXISTS " << result_table_name << "("; + create_table_stmt << top_level_msg.sql_create_stmt() << ", PRIMARY KEY(" << primary_join.join(", ") << "))"; + + insert_stmt << "INSERT INTO " << result_table_name << "(" << insert_join.join(","); + insert_stmt << ") VALUES (" << question_marks.join(",") << ")"; + + // Create the Table + db->query(create_table_stmt.str().c_str()); + + + // Prepare the insert statement + stmt = mysql_stmt_init(db->getHandle()); + if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) { + LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db->getHandle()) << std::endl; + exit(-1); + } +} + + + +int DatabaseProtobufAdapter::field_size_at_pos(const Message *msg, std::vector selector, int pos) { + std::vector< TypeBridge_message *>::iterator it; + if (top_level_msg.repeated_message_stack.size() <= 1) + return 1; + + int i = 1; + for (it = top_level_msg.repeated_message_stack.begin() + 1; + i != pos && it != top_level_msg.repeated_message_stack.end(); ++it) { + + TypeBridge_message *bridge = *it; + const Reflection *ref = msg->GetReflection(); + if (bridge->desc && bridge->desc->is_repeated()) { + msg = &ref->GetRepeatedMessage(*msg, bridge->desc, selector[i]); + } else { + assert(selector[i] == 0); + msg = &ref->GetMessage(*msg, bridge->desc); + } + i++; + } + const Reflection *ref = msg->GetReflection(); + return ref->FieldSize(*msg, top_level_msg.repeated_message_stack[i]->desc); +} + +bool DatabaseProtobufAdapter::insert_row(const google::protobuf::Message *msg) { + const Reflection *ref = msg->GetReflection(); + const Descriptor *d = msg->GetDescriptor(); + assert (d != 0 && ref != 0); + + MYSQL_BIND *bind = new MYSQL_BIND[top_level_msg.field_count]; + + /* We determine how many columns should be produced */ + std::vector selector (top_level_msg.repeated_message_stack.size()); + + while (true) { + // INSERT WITH SELECTOR + top_level_msg.selector = &selector; + + // Use the top_level_msg TypeBridge to bind all parameters + // into the MYSQL_BIND structure + memset(bind, 0, sizeof(*bind) * (top_level_msg.field_count)); + top_level_msg.bind(bind, msg); + + // Insert the binded row + if (mysql_stmt_bind_param(stmt, bind)) { + LOG << "mysql_stmt_bind_param() failed: " << mysql_stmt_error(stmt) << std::endl; + delete[] bind; + return false; + } + + if (mysql_stmt_execute(stmt)) { + LOG << "mysql_stmt_execute() failed: " << mysql_stmt_error(stmt) << std::endl; + delete[] bind; + return false; + } + + + /* Increment the selector */ + unsigned i = selector.size() - 1; + selector[i] ++; + + while (i > 0 && field_size_at_pos(msg, selector, i) <= selector[i]) { + selector[i] = 0; + i--; + selector[i] ++; + } + if (i == 0) break; + } + + delete[] bind; + + return true; + +} diff --git a/src/core/util/DatabaseProtobufAdapter.hpp b/src/core/util/DatabaseProtobufAdapter.hpp new file mode 100644 index 00000000..9abf6b05 --- /dev/null +++ b/src/core/util/DatabaseProtobufAdapter.hpp @@ -0,0 +1,206 @@ +#ifndef __COMM_PROTOBUF_DATABASE_ADAPTER_H__ +#define __COMM_PROTOBUF_DATABASE_ADAPTER_H__ + +#include +#include +#include +#include "DatabaseCampaignMessage.pb.h" +#include "util/Database.hpp" +#include "util/StringJoiner.hpp" + +#include + +namespace fail { + +class DatabaseProtobufAdapter { + Database *db; + MYSQL_STMT *stmt; + + void error_create_table(); + + /** \class TypeBridge + A type bridge bridges the gap between a protobuf type and the + sql database. It defines how the result table is defined, and + how the message types are mapped onto SQL types. The whole + message is mapped into a top level TypeBridge_message */ + struct TypeBridge { + bool primary_key; // !< Should the field be put into the + // !< primary key + const google::protobuf::FieldDescriptor *desc; + + virtual ~TypeBridge() {}; + + TypeBridge(const google::protobuf::FieldDescriptor *desc); + /* The field name in the SQL Table */ + virtual std::string name() { return desc->name(); } + /* The expression in the create stmt for this type. */ + virtual std::string sql_create_stmt() + { return name() + " " + sql_type() + (desc->is_required() ? " NOT NULL": ""); }; + + /* The mapped type */ + virtual std::string sql_type() = 0; + virtual int element_size() { return 0; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *) { }; + + /* A common function that handles NULL values for fields */ + bool insert_null(MYSQL_BIND *bind, const google::protobuf::Message *msg) { + const google::protobuf::Reflection *ref = msg->GetReflection(); + if (!ref->HasField(*msg, desc)) { + bind->buffer_type = MYSQL_TYPE_NULL; + return true; // handled + } + return false; + } + + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg) = 0; + }; + + struct TypeBridge_repeated : TypeBridge { + TypeBridge *inner; + char *buffer; + TypeBridge_repeated(const google::protobuf::FieldDescriptor *desc, TypeBridge *inner) + : TypeBridge(desc), inner(inner), buffer(0) {}; + virtual std::string sql_type() { return "blob"; }; + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_int32 : TypeBridge { + int32_t buffer; + TypeBridge_int32(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + virtual std::string sql_type() { return "INT"; }; + virtual int element_size() { return 4; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_uint32 : TypeBridge { + uint32_t buffer; + TypeBridge_uint32(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + + virtual std::string sql_type() { return "INT"; }; + virtual int element_size() { return 4; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_int64 : TypeBridge { + int64_t buffer; + TypeBridge_int64(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + virtual std::string sql_type() { return "BIGINT"; }; + virtual int element_size() { return 8; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_uint64 : TypeBridge { + uint64_t buffer; + TypeBridge_uint64(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + + virtual std::string sql_type() { return "BIGINT"; }; + virtual int element_size() { return 8; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + struct TypeBridge_double : TypeBridge { + double buffer; + TypeBridge_double(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + + virtual std::string sql_type() { return "DOUBLE"; }; + virtual int element_size() { return 8; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + struct TypeBridge_float : TypeBridge { + float buffer; + TypeBridge_float(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + + virtual std::string sql_type() { return "FLOAT"; }; + virtual int element_size() { return 4; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + struct TypeBridge_bool : TypeBridge { + bool buffer; + TypeBridge_bool(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + + virtual std::string sql_type() { return "TINYINT"; }; + virtual int element_size() { return 1; }; + virtual void copy_to(const google::protobuf::Message *msg, int i, void *); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_string : TypeBridge { + TypeBridge_string(const google::protobuf::FieldDescriptor *desc) + : TypeBridge(desc){}; + virtual std::string sql_type() { return "TEXT"; }; + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_enum : TypeBridge { + int32_t size; + TypeBridge_enum(const google::protobuf::FieldDescriptor *desc) : TypeBridge(desc) {}; + virtual std::string sql_type(); + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + }; + + struct TypeBridge_message : TypeBridge { + const google::protobuf::Descriptor *msg_type; + int nesting_level; + int field_count; + std::vector types; + std::vector *selector; + + /* Pointer to the toplevel message */ + TypeBridge_message *parent; + + TypeBridge_message * top_level_msg() { + TypeBridge_message *p = this; + while (p->parent != 0) p = p->parent; + return p; + } + + std::vector repeated_message_stack; + + TypeBridge_message(const google::protobuf::FieldDescriptor *desc, + const google::protobuf::Descriptor *msg_type, + TypeBridge_message *parent) + : TypeBridge(desc), msg_type(msg_type), parent(parent) { + if (parent) + nesting_level = parent->nesting_level+1; + else + nesting_level = 0; + }; + virtual std::string sql_create_stmt(); + virtual std::string sql_type() { return ""; }; + virtual void bind(MYSQL_BIND *bind, const google::protobuf::Message *msg); + /* Returns the number of enclosed fields */ + int gatherTypes(StringJoiner &insert_stmt, StringJoiner &primary_key); + }; + + TypeBridge_message top_level_msg; + + std::string result_table_name; + + int field_size_at_pos(const google::protobuf::Message *msg, std::vector selector, int pos); + + +public: + DatabaseProtobufAdapter() : db(0), top_level_msg(0, 0, 0) {} + void set_database_handle(Database *db) { this->db = db; } + void create_table(const google::protobuf::Descriptor *); + bool insert_row(const google::protobuf::Message *msg); + std::string result_table() { return result_table_name; } + +}; + +} + +#endif + diff --git a/src/core/util/StringJoiner.hpp b/src/core/util/StringJoiner.hpp new file mode 100644 index 00000000..7ab43dcc --- /dev/null +++ b/src/core/util/StringJoiner.hpp @@ -0,0 +1,69 @@ +#ifndef __UTIL_STRINGJOINER_H +#define __UTIL_STRINGJOINER_H + +#include +#include +#include +#include +#include + + +namespace fail { +/** + * \brief Helper subclass of std::deque for convenient + * concatenating strings with a separator. + * + * The behaviour is similar to the python list().join(",") construct. + * + */ +struct StringJoiner : public std::deque { + /** + * \brief join all strings in the container, + * + * Join all the collected strings to one string. The separator is + * inserted between each element. + */ + std::string join(const char *j) { + std::stringstream ss; + if (size() == 0) + return ""; + + ss << front(); + + std::deque::const_iterator i = begin() + 1; + + while (i != end()) { + ss << j << *i; + i++; + } + return ss.str(); + } + + /** + * \brief append strings to list. + * + * Appends the given value to the list of values if it isn't the + * empty string. "" will be ignored. + */ + void push_back(const value_type &x) { + if (x.compare("") == 0) + return; + std::deque::push_back(x); + } + + /** + * \brief append strings to list. + * + * Appends the given value to the list of values if it isn't the + * empty string. "" will be ignored. + */ + void push_front(const value_type &x) { + if (x.compare("") == 0) + return; + std::deque::push_front(x); + } +}; + +} + +#endif diff --git a/src/plugins/tracing/CMakeLists.txt b/src/plugins/tracing/CMakeLists.txt index efb3da45..f1dbd2d9 100644 --- a/src/plugins/tracing/CMakeLists.txt +++ b/src/plugins/tracing/CMakeLists.txt @@ -1,13 +1,5 @@ set(PLUGIN_NAME tracing) -#### PROTOBUFS #### -set(MY_PROTOS - trace.proto -) -find_package(Protobuf REQUIRED) -include_directories(${PROTOBUF_INCLUDE_DIRS}) -PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${MY_PROTOS}) - set(MY_PLUGIN_SRCS TracingPlugin.cc TracingPlugin.hpp @@ -15,5 +7,7 @@ set(MY_PLUGIN_SRCS include_directories(${CMAKE_CURRENT_BINARY_DIR}) ## Build library -add_library(fail-${PLUGIN_NAME} ${MY_PLUGIN_SRCS} ${PROTO_SRCS} ${PROTO_HDRS}) -target_link_libraries(fail-${PLUGIN_NAME} ${PROTOBUF_LIBRARY}) +add_library(fail-${PLUGIN_NAME} ${MY_PLUGIN_SRCS}) +add_dependencies(fail-${PLUGIN_NAME} fail-comm) +target_link_libraries(fail-${PLUGIN_NAME}) + diff --git a/src/plugins/tracing/TracingPlugin.hpp b/src/plugins/tracing/TracingPlugin.hpp index 4cafa8b2..e8cec478 100644 --- a/src/plugins/tracing/TracingPlugin.hpp +++ b/src/plugins/tracing/TracingPlugin.hpp @@ -8,7 +8,7 @@ #include "efw/ExperimentFlow.hpp" #include "config/FailConfig.hpp" -#include "../plugins/tracing/trace.pb.h" +#include "TracePlugin.pb.h" // Check if configuration dependencies are satisfied: #if !defined(CONFIG_EVENT_BREAKPOINTS) || !defined(CONFIG_EVENT_MEMREAD) || !defined(CONFIG_EVENT_MEMWRITE) diff --git a/tools/import-trace/BasicImporter.cc b/tools/import-trace/BasicImporter.cc index 1b8a8bfc..ccb59f13 100644 --- a/tools/import-trace/BasicImporter.cc +++ b/tools/import-trace/BasicImporter.cc @@ -2,7 +2,7 @@ #include "util/Logger.hpp" #include "BasicImporter.hpp" -extern fail::Logger log; +static fail::Logger LOG("BasicImporter"); bool BasicImporter::create_database() { std::string create_statement = "CREATE TABLE IF NOT EXISTS trace (" @@ -28,7 +28,7 @@ bool BasicImporter::add_trace_event(instruction_count_t begin, instruction_count "VALUES (?,?,?,?, ?,?,?)"); stmt = mysql_stmt_init(db->getHandle()); if (mysql_stmt_prepare(stmt, sql.c_str(), sql.length())) { - log << "query '" << sql << "' failed: " << mysql_error(db->getHandle()) << std::endl; + LOG << "query '" << sql << "' failed: " << mysql_error(db->getHandle()) << std::endl; return false; } } @@ -61,12 +61,12 @@ bool BasicImporter::add_trace_event(instruction_count_t begin, instruction_count } } if (mysql_stmt_bind_param(stmt, bind)) { - log << "mysql_stmt_bind_param() failed: " << mysql_stmt_error(stmt) << std::endl; + LOG << "mysql_stmt_bind_param() failed: " << mysql_stmt_error(stmt) << std::endl; return false; } if (mysql_stmt_execute(stmt)) { - log << "mysql_stmt_execute() failed: " << mysql_stmt_error(stmt) << std::endl; - log << "IP: " << std::hex<< event.ip() << std::endl; + LOG << "mysql_stmt_execute() failed: " << mysql_stmt_error(stmt) << std::endl; + LOG << "IP: " << std::hex<< event.ip() << std::endl; return false; } return true; diff --git a/tools/import-trace/CMakeLists.txt b/tools/import-trace/CMakeLists.txt index bfa1e13d..939d87fa 100644 --- a/tools/import-trace/CMakeLists.txt +++ b/tools/import-trace/CMakeLists.txt @@ -1,22 +1,10 @@ -## Setup desired protobuf descriptions HERE ## -set(MY_PROTOS - ../../src/plugins/tracing/trace.proto -) - set(SRCS Importer.cc BasicImporter.cc DCiAOKernelImporter.cc ) -#### PROTOBUFS #### -find_package(Protobuf REQUIRED) -include_directories(${PROTOBUF_INCLUDE_DIRS}) -include_directories(${CMAKE_CURRENT_BINARY_DIR}) - -PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${MY_PROTOS}) - ## This is the example's campaign server distributing experiment parameters add_executable(import-trace main.cc ${SRCS} ${PROTO_SRCS} ${PROTO_HDRS}) -target_link_libraries(import-trace ${PROTOBUF_LIBRARY} -lmysqlclient fail-util fail-sal) +target_link_libraries(import-trace ${PROTOBUF_LIBRARY} -lmysqlclient fail-util fail-sal fail-comm) install(TARGETS import-trace RUNTIME DESTINATION bin) diff --git a/tools/import-trace/DCiAOKernelImporter.cc b/tools/import-trace/DCiAOKernelImporter.cc index 43ebe03e..a21dbcd6 100644 --- a/tools/import-trace/DCiAOKernelImporter.cc +++ b/tools/import-trace/DCiAOKernelImporter.cc @@ -2,7 +2,7 @@ #include #include "util/Logger.hpp" -extern fail::Logger log; +static fail::Logger LOG("DCiAOKernelImporter"); using namespace fail; @@ -18,12 +18,12 @@ bool DCiAOKernelImporter::inDynamicKernelMemory(fail::address_t addr) { bool DCiAOKernelImporter::copy_to_database(fail::ProtoIStream &ps) { if (m_elf == 0) { - log << "Please give an ELF Binary as a parameter" << std::endl; + LOG << "Please give an ELF Binary as a parameter" << std::endl; exit(-1); } if (getEnterKernelAddress() == 0 || getLeaveKernelAddress() == 0) { - log << "Pleave give a valid CiAO Binary with kernel dependability options enabled" << std::endl; + LOG << "Pleave give a valid CiAO Binary with kernel dependability options enabled" << std::endl; exit(-1); } @@ -93,12 +93,12 @@ bool DCiAOKernelImporter::copy_to_database(fail::ProtoIStream &ps) { // we're currently looking at; the EC is defined by // data_address [last_kernel_leave, read_instr] (instr_absolute) if (!add_trace_event(instr1, instr2, ev)) { - log << "add_trace_event failed" << std::endl; + LOG << "add_trace_event failed" << std::endl; return false; } row_count ++; if (row_count % 1000 == 0) { - log << "Imported " << row_count << " traces into the database" << std::endl; + LOG << "Imported " << row_count << " traces into the database" << std::endl; } } } @@ -106,7 +106,7 @@ bool DCiAOKernelImporter::copy_to_database(fail::ProtoIStream &ps) { } - log << "Inserted " << row_count << " traces into the database" << std::endl; + LOG << "Inserted " << row_count << " traces into the database" << std::endl; return true; } diff --git a/tools/import-trace/Importer.cc b/tools/import-trace/Importer.cc index 7eaf99e5..ef7d3371 100644 --- a/tools/import-trace/Importer.cc +++ b/tools/import-trace/Importer.cc @@ -5,7 +5,7 @@ using namespace fail; -extern Logger log; +static Logger LOG("Importer"); bool Importer::init(const std::string &variant, const std::string &benchmark, Database *db) { this->db = db; @@ -13,7 +13,7 @@ bool Importer::init(const std::string &variant, const std::string &benchmark, Da if (!m_variant_id) { return false; } - log << "Importing to variant " << variant << "/" << benchmark + LOG << "Importing to variant " << variant << "/" << benchmark << " (ID: " << m_variant_id << ")" << std::endl; return true; } @@ -23,7 +23,7 @@ bool Importer::clear_database() { ss << "DELETE FROM trace WHERE variant_id = " << m_variant_id; bool ret = db->query(ss.str().c_str()) == 0 ? false : true; - log << "deleted " << db->affected_rows() << " rows from trace table" << std::endl; + LOG << "deleted " << db->affected_rows() << " rows from trace table" << std::endl; return ret; } @@ -69,12 +69,12 @@ bool Importer::copy_to_database(fail::ProtoIStream &ps) { // we're currently looking at; the EC is defined by // data_address [instr1, instr2] (instr_absolute) if (!add_trace_event(instr1, instr2, ev)) { - log << "add_trace_event failed" << std::endl; + LOG << "add_trace_event failed" << std::endl; return false; } row_count ++; if (row_count % 1000 == 0) { - log << "Imported " << row_count << " traces into the database" << std::endl; + LOG << "Imported " << row_count << " traces into the database" << std::endl; } if (ev.accesstype() == ev.READ) { @@ -91,7 +91,7 @@ bool Importer::copy_to_database(fail::ProtoIStream &ps) { } } - log << "Inserted " << row_count << " traces into the database" << std::endl; + LOG << "Inserted " << row_count << " traces into the database" << std::endl; // FIXME // close all open intervals (right end of the fault-space) with fake trace event diff --git a/tools/import-trace/Importer.hpp b/tools/import-trace/Importer.hpp index 0ecd527d..0ac0f0d6 100644 --- a/tools/import-trace/Importer.hpp +++ b/tools/import-trace/Importer.hpp @@ -7,14 +7,14 @@ #include "util/ElfReader.hpp" #include "sal/SALConfig.hpp" #include "util/Database.hpp" -#include "trace.pb.h" +#include "comm/TracePlugin.pb.h" class Importer { protected: int m_variant_id; fail::ElfReader *m_elf; - fail::Database *db; + fail::Database *db; public: typedef unsigned instruction_count_t; diff --git a/tools/import-trace/main.cc b/tools/import-trace/main.cc index 209da08b..867e96af 100644 --- a/tools/import-trace/main.cc +++ b/tools/import-trace/main.cc @@ -17,12 +17,12 @@ using std::cerr; using std::endl; using std::cout; -Logger log("import-trace", true); +static Logger LOG("import-trace", true); ProtoIStream openProtoStream(std::string input_file) { std::ifstream *gz_test_ptr = new std::ifstream(input_file.c_str()), &gz_test = *gz_test_ptr; if (!gz_test) { - log << "couldn't open " << input_file << endl; + LOG << "couldn't open " << input_file << endl; exit(-1); } unsigned char b1, b2; @@ -31,16 +31,16 @@ ProtoIStream openProtoStream(std::string input_file) { if (b1 == 0x1f && b2 == 0x8b) { igzstream *tracef = new igzstream(input_file.c_str()); if (!tracef) { - log << "couldn't open " << input_file << endl; + LOG << "couldn't open " << input_file << endl; exit(-1); } - log << "opened file " << input_file << " in GZip mode" << endl; + LOG << "opened file " << input_file << " in GZip mode" << endl; delete gz_test_ptr; ProtoIStream ps(tracef); return ps; } - log << "opened file " << input_file << " in normal mode" << endl; + LOG << "opened file " << input_file << " in normal mode" << endl; ProtoIStream ps(gz_test_ptr); return ps; } @@ -85,18 +85,18 @@ int main(int argc, char *argv[]) { if (cmd[IMPORTER].count() > 0) { std::string imp(cmd[IMPORTER].first()->arg); if (imp == "BasicImporter") { - log << "Using BasicImporter" << endl; + LOG << "Using BasicImporter" << endl; importer = new BasicImporter(); } else if (imp == "DCiAOKernelImporter") { - log << "Using DCiAOKernelImporter" << endl; + LOG << "Using DCiAOKernelImporter" << endl; importer = new DCiAOKernelImporter(); } else { - log << "Unkown import method: " << imp << endl; + LOG << "Unkown import method: " << imp << endl; exit(-1); } } else { - log << "Using BasicImporter" << endl; + LOG << "Using BasicImporter" << endl; importer = new BasicImporter(); } @@ -133,7 +133,7 @@ int main(int argc, char *argv[]) { if (!importer->init(variant, benchmark, db)) { - log << "importer->init() failed" << endl; + LOG << "importer->init() failed" << endl; exit(-1); } importer->set_elf_file(elf_file); @@ -143,17 +143,17 @@ int main(int argc, char *argv[]) { //////////////////////////////////////////////////////////////// if (!importer->create_database()) { - log << "create_database() failed" << endl; + LOG << "create_database() failed" << endl; exit(-1); } if (!importer->clear_database()) { - log << "clear_database() failed" << endl; + LOG << "clear_database() failed" << endl; exit(-1); } if (!importer->copy_to_database(ps)) { - log << "copy_to_database() failed" << endl; + LOG << "copy_to_database() failed" << endl; exit(-1); } } diff --git a/tools/prune-trace/BasicPruner.cc b/tools/prune-trace/BasicPruner.cc index 4d9eae7d..c6946c8f 100644 --- a/tools/prune-trace/BasicPruner.cc +++ b/tools/prune-trace/BasicPruner.cc @@ -1,50 +1,50 @@ #include #include "BasicPruner.hpp" #include "util/Logger.hpp" -static fail::Logger log ("BasicPruner"); +static fail::Logger LOG ("BasicPruner"); bool BasicPruner::prune_all() { std::stringstream ss; ss << "INSERT INTO fsppilot (known_outcome, variant_id, instr2, data_address, fspmethod_id) " - "SELECT 0, variant_id, instr2, data_address, " << m_method_id << " " - "FROM trace " - "WHERE variant_id = " << m_variant_id << " AND accesstype = 'R'"; + "SELECT 0, variant_id, instr2, data_address, " << m_method_id << " " + "FROM trace " + "WHERE variant_id = " << m_variant_id << " AND accesstype = 'R'"; if (!db->query(ss.str().c_str())) return false; ss.str(""); int rows = db->affected_rows(); // single entry for known outcome (write access) ss << "INSERT INTO fsppilot (known_outcome, variant_id, instr2, data_address, fspmethod_id) " - "SELECT 1, variant_id, instr2, data_address, " << m_method_id << " " - "FROM trace " - "WHERE variant_id = " << m_variant_id << " AND accesstype = 'W' " - "LIMIT 1"; + "SELECT 1, variant_id, instr2, data_address, " << m_method_id << " " + "FROM trace " + "WHERE variant_id = " << m_variant_id << " AND accesstype = 'W' " + "LIMIT 1"; if (!db->query(ss.str().c_str())) return false; ss.str(""); rows += db->affected_rows(); - log << "created " << rows << " fsppilot entries" << std::endl; + LOG << "created " << rows << " fsppilot entries" << std::endl; ss << "INSERT INTO fspgroup (variant_id, instr2, data_address, fspmethod_id, pilot_id) " - "SELECT variant_id, instr2, data_address, fspmethod_id, id " - "FROM fsppilot " - "WHERE known_outcome = 0 AND fspmethod_id = " << m_method_id << " AND variant_id = " << m_variant_id; + "SELECT variant_id, instr2, data_address, fspmethod_id, id " + "FROM fsppilot " + "WHERE known_outcome = 0 AND fspmethod_id = " << m_method_id << " AND variant_id = " << m_variant_id; if (!db->query(ss.str().c_str())) return false; ss.str(""); rows = db->affected_rows(); ss << "INSERT INTO fspgroup (variant_id, instr2, data_address, fspmethod_id, pilot_id) " - "SELECT t.variant_id, t.instr2, t.data_address, p.fspmethod_id, p.id " - "FROM trace t " - "JOIN fsppilot p " - "ON t.variant_id = p.variant_id AND p.fspmethod_id = " << m_method_id << " AND p.known_outcome = 1 " - "WHERE t.variant_id = " << m_variant_id << " AND t.accesstype = 'W'"; + "SELECT t.variant_id, t.instr2, t.data_address, p.fspmethod_id, p.id " + "FROM trace t " + "JOIN fsppilot p " + "ON t.variant_id = p.variant_id AND p.fspmethod_id = " << m_method_id << " AND p.known_outcome = 1 " + "WHERE t.variant_id = " << m_variant_id << " AND t.accesstype = 'W'"; if (!db->query(ss.str().c_str())) return false; ss.str(""); rows += db->affected_rows(); - log << "created " << rows << " fspgroup entries" << std::endl; + LOG << "created " << rows << " fspgroup entries" << std::endl; return true; } diff --git a/tools/prune-trace/Pruner.cc b/tools/prune-trace/Pruner.cc index e598917d..7a00a589 100644 --- a/tools/prune-trace/Pruner.cc +++ b/tools/prune-trace/Pruner.cc @@ -3,10 +3,11 @@ #include "util/Logger.hpp" using namespace fail; -static Logger log ("Pruner"); +static Logger LOG ("Pruner"); #include "Pruner.hpp" + bool Pruner::init(const std::string &variant, const std::string &benchmark, Database *db) { this->db = db; if (!(m_variant_id = db->get_variant_id(variant, benchmark))) { @@ -15,7 +16,7 @@ bool Pruner::init(const std::string &variant, const std::string &benchmark, Data if (!(m_method_id = db->get_fspmethod_id(method_name()))) { return false; } - log << "Pruning variant " + LOG << "Pruning variant " << variant << "/" << benchmark << " (ID: " << m_variant_id << ")" << " with method " << method_name() << " (ID: " << m_method_id << ")" << std::endl; @@ -52,12 +53,12 @@ bool Pruner::clear_database() { std::stringstream ss; ss << "DELETE FROM fsppilot WHERE variant_id = " << m_variant_id << " AND fspmethod_id = " << m_method_id; bool ret = (bool) db->query(ss.str().c_str()); - log << "deleted " << db->affected_rows() << " rows from fsppilot table" << std::endl; + LOG << "deleted " << db->affected_rows() << " rows from fsppilot table" << std::endl; ss.str(""); ss << "DELETE FROM fspgroup WHERE variant_id = " << m_variant_id << " AND fspmethod_id = " << m_method_id; ret = ret && (bool) db->query(ss.str().c_str()); - log << "deleted " << db->affected_rows() << " rows from fspgroup table" << std::endl; + LOG << "deleted " << db->affected_rows() << " rows from fspgroup table" << std::endl; return ret; } diff --git a/tools/prune-trace/main.cc b/tools/prune-trace/main.cc index 33e28f05..8d1023d9 100644 --- a/tools/prune-trace/main.cc +++ b/tools/prune-trace/main.cc @@ -4,7 +4,7 @@ #include "util/CommandLine.hpp" #include "util/Logger.hpp" -static fail::Logger log("prune-trace", true); +static fail::Logger LOG("prune-trace", true); using namespace fail; using std::endl; @@ -29,7 +29,6 @@ int main(int argc, char *argv[]) { "-v/--variant\t Variant label (default: \"none\")"); CommandLine::option_handle BENCHMARK = cmd.addOption("b", "benchmark", Arg::Required, "-b/--benchmark\t Benchmark label (default: \"none\")\n"); - CommandLine::option_handle PRUNER = cmd.addOption("p", "prune-method", Arg::Required, "-p/--prune-method\t Which import method to use (default: basic)"); @@ -42,15 +41,15 @@ int main(int argc, char *argv[]) { if (cmd[PRUNER].count() > 0) { std::string imp(cmd[PRUNER].first()->arg); if (imp == "basic") { - log << "Using BasicPruner" << endl; + LOG << "Using BasicPruner" << endl; pruner = new BasicPruner(); } else { - log << "Unkown import method: " << imp << endl; + LOG << "Unkown import method: " << imp << endl; exit(-1); } } else { - log << "Using BasicPruner" << endl; + LOG << "Using BasicPruner" << endl; pruner = new BasicPruner(); } @@ -72,7 +71,7 @@ int main(int argc, char *argv[]) { benchmark = "none"; if (!pruner->init(variant, benchmark, db)) { - log << "pruner->init() failed" << endl; + LOG << "pruner->init() failed" << endl; exit(-1); } @@ -80,17 +79,17 @@ int main(int argc, char *argv[]) { // Do the actual import //////////////////////////////////////////////////////////////// if (!pruner->create_database()) { - log << "create_database() failed" << endl; + LOG << "create_database() failed" << endl; exit(-1); } if (!pruner->clear_database()) { - log << "clear_database() failed" << endl; + LOG << "clear_database() failed" << endl; exit(-1); } if (!pruner->prune_all()) { - log << "prune_all() failed" << endl; + LOG << "prune_all() failed" << endl; exit(-1); }