read(2)/write(2) wrappers for reliable delivery
git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1850 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
This commit is contained in:
@ -1,4 +1,5 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include "SocketComm.hpp"
|
#include "SocketComm.hpp"
|
||||||
|
|
||||||
@ -8,17 +9,17 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
|
|||||||
{
|
{
|
||||||
#ifdef USE_SIZE_PREFIX
|
#ifdef USE_SIZE_PREFIX
|
||||||
int size = htonl(msg.ByteSize());
|
int size = htonl(msg.ByteSize());
|
||||||
if (write(sockfd, &size, sizeof(size)) != sizeof(size)) {
|
if (safe_write(sockfd, &size, sizeof(size)) == -1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
std::string buf;
|
std::string buf;
|
||||||
msg.SerializeToString(&buf);
|
msg.SerializeToString(&buf);
|
||||||
if (write(sockfd, buf.c_str(), buf.size()) != (ssize_t) buf.size()) {
|
if (safe_write(sockfd, buf.c_str(), buf.size()) == -1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
char c = 0;
|
char c = 0;
|
||||||
if (!msg.SerializeToFileDescriptor(sockfd) || write(sockfd, &c, 1) != 1) {
|
if (!msg.SerializeToFileDescriptor(sockfd) || safe_write(sockfd, &c, 1) == -1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -29,14 +30,12 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
|
|||||||
{
|
{
|
||||||
#ifdef USE_SIZE_PREFIX
|
#ifdef USE_SIZE_PREFIX
|
||||||
int size;
|
int size;
|
||||||
// FIXME: read() may need to be called multiple times until all data was read
|
if (safe_read(sockfd, &size, sizeof(size)) == -1) {
|
||||||
if (read(sockfd, &size, sizeof(size)) != sizeof(size)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
size = ntohl(size);
|
size = ntohl(size);
|
||||||
char *buf = new char[size];
|
char *buf = new char[size];
|
||||||
// FIXME: read() may need to be called multiple times until all data was read
|
if (safe_read(sockfd, buf, size) == -1) {
|
||||||
if (read(sockfd, buf, size) != size) {
|
|
||||||
delete [] buf;
|
delete [] buf;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -49,4 +48,40 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
|
||||||
|
{
|
||||||
|
ssize_t ret;
|
||||||
|
const char *cbuf = (const char *) buf;
|
||||||
|
do {
|
||||||
|
ret = write(fd, cbuf, count);
|
||||||
|
if (ret == -1) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
count -= ret;
|
||||||
|
cbuf += ret;
|
||||||
|
} while (count);
|
||||||
|
return cbuf - (const char *)buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t SocketComm::safe_read(int fd, void *buf, size_t count)
|
||||||
|
{
|
||||||
|
ssize_t ret;
|
||||||
|
char *cbuf = (char *) buf;
|
||||||
|
do {
|
||||||
|
ret = read(fd, cbuf, count);
|
||||||
|
if (ret == -1) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
count -= ret;
|
||||||
|
cbuf += ret;
|
||||||
|
} while (count);
|
||||||
|
return cbuf - (const char *) buf;
|
||||||
|
}
|
||||||
|
|
||||||
} // end-of-namespace: fail
|
} // end-of-namespace: fail
|
||||||
|
|||||||
@ -1,11 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* \brief Socket based communictaion wrapper functions.
|
* \brief Socket based communication wrapper functions.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef __SOCKET_COMM_HPP__
|
#ifndef __SOCKET_COMM_HPP__
|
||||||
#define __SOCKET_COMM_HPP__
|
#define __SOCKET_COMM_HPP__
|
||||||
|
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <unistd.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
@ -34,6 +35,10 @@ public:
|
|||||||
* \return false if message reception failed
|
* \return false if message reception failed
|
||||||
*/
|
*/
|
||||||
static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
|
static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
|
||||||
|
|
||||||
|
private:
|
||||||
|
static ssize_t safe_write(int fd, const void *buf, size_t count);
|
||||||
|
static ssize_t safe_read(int fd, void *buf, size_t count);
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end-of-namespace: fail
|
} // end-of-namespace: fail
|
||||||
|
|||||||
Reference in New Issue
Block a user