diff --git a/src/core/cpn/DatabaseCampaign.cc b/src/core/cpn/DatabaseCampaign.cc index 2290cdc6..352c6819 100644 --- a/src/core/cpn/DatabaseCampaign.cc +++ b/src/core/cpn/DatabaseCampaign.cc @@ -92,6 +92,8 @@ bool DatabaseCampaign::run() { log_send << "wait for the clients to complete" << std::endl; campaignmanager.noMoreParameters(); + delete db; + #ifndef __puma collect_thread.join(); #endif @@ -101,12 +103,18 @@ bool DatabaseCampaign::run() { void DatabaseCampaign::collect_result_thread() { log_recv << "Started result receive thread" << std::endl; + // create an own DB connection, because we cannot use one concurrently + Database *db_recv = Database::cmdline_connect(); + db_connect.set_insert_database_handle(db_recv); + ExperimentData *res; while ((res = static_cast(campaignmanager.getDone()))) { db_connect.insert_row(&res->getMessage()); delete res; } + + delete db_recv; } bool DatabaseCampaign::run_variant(Database::Variant variant) { diff --git a/src/core/util/Database.cc b/src/core/util/Database.cc index dc0b8073..d4e987d6 100644 --- a/src/core/util/Database.cc +++ b/src/core/util/Database.cc @@ -229,6 +229,9 @@ void Database::cmdline_setup() { "-h/--hostname \tMYSQL Hostname (default: taken from ~/.my.cnf)"); USERNAME = cmd.addOption("u", "username", Arg::Required, "-u/--username \tMYSQL Username (default: taken from ~/.my.cnf, or your current user)\n"); + + // should be called before any threads are spawned + mysql_library_init(0, NULL, NULL); } Database * Database::cmdline_connect() { diff --git a/src/core/util/DatabaseProtobufAdapter.cc b/src/core/util/DatabaseProtobufAdapter.cc index cebc8514..cd7cf0f7 100644 --- a/src/core/util/DatabaseProtobufAdapter.cc +++ b/src/core/util/DatabaseProtobufAdapter.cc @@ -355,9 +355,11 @@ int DatabaseProtobufAdapter::TypeBridge_message::gatherTypes(StringJoiner &inser void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) { assert (toplevel_desc != 0); - std::stringstream create_table_stmt, insert_stmt; + std::stringstream create_table_stmt; StringJoiner insert_join, primary_join, question_marks; + insert_stmt.str(""); + result_table_name = "result_" + toplevel_desc->name(); /* Fill our top level dummy type bridge with the fields from the @@ -376,14 +378,6 @@ void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) { // Create the Table db->query(create_table_stmt.str().c_str()); - - - // Prepare the insert statement - stmt = mysql_stmt_init(db->getHandle()); - if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) { - LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db->getHandle()) << std::endl; - exit(-1); - } } @@ -414,6 +408,17 @@ int DatabaseProtobufAdapter::field_size_at_pos(const Message *msg, std::vectorGetDescriptor() != 0 && msg->GetReflection() != 0); + if (!stmt) { + // Prepare the insert statement + // We didn't do that right in create_table() because we need to use the + // right DB connection for that (which may not have existed yet then). + stmt = mysql_stmt_init(db_insert->getHandle()); + if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) { + LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db_insert->getHandle()) << std::endl; + exit(-1); + } + } + MYSQL_BIND *bind = new MYSQL_BIND[top_level_msg.field_count]; /* We determine how many columns should be produced */ diff --git a/src/core/util/DatabaseProtobufAdapter.hpp b/src/core/util/DatabaseProtobufAdapter.hpp index 9abf6b05..6f0f8ad1 100644 --- a/src/core/util/DatabaseProtobufAdapter.hpp +++ b/src/core/util/DatabaseProtobufAdapter.hpp @@ -13,8 +13,9 @@ namespace fail { class DatabaseProtobufAdapter { - Database *db; + Database *db, *db_insert; MYSQL_STMT *stmt; + std::stringstream insert_stmt; void error_create_table(); @@ -192,8 +193,20 @@ class DatabaseProtobufAdapter { public: - DatabaseProtobufAdapter() : db(0), top_level_msg(0, 0, 0) {} - void set_database_handle(Database *db) { this->db = db; } + DatabaseProtobufAdapter() : db(0), db_insert(0), stmt(0), top_level_msg(0, 0, 0) {} + void set_database_handle(Database *db) + { + this->db = db; + if (!db_insert) { + db_insert = db; + } + } + /** + * Set a different database handle to be used in insert_row(). Necessary + * if INSERTs are done in a separate thread, while the handle set with + * set_database_handle() is still in use concurrently. + */ + void set_insert_database_handle(Database *db) { db_insert = db; } void create_table(const google::protobuf::Descriptor *); bool insert_row(const google::protobuf::Message *msg); std::string result_table() { return result_table_name; }