Move JobClient to Boost::asio as well

I did this mainly so server and client use a common networking API
IMO, using Boost::asio results in nicer name-lookup code.
Since no longer needed, I removed the SocketComm stuff.
The client is still synchronous; I see no benefit in having it
asynchronous.

I'm not super happy with the random backoff by the clients, if they
can't connect to the server. It makes the code really messy, 3 retries
is totally arbitrary, as is the backup windows. I believe launching
the server and clients in the correct order should be handled by a
launch script
Change-Id: Ifea64919fc228aa530c90449686f51bf63eb70e7
This commit is contained in:
Hannes Weisbach
2017-09-22 04:50:15 +02:00
committed by Horst Schirmeier
parent 191219ad06
commit 9272c5cbed
9 changed files with 127 additions and 265 deletions

View File

@ -1,7 +1,5 @@
set(SRCS
ExperimentData.hpp
SocketComm.hpp
SocketComm.cc
)
## Setup desired protobuf descriptions HERE ##

View File

@ -1,117 +0,0 @@
#include <string>
#include <errno.h>
#include <signal.h>
#include <poll.h>
#include "SocketComm.hpp"
namespace fail {
void SocketComm::init()
{
// It's usually much easier to handle the error on write(), than to do
// anything intelligent in a SIGPIPE handler.
signal(SIGPIPE, SIG_IGN);
}
bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
{
int size = htonl(msg.ByteSize());
std::string buf;
if (safe_write(sockfd, &size, sizeof(size)) == -1
|| !msg.SerializeToString(&buf)
|| safe_write(sockfd, buf.c_str(), buf.size()) == -1) {
return false;
}
return true;
}
bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
{
char *buf;
int bufsiz;
if ((buf = getBuf(sockfd, &bufsiz))) {
std::string st(buf, bufsiz);
delete [] buf;
return msg.ParseFromString(st);
}
return false;
}
bool SocketComm::dropMsg(int sockfd)
{
char *buf;
int bufsiz;
if ((buf = getBuf(sockfd, &bufsiz))) {
delete [] buf;
return true;
}
return false;
}
char * SocketComm::getBuf(int sockfd, int *size)
{
char *buf;
if (safe_read(sockfd, size, sizeof(int)) == -1) {
return 0;
}
*size = ntohl(*size);
buf = new char[*size];
if (safe_read(sockfd, buf, *size) == -1) {
delete [] buf;
return 0;
}
return buf;
}
int SocketComm::timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout)
{
struct pollfd pfd = {sockfd, POLLIN, 0};
int ret = poll(&pfd, 1, timeout);
if (ret > 0) {
return accept(sockfd, addr, addrlen);
}
errno = EWOULDBLOCK;
return -1;
}
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
{
ssize_t ret;
const char *cbuf = (const char *) buf;
do {
ret = write(fd, cbuf, count);
if (ret == -1) {
if (errno == EINTR) {
continue;
}
return -1;
}
count -= ret;
cbuf += ret;
} while (count);
return cbuf - (const char *)buf;
}
ssize_t SocketComm::safe_read(int fd, void *buf, size_t count)
{
ssize_t ret;
char *cbuf = (char *) buf;
do {
ret = read(fd, cbuf, count);
if (ret == -1) {
if (errno == EINTR) {
continue;
}
return -1;
} else if (ret == 0) {
// this deliberately deviates from read(2)
return -1;
}
count -= ret;
cbuf += ret;
} while (count);
return cbuf - (const char *) buf;
}
} // end-of-namespace: fail

View File

