summaryrefslogtreecommitdiff
path: root/msg.c
diff options
context:
space:
mode:
authorNeil Brown <neilb@suse.de>2008-07-12 20:27:40 +1000
committerNeil Brown <neilb@suse.de>2008-07-12 20:27:40 +1000
commitbfa44e2e7afb88a9f2d6083f8ff31c2d50cc78dc (patch)
tree4a4e140f3e3e4611320e872bc2bb88b622dc83ce /msg.c
parent4d43913ce07ffbcb1ae8e7bdd06a4bd67cd07791 (diff)
Revise message passing code.
More here
Diffstat (limited to 'msg.c')
-rw-r--r--msg.c231
1 files changed, 90 insertions, 141 deletions
diff --git a/msg.c b/msg.c
index ac40ee5a..123c0e55 100644
--- a/msg.c
+++ b/msg.c
@@ -29,160 +29,113 @@
#include <sys/socket.h>
#include <sys/un.h>
#include "mdadm.h"
+#include "mdmon.h"
-enum tx_rx_state {
- TX_RX_START,
- TX_RX_SEQ,
- TX_RX_NUM_BYTES,
- TX_RX_BUF,
- TX_RX_END,
- TX_RX_SUCCESS,
- TX_RX_ERR,
-};
-
-const int start_magic = 0x5a5aa5a5;
-const int end_magic = 0xa5a55a5a;
-
-#define txrx(fd, buf, size, flags) (recv_send ? \
- recv(fd, buf, size, flags) : \
- send(fd, buf, size, flags))
-
-/* non-blocking send/receive with n second timeout */
-static enum tx_rx_state
-tx_rx_message(int fd, struct md_message *msg, int recv_send, int tmo)
+static const __u32 start_magic = 0x5a5aa5a5;
+static const __u32 end_magic = 0xa5a55a5a;
+
+static int send_buf(int fd, const void* buf, int len, int tmo)
{
- int d = recv_send ? 0 : start_magic;
- int flags = recv_send ? 0 : MSG_NOSIGNAL;
- enum tx_rx_state state = TX_RX_START;
- void *buf = &d;
- size_t size = sizeof(d);
- off_t n = 0;
- int rc;
- int again;
-
- do {
- again = 0;
- rc = txrx(fd, buf + n, size - n, flags);
- if (rc <= 0) { /* error */
- if (rc == -1 && errno == EAGAIN)
- again = 1;
- else
- state = TX_RX_ERR;
- } else if (rc + n == size) /* done */
- switch (state) {
- case TX_RX_START:
- if (recv_send && d != start_magic)
- state = TX_RX_ERR;
- else {
- state = TX_RX_SEQ;
- buf = &msg->seq;
- size = sizeof(msg->seq);
- n = 0;
- }
- break;
- case TX_RX_SEQ:
- state = TX_RX_NUM_BYTES;
- buf = &msg->num_bytes;
- size = sizeof(msg->num_bytes);
- n = 0;
- break;
- case TX_RX_NUM_BYTES:
- if (msg->num_bytes >
- 1024*1024)
- state = TX_RX_ERR;
- else if (recv_send && msg->num_bytes) {
- msg->buf = malloc(msg->num_bytes);
- if (!msg->buf)
- state = TX_RX_ERR;
- else {
- state = TX_RX_BUF;
- buf = msg->buf;
- size = msg->num_bytes;
- n = 0;
- }
- } else if (!recv_send && msg->num_bytes) {
- state = TX_RX_BUF;
- buf = msg->buf;
- size = msg->num_bytes;
- n = 0;
- } else {
- d = recv_send ? 0 : end_magic;
- state = TX_RX_END;
- buf = &d;
- size = sizeof(d);
- n = 0;
- }
- break;
- case TX_RX_BUF:
- d = recv_send ? 0 : end_magic;
- state = TX_RX_END;
- buf = &d;
- size = sizeof(d);
- n = 0;
- break;
- case TX_RX_END:
- if (recv_send && d != end_magic)
- state = TX_RX_ERR;
- else
- state = TX_RX_SUCCESS;
- break;
- case TX_RX_ERR:
- case TX_RX_SUCCESS:
- break;
- }
- else /* continue */
- n += rc;
-
- if (again) {
- fd_set set;
- struct timeval timeout = { tmo, 0 };
- struct timeval *ptmo = tmo ? &timeout : NULL;
-
- FD_ZERO(&set);
- FD_SET(fd, &set);
-
- if (recv_send)
- rc = select(fd + 1, &set, NULL, NULL, ptmo);
- else
- rc = select(fd + 1, NULL, &set, NULL, ptmo);
-
- if (rc <= 0)
- state = TX_RX_ERR;
- }
- } while (state < TX_RX_SUCCESS);
+ fd_set set;
+ int rv;
+ struct timeval timeout = {tmo, 0};
+ struct timeval *ptmo = tmo ? &timeout : NULL;
+
+ while (len) {
+ FD_ZERO(&set);
+ FD_SET(fd, &set);
+ rv = select(fd+1, NULL, &set, NULL, ptmo);
+ if (rv <= 0)
+ return -1;
+ rv = write(fd, buf, len);
+ if (rv <= 0)
+ return -1;
+ len -= rv;
+ buf += rv;
+ }
+ return 0;
+}
- return state;
+static int recv_buf(int fd, void* buf, int len, int tmo)
+{
+ fd_set set;
+ int rv;
+ struct timeval timeout = {tmo, 0};
+ struct timeval *ptmo = tmo ? &timeout : NULL;
+
+ while (len) {
+ FD_ZERO(&set);
+ FD_SET(fd, &set);
+ rv = select(fd+1, &set, NULL, NULL, ptmo);
+ if (rv <= 0)
+ return -1;
+ rv = read(fd, buf, len);
+ if (rv <= 0)
+ return -1;
+ len -= rv;
+ buf += rv;
+ }
+ return 0;
}
-int receive_message(int fd, struct md_message *msg, int tmo)
+int send_message(int fd, struct metadata_update *msg, int tmo)
{
- if (tx_rx_message(fd, msg, 1, tmo) == TX_RX_SUCCESS)
- return 0;
- else
- return -1;
+ __u32 len = msg->len;
+ int rv;
+
+ rv = send_buf(fd, &start_magic, 4, tmo);
+ rv = rv ?: send_buf(fd, &len, 4, tmo);
+ if (len)
+ rv = rv ?: send_buf(fd, msg->buf, msg->len, tmo);
+ rv = send_buf(fd, &end_magic, 4, tmo);
+
+ return rv;
}
-int send_message(int fd, struct md_message *msg, int tmo)
+int receive_message(int fd, struct metadata_update *msg, int tmo)
{
- if (tx_rx_message(fd, msg, 0, tmo) == TX_RX_SUCCESS)
- return 0;
- else
+ __u32 magic;
+ __u32 len;
+ int rv;
+
+ rv = recv_buf(fd, &magic, 4, tmo);
+ if (rv < 0 || magic != start_magic)
+ return -1;
+ rv = recv_buf(fd, &len, 4, tmo);
+ if (rv < 0 || len > MSG_MAX_LEN)
return -1;
+ if (len) {
+ msg->buf = malloc(len);
+ if (msg->buf == NULL)
+ return -1;
+ rv = recv_buf(fd, msg->buf, len, tmo);
+ if (rv < 0) {
+ free(msg->buf);
+ return -1;
+ }
+ } else
+ msg->buf = NULL;
+ rv = recv_buf(fd, &magic, 4, tmo);
+ if (rv < 0 || magic != end_magic) {
+ free(msg->buf);
+ return -1;
+ }
+ msg->len = len;
+ return 0;
}
-int ack(int fd, int seq, int tmo)
+int ack(int fd, int tmo)
{
- struct md_message msg = { .seq = seq, .num_bytes = 0 };
+ struct metadata_update msg = { .len = 0 };
return send_message(fd, &msg, tmo);
}
-int nack(int fd, int err, int tmo)
+int wait_reply(int fd, int tmo)
{
- struct md_message msg = { .seq = err, .num_bytes = 0 };
-
- return send_message(fd, &msg, tmo);
+ struct metadata_update msg;
+ return receive_message(fd, &msg, tmo);
}
int connect_monitor(char *devname)
@@ -214,21 +167,17 @@ int connect_monitor(char *devname)
int ping_monitor(char *devname)
{
int sfd = connect_monitor(devname);
- struct md_message msg;
int err = 0;
if (sfd < 0)
return sfd;
/* try to ping existing socket */
- if (ack(sfd, 0, 0) != 0)
+ if (ack(sfd, 0) != 0)
err = -1;
/* check the reply */
- if (!err && receive_message(sfd, &msg, 0) != 0)
- err = -1;
-
- if (msg.seq != 0)
+ if (!err && wait_reply(sfd, 0) != 0)
err = -1;
close(sfd);