Merge branch 'use_size_prefix-REMOVED'
This commit is contained in:
@ -15,7 +15,6 @@ void SocketComm::init()
|
|||||||
|
|
||||||
bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
|
bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
|
||||||
{
|
{
|
||||||
#ifdef USE_SIZE_PREFIX
|
|
||||||
int size = htonl(msg.ByteSize());
|
int size = htonl(msg.ByteSize());
|
||||||
std::string buf;
|
std::string buf;
|
||||||
if (safe_write(sockfd, &size, sizeof(size)) == -1
|
if (safe_write(sockfd, &size, sizeof(size)) == -1
|
||||||
@ -23,35 +22,45 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
|
|||||||
|| safe_write(sockfd, buf.c_str(), buf.size()) == -1) {
|
|| safe_write(sockfd, buf.c_str(), buf.size()) == -1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
char c = 0;
|
|
||||||
if (!msg.SerializeToFileDescriptor(sockfd)
|
|
||||||
|| safe_write(sockfd, &c, 1) == -1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
|
bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
|
||||||
{
|
{
|
||||||
#ifdef USE_SIZE_PREFIX
|
char *buf;
|
||||||
int size;
|
int bufsiz;
|
||||||
if (safe_read(sockfd, &size, sizeof(size)) == -1) {
|
if ((buf = getBuf(sockfd, &bufsiz))) {
|
||||||
return false;
|
std::string st(buf, bufsiz);
|
||||||
}
|
|
||||||
size = ntohl(size);
|
|
||||||
char *buf = new char[size];
|
|
||||||
if (safe_read(sockfd, buf, size) == -1) {
|
|
||||||
delete [] buf;
|
delete [] buf;
|
||||||
return false;
|
return msg.ParseFromString(st);
|
||||||
}
|
}
|
||||||
std::string st(buf, size);
|
return false;
|
||||||
delete [] buf;
|
}
|
||||||
return msg.ParseFromString(st);
|
|
||||||
#else
|
bool SocketComm::dropMsg(int sockfd)
|
||||||
return msg.ParseFromFileDescriptor(sockfd);
|
{
|
||||||
#endif
|
char *buf;
|
||||||
|
int bufsiz;
|
||||||
|
if ((buf = getBuf(sockfd, &bufsiz))) {
|
||||||
|
delete [] buf;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char * SocketComm::getBuf(int sockfd, int *size)
|
||||||
|
{
|
||||||
|
char *buf;
|
||||||
|
if (safe_read(sockfd, size, sizeof(int)) == -1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
*size = ntohl(*size);
|
||||||
|
buf = new char[*size];
|
||||||
|
if (safe_read(sockfd, buf, *size) == -1) {
|
||||||
|
delete [] buf;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
|
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
|
||||||
|
|||||||
@ -15,8 +15,6 @@
|
|||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <google/protobuf/message.h>
|
#include <google/protobuf/message.h>
|
||||||
|
|
||||||
#define USE_SIZE_PREFIX
|
|
||||||
|
|
||||||
namespace fail {
|
namespace fail {
|
||||||
|
|
||||||
class SocketComm {
|
class SocketComm {
|
||||||
@ -40,7 +38,15 @@ public:
|
|||||||
*/
|
*/
|
||||||
static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
|
static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive Protobuf-generated message and just drop it
|
||||||
|
* @param sockfd open socket descriptor to read from
|
||||||
|
* \return false if message reception failed
|
||||||
|
*/
|
||||||
|
static bool dropMsg(int sockfd);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
static char * getBuf(int sockfd, int *size);
|
||||||
static ssize_t safe_write(int fd, const void *buf, size_t count);
|
static ssize_t safe_write(int fd, const void *buf, size_t count);
|
||||||
static ssize_t safe_read(int fd, void *buf, size_t count);
|
static ssize_t safe_read(int fd, void *buf, size_t count);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -357,7 +357,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
|
|||||||
cout << "[Server] Received another result for workload id ["
|
cout << "[Server] Received another result for workload id ["
|
||||||
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl;
|
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl;
|
||||||
|
|
||||||
// TODO: Any need for error-handling here?
|
SocketComm::dropMsg(minion.getSocketDescriptor());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user