@ -1,66 +0,0 @@
/**
* \brief Socket based communication wrapper functions.
*/
#ifndef __SOCKET_COMM_HPP__
#define __SOCKET_COMM_HPP__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <iostream>
#include <fstream>
#include <google/protobuf/message.h>
namespace fail {
class SocketComm {
public:
/**
* This allows us to ignore SIGPIPE.
*/
static void init();
/**
* Send Protobuf-generated message
* @param sockfd open socket descriptor to write to
* @param Msg Reference to Protobuf generated message type
* \return false if message sending failed
*/
static bool sendMsg(int sockfd, google::protobuf::Message& msg);
/**
* Receive Protobuf-generated message
* @param sockfd open socket descriptor to read from
* @param Msg Reference to Protobuf generated message type
* \return false if message reception failed
*/
static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
/**
* Receive Protobuf-generated message and just drop it
* @param sockfd open socket descriptor to read from
* \return false if message reception failed
*/
static bool dropMsg(int sockfd);
/**
* An accept() wrapper that times out (using poll(2))
* @param sockfd listening socket descriptor to accept connections from
* @param addr same as accept()'s
* @param addrlen same as accept()'s
* @param timeout timeout in milliseconds (see poll(2))
* \return < 0 on failure, > 0 for a new socket connection
*/
static int timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout);
private:
static char *getBuf(int sockfd, int *size);
static ssize_t safe_write(int fd, const void *buf, size_t count);
static ssize_t safe_read(int fd, void *buf, size_t count);
};
} // end-of-namespace: fail
#endif // __SOCKET_COMM_HPP__

View File

