Merge branch 'mysql-concurrency-fixes'
This commit is contained in:
@ -34,7 +34,7 @@ else(MYSQL_CONFIG)
|
|||||||
/usr/local/mysql/include/mysql
|
/usr/local/mysql/include/mysql
|
||||||
/usr/include
|
/usr/include
|
||||||
/usr/include/mysql
|
/usr/include/mysql
|
||||||
#find_library(mysqlclient ...
|
#find_library(mysqlclient_r ...
|
||||||
# PATHS
|
# PATHS
|
||||||
# ${MYSQL_ADD_LIBRARY_PATH}
|
# ${MYSQL_ADD_LIBRARY_PATH}
|
||||||
# /usr/lib/mysql
|
# /usr/lib/mysql
|
||||||
|
|||||||
@ -92,6 +92,8 @@ bool DatabaseCampaign::run() {
|
|||||||
log_send << "wait for the clients to complete" << std::endl;
|
log_send << "wait for the clients to complete" << std::endl;
|
||||||
campaignmanager.noMoreParameters();
|
campaignmanager.noMoreParameters();
|
||||||
|
|
||||||
|
delete db;
|
||||||
|
|
||||||
#ifndef __puma
|
#ifndef __puma
|
||||||
collect_thread.join();
|
collect_thread.join();
|
||||||
#endif
|
#endif
|
||||||
@ -101,12 +103,27 @@ bool DatabaseCampaign::run() {
|
|||||||
void DatabaseCampaign::collect_result_thread() {
|
void DatabaseCampaign::collect_result_thread() {
|
||||||
log_recv << "Started result receive thread" << std::endl;
|
log_recv << "Started result receive thread" << std::endl;
|
||||||
|
|
||||||
|
// create an own DB connection, because we cannot use one concurrently
|
||||||
|
Database *db_recv = Database::cmdline_connect();
|
||||||
|
db_connect.set_insert_database_handle(db_recv);
|
||||||
|
|
||||||
ExperimentData *res;
|
ExperimentData *res;
|
||||||
|
|
||||||
while ((res = static_cast<ExperimentData *>(campaignmanager.getDone()))) {
|
while ((res = static_cast<ExperimentData *>(campaignmanager.getDone()))) {
|
||||||
db_connect.insert_row(&res->getMessage());
|
db_connect.insert_row(&res->getMessage());
|
||||||
delete res;
|
delete res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log_recv << "Results complete, updating DB statistics ..." << std::endl;
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "ANALYZE TABLE " << db_connect.result_table();
|
||||||
|
if (!db_recv->query(ss.str().c_str())) {
|
||||||
|
log_recv << "failed!" << std::endl;
|
||||||
|
} else {
|
||||||
|
log_recv << "done." << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete db_recv;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DatabaseCampaign::run_variant(Database::Variant variant) {
|
bool DatabaseCampaign::run_variant(Database::Variant variant) {
|
||||||
|
|||||||
@ -229,6 +229,9 @@ void Database::cmdline_setup() {
|
|||||||
"-h/--hostname \tMYSQL Hostname (default: taken from ~/.my.cnf)");
|
"-h/--hostname \tMYSQL Hostname (default: taken from ~/.my.cnf)");
|
||||||
USERNAME = cmd.addOption("u", "username", Arg::Required,
|
USERNAME = cmd.addOption("u", "username", Arg::Required,
|
||||||
"-u/--username \tMYSQL Username (default: taken from ~/.my.cnf, or your current user)\n");
|
"-u/--username \tMYSQL Username (default: taken from ~/.my.cnf, or your current user)\n");
|
||||||
|
|
||||||
|
// should be called before any threads are spawned
|
||||||
|
mysql_library_init(0, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
Database * Database::cmdline_connect() {
|
Database * Database::cmdline_connect() {
|
||||||
|
|||||||
@ -355,9 +355,11 @@ int DatabaseProtobufAdapter::TypeBridge_message::gatherTypes(StringJoiner &inser
|
|||||||
void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) {
|
void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) {
|
||||||
assert (toplevel_desc != 0);
|
assert (toplevel_desc != 0);
|
||||||
|
|
||||||
std::stringstream create_table_stmt, insert_stmt;
|
std::stringstream create_table_stmt;
|
||||||
StringJoiner insert_join, primary_join, question_marks;
|
StringJoiner insert_join, primary_join, question_marks;
|
||||||
|
|
||||||
|
insert_stmt.str("");
|
||||||
|
|
||||||
result_table_name = "result_" + toplevel_desc->name();
|
result_table_name = "result_" + toplevel_desc->name();
|
||||||
|
|
||||||
/* Fill our top level dummy type bridge with the fields from the
|
/* Fill our top level dummy type bridge with the fields from the
|
||||||
@ -376,14 +378,6 @@ void DatabaseProtobufAdapter::create_table(const Descriptor *toplevel_desc) {
|
|||||||
|
|
||||||
// Create the Table
|
// Create the Table
|
||||||
db->query(create_table_stmt.str().c_str());
|
db->query(create_table_stmt.str().c_str());
|
||||||
|
|
||||||
|
|
||||||
// Prepare the insert statement
|
|
||||||
stmt = mysql_stmt_init(db->getHandle());
|
|
||||||
if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) {
|
|
||||||
LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db->getHandle()) << std::endl;
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -414,6 +408,17 @@ int DatabaseProtobufAdapter::field_size_at_pos(const Message *msg, std::vector<i
|
|||||||
bool DatabaseProtobufAdapter::insert_row(const google::protobuf::Message *msg) {
|
bool DatabaseProtobufAdapter::insert_row(const google::protobuf::Message *msg) {
|
||||||
assert (msg->GetDescriptor() != 0 && msg->GetReflection() != 0);
|
assert (msg->GetDescriptor() != 0 && msg->GetReflection() != 0);
|
||||||
|
|
||||||
|
if (!stmt) {
|
||||||
|
// Prepare the insert statement
|
||||||
|
// We didn't do that right in create_table() because we need to use the
|
||||||
|
// right DB connection for that (which may not have existed yet then).
|
||||||
|
stmt = mysql_stmt_init(db_insert->getHandle());
|
||||||
|
if (mysql_stmt_prepare(stmt, insert_stmt.str().c_str(), insert_stmt.str().length())) {
|
||||||
|
LOG << "query '" << insert_stmt.str() << "' failed: " << mysql_error(db_insert->getHandle()) << std::endl;
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MYSQL_BIND *bind = new MYSQL_BIND[top_level_msg.field_count];
|
MYSQL_BIND *bind = new MYSQL_BIND[top_level_msg.field_count];
|
||||||
|
|
||||||
/* We determine how many columns should be produced */
|
/* We determine how many columns should be produced */
|
||||||
|
|||||||
@ -13,8 +13,9 @@
|
|||||||
namespace fail {
|
namespace fail {
|
||||||
|
|
||||||
class DatabaseProtobufAdapter {
|
class DatabaseProtobufAdapter {
|
||||||
Database *db;
|
Database *db, *db_insert;
|
||||||
MYSQL_STMT *stmt;
|
MYSQL_STMT *stmt;
|
||||||
|
std::stringstream insert_stmt;
|
||||||
|
|
||||||
void error_create_table();
|
void error_create_table();
|
||||||
|
|
||||||
@ -192,8 +193,20 @@ class DatabaseProtobufAdapter {
|
|||||||
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
DatabaseProtobufAdapter() : db(0), top_level_msg(0, 0, 0) {}
|
DatabaseProtobufAdapter() : db(0), db_insert(0), stmt(0), top_level_msg(0, 0, 0) {}
|
||||||
void set_database_handle(Database *db) { this->db = db; }
|
void set_database_handle(Database *db)
|
||||||
|
{
|
||||||
|
this->db = db;
|
||||||
|
if (!db_insert) {
|
||||||
|
db_insert = db;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Set a different database handle to be used in insert_row(). Necessary
|
||||||
|
* if INSERTs are done in a separate thread, while the handle set with
|
||||||
|
* set_database_handle() is still in use concurrently.
|
||||||
|
*/
|
||||||
|
void set_insert_database_handle(Database *db) { db_insert = db; }
|
||||||
void create_table(const google::protobuf::Descriptor *);
|
void create_table(const google::protobuf::Descriptor *);
|
||||||
bool insert_row(const google::protobuf::Message *msg);
|
bool insert_row(const google::protobuf::Message *msg);
|
||||||
std::string result_table() { return result_table_name; }
|
std::string result_table() { return result_table_name; }
|
||||||
|
|||||||
@ -30,6 +30,6 @@ target_link_libraries(fail-${EXPERIMENT_NAME} ${PROTOBUF_LIBRARY} fail-llvmdisas
|
|||||||
|
|
||||||
## This is the example's campaign server distributing experiment parameters
|
## This is the example's campaign server distributing experiment parameters
|
||||||
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
||||||
target_link_libraries(${EXPERIMENT_NAME}-server -Wl,--start-group fail-${EXPERIMENT_NAME} fail-sal fail-util fail-cpn fail-comm ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY} -lmysqlclient -Wl,--end-group)
|
target_link_libraries(${EXPERIMENT_NAME}-server -Wl,--start-group fail-${EXPERIMENT_NAME} fail-sal fail-util fail-cpn fail-comm ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY} -lmysqlclient_r -Wl,--end-group)
|
||||||
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
||||||
|
|
||||||
|
|||||||
@ -42,5 +42,5 @@ target_link_libraries(fail-${EXPERIMENT_NAME} ${LIBUDIS86_LIBRARIES} ${PROTOBUF_
|
|||||||
|
|
||||||
## This is the example's campaign server distributing experiment parameters
|
## This is the example's campaign server distributing experiment parameters
|
||||||
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
||||||
target_link_libraries(${EXPERIMENT_NAME}-server fail-${EXPERIMENT_NAME} -Wl,--start-group fail-sal fail-util fail-cpn fail-comm ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY} -lmysqlclient -Wl,--end-group)
|
target_link_libraries(${EXPERIMENT_NAME}-server fail-${EXPERIMENT_NAME} -Wl,--start-group fail-sal fail-util fail-cpn fail-comm ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY} -lmysqlclient_r -Wl,--end-group)
|
||||||
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
||||||
|
|||||||
@ -31,5 +31,5 @@ target_link_libraries(fail-${EXPERIMENT_NAME} ${PROTOBUF_LIBRARY})
|
|||||||
|
|
||||||
## This is the example's campaign server distributing experiment parameters
|
## This is the example's campaign server distributing experiment parameters
|
||||||
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
add_executable(${EXPERIMENT_NAME}-server main.cc)
|
||||||
target_link_libraries(${EXPERIMENT_NAME}-server fail-${EXPERIMENT_NAME} fail fail-cpn fail-util -lmysqlclient ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY})
|
target_link_libraries(${EXPERIMENT_NAME}-server fail-${EXPERIMENT_NAME} fail fail-cpn fail-util -lmysqlclient_r ${PROTOBUF_LIBRARY} ${Boost_THREAD_LIBRARY})
|
||||||
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
install(TARGETS ${EXPERIMENT_NAME}-server RUNTIME DESTINATION bin)
|
||||||
|
|||||||
Reference in New Issue
Block a user