diff --git a/src/core/comm/CMakeLists.txt b/src/core/comm/CMakeLists.txt index 592bc718..de3068a0 100644 --- a/src/core/comm/CMakeLists.txt +++ b/src/core/comm/CMakeLists.txt @@ -1,7 +1,5 @@ set(SRCS ExperimentData.hpp - SocketComm.hpp - SocketComm.cc ) ## Setup desired protobuf descriptions HERE ## diff --git a/src/core/comm/SocketComm.cc b/src/core/comm/SocketComm.cc deleted file mode 100644 index 7b8f4b23..00000000 --- a/src/core/comm/SocketComm.cc +++ /dev/null @@ -1,117 +0,0 @@ -#include -#include -#include -#include - -#include "SocketComm.hpp" - -namespace fail { - -void SocketComm::init() -{ - // It's usually much easier to handle the error on write(), than to do - // anything intelligent in a SIGPIPE handler. - signal(SIGPIPE, SIG_IGN); -} - -bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg) -{ - int size = htonl(msg.ByteSize()); - std::string buf; - if (safe_write(sockfd, &size, sizeof(size)) == -1 - || !msg.SerializeToString(&buf) - || safe_write(sockfd, buf.c_str(), buf.size()) == -1) { - return false; - } - return true; -} - -bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg) -{ - char *buf; - int bufsiz; - if ((buf = getBuf(sockfd, &bufsiz))) { - std::string st(buf, bufsiz); - delete [] buf; - return msg.ParseFromString(st); - } - return false; -} - -bool SocketComm::dropMsg(int sockfd) -{ - char *buf; - int bufsiz; - if ((buf = getBuf(sockfd, &bufsiz))) { - delete [] buf; - return true; - } - return false; -} - -char * SocketComm::getBuf(int sockfd, int *size) -{ - char *buf; - if (safe_read(sockfd, size, sizeof(int)) == -1) { - return 0; - } - *size = ntohl(*size); - buf = new char[*size]; - if (safe_read(sockfd, buf, *size) == -1) { - delete [] buf; - return 0; - } - return buf; -} - -int SocketComm::timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout) -{ - struct pollfd pfd = {sockfd, POLLIN, 0}; - int ret = poll(&pfd, 1, timeout); - if (ret > 0) { - return accept(sockfd, addr, 
addrlen); - } - errno = EWOULDBLOCK; - return -1; -} - -ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count) -{ - ssize_t ret; - const char *cbuf = (const char *) buf; - do { - ret = write(fd, cbuf, count); - if (ret == -1) { - if (errno == EINTR) { - continue; - } - return -1; - } - count -= ret; - cbuf += ret; - } while (count); - return cbuf - (const char *)buf; -} - -ssize_t SocketComm::safe_read(int fd, void *buf, size_t count) -{ - ssize_t ret; - char *cbuf = (char *) buf; - do { - ret = read(fd, cbuf, count); - if (ret == -1) { - if (errno == EINTR) { - continue; - } - return -1; - } else if (ret == 0) { - // this deliberately deviates from read(2) - return -1; - } - count -= ret; - cbuf += ret; - } while (count); - return cbuf - (const char *) buf; -} - -} // end-of-namespace: fail diff --git a/src/core/comm/SocketComm.hpp b/src/core/comm/SocketComm.hpp deleted file mode 100644 index 5bb1084e..00000000 --- a/src/core/comm/SocketComm.hpp +++ /dev/null @@ -1,66 +0,0 @@ -/** - * \brief Socket based communication wrapper functions. - */ - -#ifndef __SOCKET_COMM_HPP__ -#define __SOCKET_COMM_HPP__ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace fail { - -class SocketComm { -public: - /** - * This allows us to ignore SIGPIPE. 
- */ - static void init(); - /** - * Send Protobuf-generated message - * @param sockfd open socket descriptor to write to - * @param Msg Reference to Protobuf generated message type - * \return false if message sending failed - */ - static bool sendMsg(int sockfd, google::protobuf::Message& msg); - /** - * Receive Protobuf-generated message - * @param sockfd open socket descriptor to read from - * @param Msg Reference to Protobuf generated message type - * \return false if message reception failed - */ - static bool rcvMsg(int sockfd, google::protobuf::Message& msg); - - /** - * Receive Protobuf-generated message and just drop it - * @param sockfd open socket descriptor to read from - * \return false if message reception failed - */ - static bool dropMsg(int sockfd); - - /** - * An accept() wrapper that times out (using poll(2)) - * @param sockfd listening socket descriptor to accept connections from - * @param addr same as accept()'s - * @param addrlen same as accept()'s - * @param timeout timeout in milliseconds (see poll(2)) - * \return < 0 on failure, > 0 for a new socket connection - */ - static int timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout); - -private: - static char *getBuf(int sockfd, int *size); - static ssize_t safe_write(int fd, const void *buf, size_t count); - static ssize_t safe_read(int fd, void *buf, size_t count); -}; - -} // end-of-namespace: fail - -#endif // __SOCKET_COMM_HPP__ diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index bcf96417..02807070 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -12,7 +12,6 @@ #include #include -#include "comm/SocketComm.hpp" #include "JobServer.hpp" #ifndef __puma @@ -83,28 +82,23 @@ static bool rcvMsg(tcp::socket &socket, google::protobuf::Message &msg, boost::system::error_code ec; int size; size_t len = async_read(socket, buffer(&size, sizeof(size)), yield[ec]); - if (len != sizeof(size)) { + if (ec || len != sizeof(size)) { + 
std::cerr << ec.message() << std::endl; std::cerr << "Read " << len << " instead of " << sizeof(size) << " bytes from socket" << std::endl; return false; } - if (ec) { - std::cerr << ec.message() << std::endl; - return false; - } const size_t msg_size = ntohl(size); std::vector buf(msg_size); len = async_read(socket, buffer(buf), yield[ec]); - if (len != msg_size) { + if (ec || len != msg_size) { /* payload read must match the announced length, not the size of the length prefix */ + std::cerr << ec.message() << std::endl; std::cerr << "Read " << len << " instead of " << msg_size << " bytes from socket" << std::endl; return false; } - if (ec) { - std::cerr << ec.message() << std::endl; - return false; - } + return drop ? true : msg.ParseFromArray(buf.data(), buf.size()); } @@ -167,7 +161,6 @@ JobServer::JobServer(const unsigned short port) : m_d(std::make_shared()), m_port(port), m_finish(false), m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE) { - SocketComm::init(); m_runid = std::time(0); #ifndef __puma m_serverThread = new boost::thread(&JobServer::run, this); diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 18efc7cb..8704dae6 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -7,7 +7,6 @@ #include "config/FailConfig.hpp" #include "comm/ExperimentData.hpp" #include "comm/FailControlMessage.pb.h" -#include "comm/SocketComm.hpp" #include #include diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index c5ca59db..d411ab7d 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -1,28 +1,34 @@ +#include +#include +#include +#include + +#include +#include + #include "JobClient.hpp" -#include "comm/SocketComm.hpp" using namespace std; +using namespace boost::asio; namespace fail { +struct JobClient::impl { + io_service ios; + ip::tcp::socket socket; + + impl() : socket(ios) {} +}; + JobClient::JobClient(const std::string& server, int port) - : m_server(server), m_server_port(port), + : m_d(new impl), m_server(server), m_server_port(port), m_server_runid(0), 
// server accepts this for virgin clients m_job_runtime_total(0), m_job_throughput(CLIENT_JOB_INITIAL), // will be corrected after measurement m_job_total(0), m_connect_failed(false) { - SocketComm::init(); - m_server_ent = gethostbyname(m_server.c_str()); - - cout << "JobServer: " << m_server.c_str() << endl; - - if (m_server_ent == NULL) { - herror("[Client@gethostbyname()]"); - // TODO: Log-level? - exit(1); - } + cout << "JobServer: " << server << ":" << port << endl; srand(time(NULL)); // needed for random backoff (see connectToServer) } @@ -30,6 +36,7 @@ JobClient::~JobClient() { // Send back completed jobs to the server sendResultsToServer(); + delete m_d; } bool JobClient::connectToServer() @@ -39,52 +46,40 @@ bool JobClient::connectToServer() return false; } - int retries = CLIENT_RETRY_COUNT; - while (true) { - // Connect to server - struct sockaddr_in serv_addr; - m_sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (m_sockfd < 0) { - perror("[Client@socket()]"); - // TODO: Log-level? - exit(0); - } + boost::asio::ip::tcp::resolver resolver(m_d->ios); + boost::asio::ip::tcp::resolver::query query( + m_server, std::to_string(m_server_port)); - /* Enable address reuse */ - int on = 1; - setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ); + // random engine for backoff. + std::mt19937_64 engine(time(NULL)); - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - memcpy(&serv_addr.sin_addr.s_addr, m_server_ent->h_addr, m_server_ent->h_length); - serv_addr.sin_port = htons(m_server_port); - - if (connect(m_sockfd, (sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { - perror("[Client@connect()]"); - close(m_sockfd); - // TODO: Log-level? - if (retries > 0) { - // Wait CLIENT_RAND_BACKOFF_TSTART to RAND_BACKOFF_TEND seconds: - int delay = rand() % (CLIENT_RAND_BACKOFF_TEND-CLIENT_RAND_BACKOFF_TSTART) + CLIENT_RAND_BACKOFF_TSTART; - cout << "[Client] Retrying to connect to server in ~" << delay << "s..." 
<< endl; - // TODO: Log-level? - sleep(delay); - usleep(rand() % 1000000); - --retries; - continue; + for (int retries = CLIENT_RETRY_COUNT; retries > 0; --retries) { + for (ip::tcp::resolver::iterator end, + addrs = resolver.resolve(query); + addrs != end; ++addrs) { + boost::system::error_code error; + m_d->socket.connect(addrs->endpoint(), error); + if (!error) { + cout << "[Client] Connection established!" + << endl; + return true; } - cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT << " times); " - << "I'll give it up!" << endl; - // TODO: Log-level? - m_connect_failed = true; - return false; // finally: unable to connect, give it up :-( + cerr << "[Client@connect()]: " << error.message() << endl; /* the failure reason is carried by 'error', so report that instead of errno via perror() */ + m_d->socket.close(); } - break; // connected! :-) - } - cout << "[Client] Connection established!" << endl; - // TODO: Log-level? - return true; + std::uniform_real_distribution<> distribution( + CLIENT_RAND_BACKOFF_TSTART, CLIENT_RAND_BACKOFF_TEND); + const auto delay = std::chrono::duration(distribution(engine)); + cout << "[Client] Retrying to connect to server in ~" << delay.count() + << "s..." << endl; + std::this_thread::sleep_for(delay); + } + + cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT + << " times); I'll give it up!" 
<< endl; + m_connect_failed = true; + return false; } bool JobClient::getParam(ExperimentData& exp) @@ -109,6 +104,59 @@ bool JobClient::getParam(ExperimentData& exp) } } +template +bool sendMsg(Socket &s, google::protobuf::Message &msg) +{ + int size = htonl(msg.ByteSize()); + const auto msg_size = msg.ByteSize() + sizeof(size); + std::string buf; + + if (!msg.SerializeToString(&buf)) + return false; + + boost::array bufs{buffer(&size, sizeof(size)), + buffer(buf)}; + + boost::system::error_code ec; + const auto len = boost::asio::write(s, bufs, ec); + if (ec || len != msg_size) { + std::cerr << ec.message() << std::endl; + std::cerr << "Sent " << len << " instead of " << msg_size + << " bytes from socket" << std::endl; + return false; + } + + return true; +} + +template +bool rcvMsg(Socket &s, google::protobuf::Message &msg) +{ + int size; + std::size_t len; + boost::system::error_code ec; + + len = boost::asio::read(s, buffer(&size, sizeof(size)), ec); + if (ec || len != sizeof(size)) { + std::cerr << ec.message() << std::endl; + std::cerr << "Read " << len << " instead of " << sizeof(size) + << " bytes from socket" << std::endl; + return false; + } + + const auto msglen = ntohl(size); + std::vector buf(msglen); + len = boost::asio::read(s, buffer(buf), ec); + if (ec || len != msglen) { /* payload read must match the announced length, not the size of the length prefix */ + std::cerr << ec.message() << std::endl; + std::cerr << "Read " << len << " instead of " << msglen + << " bytes from socket" << std::endl; + return false; + } + + return msg.ParseFromArray(buf.data(), buf.size()); +} + FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp) { FailControlMessage ctrlmsg; @@ -127,15 +175,15 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp ctrlmsg.set_run_id(m_server_runid); ctrlmsg.set_job_size(m_job_throughput); //Request for a number of jobs - if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { + if (!sendMsg(m_d->socket, ctrlmsg)) { + m_d->socket.close(); // Failed to send message? 
Retry. - close(m_sockfd); return FailControlMessage::COME_AGAIN; } ctrlmsg.Clear(); - if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) { + if (!rcvMsg(m_d->socket, ctrlmsg)) { + m_d->socket.close(); // Failed to receive message? Retry. - close(m_sockfd); return FailControlMessage::COME_AGAIN; } @@ -148,7 +196,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp for (i = 0; i < ctrlmsg.job_size(); i++) { ExperimentData* temp_exp = new ExperimentData(exp.getMessage().New()); - if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) { + if (!rcvMsg(m_d->socket, temp_exp->getMessage())) { // looks like we won't receive more jobs now, cleanup delete &temp_exp->getMessage(); delete temp_exp; @@ -170,7 +218,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp default: break; } - close(m_sockfd); + m_d->socket.close(); //start time measurement for throughput calculation m_job_runtime.startTimer(); @@ -266,14 +314,14 @@ bool JobClient::sendResultsToServer() cout << "]"; // TODO: Log-level? - if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { - close(m_sockfd); + if (!sendMsg(m_d->socket, ctrlmsg)) { + m_d->socket.close(); return false; } for (i = 0; i < ctrlmsg.job_size(); i++) { - if (!SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage())) { - close(m_sockfd); + if (!sendMsg(m_d->socket, m_results.front()->getMessage())) { + m_d->socket.close(); return false; } delete &m_results.front()->getMessage(); @@ -282,7 +330,7 @@ bool JobClient::sendResultsToServer() } // Close connection. - close(m_sockfd); + m_d->socket.close(); return true; } return true; diff --git a/src/core/efw/JobClient.hpp b/src/core/efw/JobClient.hpp index f7a0c7c2..2f80a1fb 100644 --- a/src/core/efw/JobClient.hpp +++ b/src/core/efw/JobClient.hpp @@ -7,8 +7,11 @@ #include #include #include +#include + +// Xlib.h #defines this, which breaks protobuf headers. 
+#undef Status -#include "comm/SocketComm.hpp" #include "comm/ExperimentData.hpp" #include "comm/FailControlMessage.pb.h" #include "config/FailConfig.hpp" @@ -24,10 +27,12 @@ namespace fail { */ class JobClient { private: + struct impl; + impl *m_d; // meh. With a managed pointer everything needs to be >= C++11. + std::string m_server; int m_server_port; - struct hostent* m_server_ent; - int m_sockfd; + uint64_t m_server_runid; WallclockTimer m_job_runtime; diff --git a/src/experiments/l4-sys/experimentFI.cc b/src/experiments/l4-sys/experimentFI.cc index 215eefb5..84f049f5 100644 --- a/src/experiments/l4-sys/experimentFI.cc +++ b/src/experiments/l4-sys/experimentFI.cc @@ -1,3 +1,4 @@ +#include #include #include "experiment.hpp" diff --git a/src/experiments/l4-sys/experimentParameter.cc b/src/experiments/l4-sys/experimentParameter.cc index 3a978360..ee9fba21 100644 --- a/src/experiments/l4-sys/experimentParameter.cc +++ b/src/experiments/l4-sys/experimentParameter.cc @@ -1,3 +1,4 @@ +#include #include #include #include