@ -12,7 +12,6 @@
#include <thread>
#include <tuple>
#include "comm/SocketComm.hpp"
#include "JobServer.hpp"
#ifndef __puma
@ -83,28 +82,23 @@ static bool rcvMsg(tcp::socket &socket, google::protobuf::Message &msg,
boost::system::error_code ec;
int size;
size_t len = async_read(socket, buffer(&size, sizeof(size)), yield[ec]);
if (len != sizeof(size)) {
if (ec || len != sizeof(size)) {
std::cerr << ec.message() << std::endl;
std::cerr << "Read " << len << " instead of " << sizeof(size)
<< " bytes from socket" << std::endl;
return false;
}
if (ec) {
std::cerr << ec.message() << std::endl;
return false;
}
const size_t msg_size = ntohl(size);
std::vector<char> buf(msg_size);
len = async_read(socket, buffer(buf), yield[ec]);
if (len != msg_size) {
if (ec || len != sizeof(size)) {
std::cerr << ec.message() << std::endl;
std::cerr << "Read " << len << " instead of " << msg_size
<< " bytes from socket" << std::endl;
return false;
}
if (ec) {
std::cerr << ec.message() << std::endl;
return false;
}
return drop ? true : msg.ParseFromArray(buf.data(), buf.size());
}
@ -167,7 +161,6 @@ JobServer::JobServer(const unsigned short port)
: m_d(std::make_shared<impl>()), m_port(port), m_finish(false),
m_threadtimeout(0), m_undoneJobs(SERVER_OUT_QUEUE_SIZE)
{
SocketComm::init();
m_runid = std::time(0);
#ifndef __puma
m_serverThread = new boost::thread(&JobServer::run, this);

View File

@ -7,7 +7,6 @@
#include "config/FailConfig.hpp"
#include "comm/ExperimentData.hpp"
#include "comm/FailControlMessage.pb.h"
#include "comm/SocketComm.hpp"
#include <list>
#include <ctime>

View File

@ -1,28 +1,34 @@
#include <chrono>
#include <random>
#include <string>
#include <thread>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include "JobClient.hpp"
#include "comm/SocketComm.hpp"
using namespace std;
using namespace boost::asio;
namespace fail {
struct JobClient::impl {
io_service ios;
ip::tcp::socket socket;
impl() : socket(ios) {}
};
JobClient::JobClient(const std::string& server, int port)
: m_server(server), m_server_port(port),
: m_d(new impl), m_server(server), m_server_port(port),
m_server_runid(0), // server accepts this for virgin clients
m_job_runtime_total(0),
m_job_throughput(CLIENT_JOB_INITIAL), // will be corrected after measurement
m_job_total(0),
m_connect_failed(false)
{
SocketComm::init();
m_server_ent = gethostbyname(m_server.c_str());
cout << "JobServer: " << m_server.c_str() << endl;
if (m_server_ent == NULL) {
herror("[Client@gethostbyname()]");
// TODO: Log-level?
exit(1);
}
cout << "JobServer: " << server << ":" << port << endl;
srand(time(NULL)); // needed for random backoff (see connectToServer)
}
@ -30,6 +36,7 @@ JobClient::~JobClient()
{
// Send back completed jobs to the server
sendResultsToServer();
delete m_d;
}
bool JobClient::connectToServer()
@ -39,53 +46,41 @@ bool JobClient::connectToServer()
return false;
}
int retries = CLIENT_RETRY_COUNT;
while (true) {
// Connect to server
struct sockaddr_in serv_addr;
m_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd < 0) {
perror("[Client@socket()]");
// TODO: Log-level?
exit(0);
}
boost::asio::ip::tcp::resolver resolver(m_d->ios);
boost::asio::ip::tcp::resolver::query query(
m_server, std::to_string(m_server_port));
/* Enable address reuse */
int on = 1;
setsockopt( m_sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
memcpy(&serv_addr.sin_addr.s_addr, m_server_ent->h_addr, m_server_ent->h_length);
serv_addr.sin_port = htons(m_server_port);
if (connect(m_sockfd, (sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("[Client@connect()]");
close(m_sockfd);
// TODO: Log-level?
if (retries > 0) {
// Wait CLIENT_RAND_BACKOFF_TSTART to RAND_BACKOFF_TEND seconds:
int delay = rand() % (CLIENT_RAND_BACKOFF_TEND-CLIENT_RAND_BACKOFF_TSTART) + CLIENT_RAND_BACKOFF_TSTART;
cout << "[Client] Retrying to connect to server in ~" << delay << "s..." << endl;
// TODO: Log-level?
sleep(delay);
usleep(rand() % 1000000);
--retries;
continue;
}
cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT << " times); "
<< "I'll give it up!" << endl;
// TODO: Log-level?
m_connect_failed = true;
return false; // finally: unable to connect, give it up :-(
}
break; // connected! :-)
}
cout << "[Client] Connection established!" << endl;
// TODO: Log-level?
// random engine for backoff.
std::mt19937_64 engine(time(NULL));
for (int retries = CLIENT_RETRY_COUNT; retries > 0; --retries) {
for (ip::tcp::resolver::iterator end,
addrs = resolver.resolve(query);
addrs != end; ++addrs) {
boost::system::error_code error;
m_d->socket.connect(addrs->endpoint(), error);
if (!error) {
cout << "[Client] Connection established!"
<< endl;
return true;
}
perror("[Client@connect()]");
m_d->socket.close();
}
std::uniform_real_distribution<> distribution(
CLIENT_RAND_BACKOFF_TSTART, CLIENT_RAND_BACKOFF_TEND);
const auto delay = std::chrono::duration<double>(distribution(engine));
cout << "[Client] Retrying to connect to server in ~" << delay.count()
<< "s..." << endl;
std::this_thread::sleep_for(delay);
}
cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT
<< " times); I'll give it up!" << endl;
m_connect_failed = true;
return false;
}
bool JobClient::getParam(ExperimentData& exp)
{
@ -109,6 +104,59 @@ bool JobClient::getParam(ExperimentData& exp)
}
}
template <typename Socket>
bool sendMsg(Socket &s, google::protobuf::Message &msg)
{
int size = htonl(msg.ByteSize());
const auto msg_size = msg.ByteSize() + sizeof(size);
std::string buf;
if (!msg.SerializeToString(&buf))
return false;
boost::array<const_buffer, 2> bufs{buffer(&size, sizeof(size)),
buffer(buf)};
boost::system::error_code ec;
const auto len = boost::asio::write(s, bufs, ec);
if (ec || len != msg_size) {
std::cerr << ec.message() << std::endl;
std::cerr << "Sent " << len << " instead of " << msg_size
<< " bytes from socket" << std::endl;
return false;
}
return true;
}
template <typename Socket>
bool rcvMsg(Socket &s, google::protobuf::Message &msg)
{
int size;
std::size_t len;
boost::system::error_code ec;
len = boost::asio::read(s, buffer(&size, sizeof(size)), ec);
if (ec || len != sizeof(size)) {
std::cerr << ec.message() << std::endl;
std::cerr << "Read " << len << " instead of " << sizeof(size)
<< " bytes from socket" << std::endl;
return false;
}
const auto msglen = ntohl(size);
std::vector<char> buf(msglen);
len = boost::asio::read(s, buffer(buf), ec);
if (ec || len != sizeof(size)) {
std::cerr << ec.message() << std::endl;
std::cerr << "Read " << len << " instead of " << msglen
<< " bytes from socket" << std::endl;
return false;
}
return msg.ParseFromArray(buf.data(), buf.size());
}
FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp)
{
FailControlMessage ctrlmsg;
@ -127,15 +175,15 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
ctrlmsg.set_run_id(m_server_runid);
ctrlmsg.set_job_size(m_job_throughput); //Request for a number of jobs
if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) {
if (!sendMsg(m_d->socket, ctrlmsg)) {
m_d->socket.close();
// Failed to send message? Retry.
close(m_sockfd);
return FailControlMessage::COME_AGAIN;
}
ctrlmsg.Clear();
if (!SocketComm::rcvMsg(m_sockfd, ctrlmsg)) {
if (!rcvMsg(m_d->socket, ctrlmsg)) {
m_d->socket.close();
// Failed to receive message? Retry.
close(m_sockfd);
return FailControlMessage::COME_AGAIN;
}
@ -148,7 +196,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
for (i = 0; i < ctrlmsg.job_size(); i++) {
ExperimentData* temp_exp = new ExperimentData(exp.getMessage().New());
if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) {
if (!rcvMsg(m_d->socket, temp_exp->getMessage())) {
// looks like we won't receive more jobs now, cleanup
delete &temp_exp->getMessage();
delete temp_exp;
@ -170,7 +218,7 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp
default:
break;
}
close(m_sockfd);
m_d->socket.close();
//start time measurement for throughput calculation
m_job_runtime.startTimer();
@ -266,14 +314,14 @@ bool JobClient::sendResultsToServer()
cout << "]";
// TODO: Log-level?
if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) {
close(m_sockfd);
if (!sendMsg(m_d->socket, ctrlmsg)) {
m_d->socket.close();
return false;
}
for (i = 0; i < ctrlmsg.job_size(); i++) {
if (!SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage())) {
close(m_sockfd);
if (!sendMsg(m_d->socket, m_results.front()->getMessage())) {
m_d->socket.close();
return false;
}
delete &m_results.front()->getMessage();
@ -282,7 +330,7 @@ bool JobClient::sendResultsToServer()
}
// Close connection.
close(m_sockfd);
m_d->socket.close();
return true;
}
return true;

View File

@ -7,8 +7,11 @@
#include <unistd.h>
#include <iostream>
#include <deque>
#include <memory>
// Xlib.h #defines this, which breaks protobuf headers.
#undef Status
#include "comm/SocketComm.hpp"
#include "comm/ExperimentData.hpp"
#include "comm/FailControlMessage.pb.h"
#include "config/FailConfig.hpp"
@ -24,10 +27,12 @@ namespace fail {
*/
class JobClient {
private:
struct impl;
impl *m_d; // meh. With a managed pointer everything needs to be >= C++11.
std::string m_server;
int m_server_port;
struct hostent* m_server_ent;
int m_sockfd;
uint64_t m_server_runid;
WallclockTimer m_job_runtime;

View File

@ -1,3 +1,4 @@
#include <fstream>
#include <iostream>
#include "experiment.hpp"

View File

@ -1,3 +1,4 @@
#include <fstream>
#include <iostream>
#include <list>
#include <stdlib.h>