/*
* Copyright (C) 2011-2013 Karlsruhe Institute of Technology
*
* This file is part of Ufo.
*
* This library is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see .
*/
#include
#include
#include
#include "zmq-shim.h"
static void ufo_messenger_interface_init (UfoMessengerIface *iface);
#define UFO_ZMQ_MESSENGER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerPrivate))
G_DEFINE_TYPE_WITH_CODE (UfoZmqMessenger, ufo_zmq_messenger, G_TYPE_OBJECT,
G_IMPLEMENT_INTERFACE (UFO_TYPE_MESSENGER,
ufo_messenger_interface_init))
struct _UfoZmqMessengerPrivate {
gchar *remote_addr;
GMutex *mutex;
gpointer zmq_socket;
gpointer zmq_ctx;
UfoMessengerRole role;
};
/*
* C99 allows flexible length structs that we use to map arbitrary frame lengths
* that are transferred via zmq. Note: Sizes of datatypes should be fixed and
* equal on all plattforms (i.e. don't use a gsize as it has different size on
* x86 & x86_64).
*/
typedef struct _DataFrame {
UfoMessageType type;
guint64 data_size;
// variable length data field
char data[];
} DataFrame;
UfoZmqMessenger *
ufo_zmq_messenger_new (void)
{
UfoZmqMessenger *msger;
msger = UFO_ZMQ_MESSENGER (g_object_new (UFO_TYPE_ZMQ_MESSENGER, NULL));
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
priv->zmq_ctx = zmq_ctx_new ();
return msger;
}
static gboolean
zmq_listen_address_valid (gchar *addr, GError **error)
{
if (!g_str_has_prefix (addr, "tcp://")) {
g_set_error_literal (error, UFO_MESSENGER_ERROR, UFO_MESSENGER_CONNECTION_PROBLEM,
"Address does not use 'tcp://' scheme.");
return FALSE;
}
/* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555
* but not bind to it as it treats it like an interface name (like eth0).
* We have to use IP addresses instead of DNS names.
*/
gchar *host = g_strdup (&addr[6]);
if (!g_ascii_isdigit (host[0]) && host[0] != '*')
g_debug ("Treating address %s as interface device name. Use IP address if supplying a host was intended.", host);
g_free (host);
return TRUE;
}
static void
ufo_zmq_messenger_connect (UfoMessenger *msger,
const gchar *addr,
UfoMessengerRole role,
GError **error)
{
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
g_mutex_lock (priv->mutex);
priv->remote_addr = g_strdup (addr);
priv->role = role;
if (role == UFO_MESSENGER_CLIENT) {
priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REQ);
if (zmq_connect (priv->zmq_socket, priv->remote_addr) == 0) {
g_debug ("Connected to `%s' via socket=%p", priv->remote_addr, priv->zmq_socket);
}
else {
g_set_error (error, UFO_MESSENGER_ERROR, UFO_MESSENGER_CONNECTION_PROBLEM,
"Could not connect to `%s': %s", addr, zmq_strerror (errno));
}
}
else if (role == UFO_MESSENGER_SERVER) {
if (zmq_listen_address_valid (priv->remote_addr, error)) {
priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REP);
gint err = zmq_bind (priv->zmq_socket, priv->remote_addr);
if (err < 0) {
g_set_error (error, UFO_MESSENGER_ERROR, UFO_MESSENGER_CONNECTION_PROBLEM,
"Could not bind to address `%s'", priv->remote_addr);
}
}
}
g_mutex_unlock (priv->mutex);
return;
}
static void
ufo_zmq_messenger_disconnect (UfoMessenger *msger)
{
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
g_mutex_lock (priv->mutex);
if (priv->zmq_socket != NULL) {
zmq_close (priv->zmq_socket);
priv->zmq_socket = NULL;
/* waits for outstanding messages to be flushed */
zmq_term (priv->zmq_ctx);
g_free (priv->remote_addr);
}
g_mutex_unlock (priv->mutex);
return;
}
static UfoMessage *
ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
UfoMessage *request_msg,
GError **error)
{
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
if (request_msg->type == UFO_MESSAGE_ACK && priv->role == UFO_MESSENGER_CLIENT)
g_critical ("Clients can't send ACK messages");
g_mutex_lock (priv->mutex);
UfoMessage *result = NULL;
zmq_msg_t request;
gsize frame_size = sizeof (DataFrame) + request_msg->data_size;
zmq_msg_init_size (&request, frame_size);
DataFrame *frame = (DataFrame *) zmq_msg_data (&request);
frame->data_size = request_msg->data_size;
frame->type = request_msg->type;
//TODO eliminate extra copying
memcpy (frame->data, request_msg->data, request_msg->data_size);
gint err = zmq_msg_send (&request, priv->zmq_socket, 0);
zmq_msg_close (&request);
if (err < 0) {
g_set_error (error, UFO_MESSENGER_ERROR, zmq_errno (),
"Error sending message via %s: %s",
priv->remote_addr, zmq_strerror (zmq_errno ()));
goto finalize;
}
/*
* If this is an ACK message, don't expect a response (send_blocking is then
* most likely being called by the server).
*/
if (request_msg->type == UFO_MESSAGE_ACK)
goto finalize;
/*
* We always need to receive as response as ZMQ requires REQ/REP/REQ/REP/...
* scheme.
*/
zmq_msg_t reply;
zmq_msg_init (&reply);
err = zmq_msg_recv (&reply, priv->zmq_socket, 0);
gint size = zmq_msg_size (&reply);
if (err < 0) {
g_set_error (error, ufo_messenger_error_quark (), zmq_errno(),
"Could not receive from %s: %s ", priv->remote_addr,
zmq_strerror (zmq_errno ()));
goto finalize;
}
DataFrame *resp_frame = (DataFrame *) zmq_msg_data (&reply);
guint64 expected_size = (guint32) (sizeof (DataFrame) + resp_frame->data_size);
if ((guint64) size != expected_size) {
g_set_error (error, ufo_messenger_error_quark(),
UFO_MESSENGER_SIZE_MISSMATCH,
"Received unexpected frame size: %d", size);
goto finalize;
}
UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size);
memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size);
zmq_msg_close (&reply);
result = reply_msg;
finalize:
g_mutex_unlock (priv->mutex);
return result;
}
static UfoMessage *
ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
GError **error)
{
UfoZmqMessengerPrivate *priv;
UfoMessage *result = NULL;
priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
g_assert (priv->role == UFO_MESSENGER_SERVER);
g_mutex_lock (priv->mutex);
zmq_msg_t reply;
zmq_msg_init (&reply);
gint err = zmq_msg_recv (&reply, priv->zmq_socket, 0);
gint size = zmq_msg_size (&reply);
if (err < 0) {
zmq_msg_close (&reply);
g_set_error (error, ufo_messenger_error_quark(), zmq_errno(),
"Could not receive from %s: %s ", priv->remote_addr,
zmq_strerror (zmq_errno ()));
goto finalize;
}
DataFrame *frame = zmq_msg_data (&reply);
guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size);
if ((guint) size != expected_size) {
g_set_error (error, ufo_messenger_error_quark(),
UFO_MESSENGER_SIZE_MISSMATCH,
"Received unexpected frame size: %d, should be: %d",
size, expected_size);
goto finalize;
}
result = ufo_message_new (frame->type, frame->data_size);
memcpy (result->data, frame->data, frame->data_size);
zmq_msg_close (&reply);
finalize:
g_mutex_unlock (priv->mutex);
return result;
}
static void
ufo_messenger_interface_init (UfoMessengerIface *iface)
{
iface->connect = ufo_zmq_messenger_connect;
iface->disconnect = ufo_zmq_messenger_disconnect;
iface->send_blocking = ufo_zmq_messenger_send_blocking;
iface->recv_blocking = ufo_zmq_messenger_recv_blocking;
}
static void
ufo_zmq_messenger_dispose (GObject *object)
{
ufo_zmq_messenger_disconnect (UFO_MESSENGER (object));
}
static void
ufo_zmq_messenger_finalize (GObject *object)
{
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (object);
if (priv->zmq_ctx != NULL) {
zmq_ctx_destroy (priv->zmq_ctx);
priv->zmq_ctx = NULL;
}
g_mutex_free (priv->mutex);
}
static void
ufo_zmq_messenger_class_init (UfoZmqMessengerClass *klass)
{
GObjectClass *oclass = G_OBJECT_CLASS (klass);
oclass->dispose = ufo_zmq_messenger_dispose;
oclass->finalize = ufo_zmq_messenger_finalize;
g_type_class_add_private (klass, sizeof(UfoZmqMessengerPrivate));
}
static void
ufo_zmq_messenger_init (UfoZmqMessenger *msger)
{
UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
priv->zmq_socket = NULL;
priv->zmq_ctx = NULL;
priv->mutex = g_mutex_new ();
}