DatabaseCampaign: MySQL / concurrency fixes

According to
<http://dev.mysql.com/doc/refman/5.5/en/c-api-threaded-clients.html>,
a MySQL connection handle must not be used concurrently with an open
result set and mysql_use_result() in one thread
(DatabaseCampaign::run()), and mysql_query() in another
(DatabaseCampaign::collect_result_thread()).  This indeed leads to
crashes when bounding the outgoing job queue (SERVER_OUT_QUEUE_SIZE),
and maybe even more insidous effects in other cases.  The solution is
to create separate connections for both threads.

Additionally, call mysql_library_init() before spawning any threads.

Change-Id: I2981f2fdc67c9a2cbe8781f1a21654418f621aeb
This commit is contained in:
Horst Schirmeier
2013-12-02 19:33:24 +01:00
parent 0907dfb0ae
commit 33b63651ae
4 changed files with 41 additions and 12 deletions

View File

@ -92,6 +92,8 @@ bool DatabaseCampaign::run() {
log_send << "wait for the clients to complete" << std::endl;
campaignmanager.noMoreParameters();
delete db;
#ifndef __puma
collect_thread.join();
#endif
@ -101,12 +103,18 @@ bool DatabaseCampaign::run() {
void DatabaseCampaign::collect_result_thread() {
log_recv << "Started result receive thread" << std::endl;
// create an own DB connection, because we cannot use one concurrently
Database *db_recv = Database::cmdline_connect();
db_connect.set_insert_database_handle(db_recv);
ExperimentData *res;
while ((res = static_cast<ExperimentData *>(campaignmanager.getDone()))) {
db_connect.insert_row(&res->getMessage());
delete res;
}
delete db_recv;
}
bool DatabaseCampaign::run_variant(Database::Variant variant) {

View File

@ -229,6 +229,9 @@ void Database::cmdline_setup() {
"-h/--hostname \tMYSQL Hostname (default: taken from ~/.my.cnf)");
USERNAME = cmd.addOption("u", "username", Arg::Required,
"-u/--username \tMYSQL Username (default: taken from ~/.my.cnf, or your current user)\n");
// should be called before any threads are spawned
mysql_library_init(0, NULL, NULL);
}
Database * Database::cmdline_connect() {

View File

@ -355,9 +355,11 @@ int DatabaseProtobufAdapter::TypeBridge_message::gatherTypes(StringJoiner &inser
void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) {
assert (toplevel_desc != 0);
std::stringstream create_table_stmt, insert_stmt;
std::stringstream create_table_stmt;
StringJoiner insert_join, primary_join, question_marks;
insert_stmt.str("");
result_table_name = "result_" + toplevel_desc->name();
/* Fill our top level dummy type bridge with the fields from the
@ -376,14 +378,6 @@ void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) {
// Create the Table
db->query(create_table_stmt.str().c_str());
// Prepare the insert statement
stmt = mysql_stmt_init(db->getHandle());
if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) {
LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db->getHandle()) << std::endl;
exit(-1);
}
}
@ -414,6 +408,17 @@ int DatabaseProtobufAdapter::field_size_at_pos(const Message *msg, std::vector<i
bool DatabaseProtobufAdapter::insert_row(const google::protobuf::Message *msg) {
assert (msg->GetDescriptor() != 0 && msg->GetReflection() != 0);
if (!stmt) {
// Prepare the insert statement
// We didn't do that right in create_table() because we need to use the
// right DB connection for that (which may not have existed yet then).
stmt = mysql_stmt_init(db_insert->getHandle());
if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) {
LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db_insert->getHandle()) << std::endl;
exit(-1);
}
}
MYSQL_BIND *bind = new MYSQL_BIND[top_level_msg.field_count];
/* We determine how many columns should be produced */

View File

@ -13,8 +13,9 @@
namespace fail {
class DatabaseProtobufAdapter {
Database *db;
Database *db, *db_insert;
MYSQL_STMT *stmt;
std::stringstream insert_stmt;
void error_create_table();
@ -192,8 +193,20 @@ class DatabaseProtobufAdapter {
public:
DatabaseProtobufAdapter() : db(0), top_level_msg(0, 0, 0) {}
void set_database_handle(Database *db) { this->db = db; }
DatabaseProtobufAdapter() : db(0), db_insert(0), stmt(0), top_level_msg(0, 0, 0) {}
void set_database_handle(Database *db)
{
this->db = db;
if (!db_insert) {
db_insert = db;
}
}
/**
* Set a different database handle to be used in insert_row(). Necessary
* if INSERTs are done in a separate thread, while the handle set with
* set_database_handle() is still in use concurrently.
*/
void set_insert_database_handle(Database *db) { db_insert = db; }
void create_table(const google::protobuf::Descriptor *);
bool insert_row(const google::protobuf::Message *msg);
std::string result_table() { return result_table_name; }