From 3dcc68440064d9eff468c37fdf76f72ae09d0bb5 Mon Sep 17 00:00:00 2001 From: hsc Date: Fri, 26 Oct 2012 16:13:41 +0000 Subject: [PATCH] read(2)/write(2) wrappers for reliable delivery git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1850 8c4709b5-6ec9-48aa-a5cd-a96041d1645a --- src/core/comm/SocketComm.cc | 49 ++++++++++++++++++++++++++++++------ src/core/comm/SocketComm.hpp | 7 +++++- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/src/core/comm/SocketComm.cc b/src/core/comm/SocketComm.cc index cb09d535..bef80d23 100644 --- a/src/core/comm/SocketComm.cc +++ b/src/core/comm/SocketComm.cc @@ -1,4 +1,5 @@ #include +#include #include "SocketComm.hpp" @@ -8,17 +9,17 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg) { #ifdef USE_SIZE_PREFIX int size = htonl(msg.ByteSize()); - if (write(sockfd, &size, sizeof(size)) != sizeof(size)) { + if (safe_write(sockfd, &size, sizeof(size)) == -1) { return false; } std::string buf; msg.SerializeToString(&buf); - if (write(sockfd, buf.c_str(), buf.size()) != (ssize_t) buf.size()) { + if (safe_write(sockfd, buf.c_str(), buf.size()) == -1) { return false; } #else char c = 0; - if (!msg.SerializeToFileDescriptor(sockfd) || write(sockfd, &c, 1) != 1) { + if (!msg.SerializeToFileDescriptor(sockfd) || safe_write(sockfd, &c, 1) == -1) { return false; } #endif @@ -29,14 +30,12 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg) { #ifdef USE_SIZE_PREFIX int size; - // FIXME: read() may need to be called multiple times until all data was read - if (read(sockfd, &size, sizeof(size)) != sizeof(size)) { + if (safe_read(sockfd, &size, sizeof(size)) == -1) { return false; } size = ntohl(size); char *buf = new char[size]; - // FIXME: read() may need to be called multiple times until all data was read - if (read(sockfd, buf, size) != size) { + if (safe_read(sockfd, buf, size) == -1) { delete [] buf; return false; } @@ -49,4 +48,40 @@ bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg) #endif } +ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count) +{ + ssize_t ret; + const char *cbuf = (const char *) buf; + do { + ret = write(fd, cbuf, count); + if (ret == -1) { + if (errno == EINTR) { + continue; + } + return -1; + } + count -= ret; + cbuf += ret; + } while (count); + return cbuf - (const char *)buf; +} + +ssize_t SocketComm::safe_read(int fd, void *buf, size_t count) +{ + ssize_t ret; + char *cbuf = (char *) buf; + do { + ret = read(fd, cbuf, count); + if (ret == -1) { + if (errno == EINTR) { + continue; + } + return -1; + } + count -= ret; + cbuf += ret; + } while (count); + return cbuf - (const char *) buf; +} + } // end-of-namespace: fail diff --git a/src/core/comm/SocketComm.hpp b/src/core/comm/SocketComm.hpp index 097b023d..99459281 100644 --- a/src/core/comm/SocketComm.hpp +++ b/src/core/comm/SocketComm.hpp @@ -1,11 +1,12 @@ /** - * \brief Socket based communictaion wrapper functions. + * \brief Socket based communication wrapper functions. */ #ifndef __SOCKET_COMM_HPP__ #define __SOCKET_COMM_HPP__ #include +#include #include #include #include @@ -34,6 +35,10 @@ public: * \return false if message reception failed */ static bool rcvMsg(int sockfd, google::protobuf::Message& msg); + +private: + static ssize_t safe_write(int fd, const void *buf, size_t count); + static ssize_t safe_read(int fd, void *buf, size_t count); }; } // end-of-namespace: fail