diff options
Diffstat (limited to 'modules/ctrl_tcp/tcp_netstring.c')
-rw-r--r-- | modules/ctrl_tcp/tcp_netstring.c | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/modules/ctrl_tcp/tcp_netstring.c b/modules/ctrl_tcp/tcp_netstring.c new file mode 100644 index 0000000..2ad4742 --- /dev/null +++ b/modules/ctrl_tcp/tcp_netstring.c @@ -0,0 +1,221 @@ +/** + * @file tcp_netstring.c TCP netstring framing + * + * Copyright (C) 2018 46 Labs LLC + */ + +#include <math.h> +#include <string.h> + +#include <re_types.h> +#include <re_fmt.h> +#include <re_mem.h> +#include <re_mbuf.h> +#include <re_tcp.h> +#include <re_net.h> + +#include "tcp_netstring.h" +#include "netstring/netstring.h" + + +#define DEBUG_MODULE "tcp_netstring" +#define DEBUG_LEVEL 5 +#include <re_dbg.h> + + +struct netstring { + struct tcp_conn *tc; + struct tcp_helper *th; + struct mbuf *mb; + netstring_frame_h *frameh; + void *arg; + + uint64_t n_tx; + uint64_t n_rx; +}; + + +/* responsible for adding the netstring header + - assumes that the sent MBUF contains a complete packet + */ +static bool netstring_send_handler(int *err, struct mbuf *mb, void *arg) +{ + struct netstring *netstring = arg; + size_t num_len; + char num_str[32]; + + if (mb->pos < NETSTRING_HEADER_SIZE) { + DEBUG_WARNING("send: not enough space for netstring header\n"); + *err = ENOMEM; + return true; + } + + if (mbuf_get_left(mb) > NETSTRING_MAX_SIZE) { + DEBUG_WARNING("send: buffer exceeds max size\n"); + *err = EMSGSIZE; + return true; + } + + /* Build the netstring. */ + if (mbuf_get_left(mb) == 0) { + mb->buf[0] = '0'; + mb->buf[1] = ':'; + mb->buf[2] = ','; + + mb->end += 3; + + return false; + } + + re_snprintf(num_str, sizeof(num_str), "%zu", mbuf_get_left(mb)); + num_len = strlen(num_str); + + mb->pos = NETSTRING_HEADER_SIZE - (num_len + 1); + mbuf_write_mem(mb, (uint8_t*) num_str, num_len); + mb->pos = NETSTRING_HEADER_SIZE - (num_len + 1); + mb->buf[mb->pos + num_len] = ':'; + mb->buf[mb->end] = ','; + + mb->end += 1; + + ++netstring->n_tx; + + return false; +} + + +static bool netstring_recv_handler(int *errp, struct mbuf *mbx, bool *estab, + void *arg) +{ + struct netstring *netstring = arg; + int err = 0; + size_t pos = 0; + (void)estab; + + /* handle re-assembly */ + if (!netstring->mb) { + netstring->mb = mbuf_alloc(1024); + if (!netstring->mb) { + *errp = ENOMEM; + return true; + } + } + + pos = netstring->mb->pos; + + netstring->mb->pos = netstring->mb->end; + + err = mbuf_write_mem(netstring->mb, mbuf_buf(mbx), + mbuf_get_left(mbx)); + if (err) + goto out; + + netstring->mb->pos = pos; + + /* extract all NETSTRING-frames in the TCP-stream */ + for (;;) { + + size_t len, end; + struct mbuf mb; + + if (mbuf_get_left(netstring->mb) < (3)) + break; + + err = netstring_read((char*)netstring->mb->buf, + netstring->mb->end, + (char**)&mb.buf, &len); + if (err) { + + if (err == NETSTRING_ERROR_TOO_SHORT) { + DEBUG_INFO("receive: %s\n", + netstring_error_str(err)); + } + + else { + DEBUG_WARNING("receive: %s\n", + netstring_error_str(err)); + netstring->mb = mem_deref(netstring->mb); + } + + return false; + } + + pos = netstring->mb->pos; + end = netstring->mb->end; + + netstring->mb->end = pos + len; + + ++netstring->n_rx; + + netstring->frameh(&mb, netstring->arg); + + netstring->mb->pos = pos + netstring_buffer_size(len); + netstring->mb->end = end; + + if (netstring->mb->pos >= netstring->mb->end) { + netstring->mb = mem_deref(netstring->mb); + break; + } + + continue; + } + + out: + if (err) + *errp = err; + + return true; /* always handled */ +} + + +static void destructor(void *arg) +{ + struct netstring *netstring = arg; + + mem_deref(netstring->th); + mem_deref(netstring->tc); + mem_deref(netstring->mb); +} + + +int netstring_insert(struct netstring **netstringp, struct tcp_conn *tc, + int layer, netstring_frame_h *frameh, void *arg) +{ + struct netstring *netstring; + int err; + + if (!netstringp || !tc || !frameh) + return EINVAL; + + netstring = mem_zalloc(sizeof(*netstring), destructor); + if (!netstring) + return ENOMEM; + + netstring->tc = mem_ref(tc); + err = tcp_register_helper(&netstring->th, tc, layer, NULL, + netstring_send_handler, + netstring_recv_handler, netstring); + if (err) + goto out; + + netstring->frameh = frameh; + netstring->arg = arg; + + out: + if (err) + mem_deref(netstring); + else + *netstringp = netstring; + + return err; +} + + +int netstring_debug(struct re_printf *pf, const struct netstring *netstring) +{ + if (!netstring) + return 0; + + return re_hprintf(pf, "tx=%llu, rx=%llu", + netstring->n_tx, netstring->n_rx); +} |