summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimo Dörr <timo@latecrew.de>2013-10-04 17:18:41 +0200
committerTimo Dörr <timo@latecrew.de>2013-10-23 14:08:07 +0200
commit921d9015736205bb03a02985f6046d931cdd26c9 (patch)
treeeeb00d06512f51c2140c5d342d40eb7da4ef19e2
parent89fc1edf44ca48086c193d5ad64d6a0f7ee12660 (diff)
Abstract away communication into interface
* Create generic interface ufo_messenger_iface * Move all zmq_* stuff into ufo_zmq_messenger that implements that interface * also fixes a bug that happened when closing the zmq_socket from another thread by introducing a special TERMINATE message that will kill ufo-daemon
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/test-remote-node.c21
-rw-r--r--tests/test-suite.c3
-rw-r--r--tests/test-suite.h1
-rw-r--r--tests/test-zmq-messenger.c110
-rw-r--r--tools/ufod.c1
-rw-r--r--ufo/CMakeLists.txt4
-rw-r--r--ufo/ufo-daemon.c270
-rw-r--r--ufo/ufo-messenger-iface.c90
-rw-r--r--ufo/ufo-messenger-iface.h137
-rw-r--r--ufo/ufo-remote-node.c285
-rw-r--r--ufo/ufo-remote-node.h46
-rw-r--r--ufo/ufo-zmq-messenger.c305
-rw-r--r--ufo/ufo-zmq-messenger.h70
-rw-r--r--ufo/ufo.h2
15 files changed, 957 insertions, 389 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 254bfa7..22a9813 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -7,6 +7,7 @@ set(TEST_SRCS
test-graph.c
test-profiler.c
test-remote-node.c
+ test-zmq-messenger.c
)
set(SUITE_BIN "test-suite")
diff --git a/tests/test-remote-node.c b/tests/test-remote-node.c
index 59bd485..8642988 100644
--- a/tests/test-remote-node.c
+++ b/tests/test-remote-node.c
@@ -19,7 +19,6 @@
#include <string.h>
#include <ufo.h>
-#include <zmq.h>
#include "test-suite.h"
typedef struct {
@@ -44,8 +43,8 @@ static void
teardown (Fixture *fixture, gconstpointer data)
{
g_object_unref (fixture->remote_node);
-
ufo_daemon_stop (fixture->daemon);
+
g_object_unref (fixture->daemon);
}
@@ -58,9 +57,27 @@ test_remote_node_get_num_cpus (Fixture *fixture,
g_assert (n_gpus > 0);
}
+static void
+test_remote_node_get_structure (Fixture *fixture,
+ gconstpointer unused)
+{
+ UfoTaskMode mode;
+ UfoInputParam *in_params;
+ guint n_inputs;
+ ufo_remote_node_get_structure (fixture->remote_node, &n_inputs, &in_params, &mode);
+ g_message ("received n_inputs == %d", n_inputs);
+ g_assert (n_inputs == 1);
+ g_message ("received n_dims == %d", in_params->n_dims);
+ g_assert (in_params->n_dims == 2);
+
+}
+
void
test_add_remote_node (void)
{
+ g_test_add ("/remotenode/get_structure",
+ Fixture, NULL,
+ setup, test_remote_node_get_structure, teardown);
g_test_add ("/remotenode/get_num_cpus",
Fixture, NULL,
setup, test_remote_node_get_num_cpus, teardown);
diff --git a/tests/test-suite.c b/tests/test-suite.c
index e6c813a..76a5eab 100644
--- a/tests/test-suite.c
+++ b/tests/test-suite.c
@@ -26,6 +26,7 @@ ignore_log (const gchar *domain,
const gchar *message,
gpointer data)
{
+ // g_print ("%s\n",message);
}
int main(int argc, char *argv[])
@@ -41,8 +42,8 @@ int main(int argc, char *argv[])
test_add_config ();
test_add_graph ();
test_add_profiler ();
+ test_add_zmq_messenger ();
test_add_remote_node ();
-
g_test_run();
return 0;
diff --git a/tests/test-suite.h b/tests/test-suite.h
index cc54ddb..afe699a 100644
--- a/tests/test-suite.h
+++ b/tests/test-suite.h
@@ -6,5 +6,6 @@ void test_add_config (void);
void test_add_graph (void);
void test_add_profiler (void);
void test_add_remote_node (void);
+void test_add_zmq_messenger (void);
#endif
diff --git a/tests/test-zmq-messenger.c b/tests/test-zmq-messenger.c
new file mode 100644
index 0000000..84576f7
--- /dev/null
+++ b/tests/test-zmq-messenger.c
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <ufo/ufo.h>
+#include "test-suite.h"
+
+typedef struct {
+ gchar *addr;
+} Fixture;
+
+static void
+setup (Fixture *fixture, gconstpointer data)
+{
+ fixture->addr = g_strdup ("tcp://127.0.0.1:5555");
+}
+
+static void
+teardown (Fixture *fixture, gconstpointer data)
+{
+ g_free (fixture->addr);
+}
+
+static void send_num_devices_request (gpointer unused)
+{
+ UfoMessenger *msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ gchar *addr = g_strdup ("tcp://127.0.0.1:5555");
+ ufo_messenger_connect (msger, addr, UFO_MESSENGER_CLIENT);
+
+ guint x = 0;
+ while (x++ < 10) {
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_GET_NUM_DEVICES, 0);
+ UfoMessage *response;
+
+ response = ufo_messenger_send_blocking (msger, request, NULL);
+
+ guint16 num_devices = *(guint16 *) response->data;
+ g_assert (num_devices == x);
+
+ ufo_message_free (request);
+ ufo_message_free (response);
+ }
+ ufo_zmq_messenger_disconnect (msger);
+ g_object_unref (msger);
+}
+
+static void handle_num_devices (gpointer unused)
+{
+ UfoMessenger *msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ gchar *addr = g_strdup ("tcp://127.0.0.1:5555");
+ ufo_messenger_connect (msger, addr, UFO_MESSENGER_SERVER);
+
+ guint16 x = 0;
+ GError *err = NULL;
+ while (x++ < 10) {
+ UfoMessage *msg = ufo_messenger_recv_blocking (UFO_MESSENGER (msger), &err);
+ if (err != NULL)
+ g_critical ("%s", err->message);
+
+ UfoMessage *resp;
+ switch (msg->type) {
+ case UFO_MESSAGE_GET_NUM_DEVICES:
+ resp = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
+ *(guint16 *)resp->data = x;
+ ufo_zmq_messenger_send_blocking (msger, resp, NULL);
+ ufo_message_free (resp);
+ break;
+ default:
+ g_critical ("Unexpected message type: %d", msg->type);
+ break;
+ }
+ ufo_message_free (msg);
+ };
+
+ ufo_zmq_messenger_disconnect (msger);
+ g_object_unref (msger);
+}
+
+static void test_zmq_messenger (Fixture *fixture, gconstpointer unused)
+{
+ GThread *server = g_thread_create ((GThreadFunc) handle_num_devices, NULL, TRUE, NULL);
+ GThread *client = g_thread_create ((GThreadFunc) send_num_devices_request, NULL, TRUE, NULL);
+
+ g_thread_join (client);
+ g_thread_join (server);
+}
+
+
+void
+test_add_zmq_messenger (void)
+{
+ g_test_add ("/zmq_messenger/test_messenger",
+ Fixture, NULL,
+ setup, test_zmq_messenger, teardown);
+}
diff --git a/tools/ufod.c b/tools/ufod.c
index 1de2dd1..d7f29b7 100644
--- a/tools/ufod.c
+++ b/tools/ufod.c
@@ -24,7 +24,6 @@
#else
#include <CL/cl.h>
#endif
-#include <zmq.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
diff --git a/ufo/CMakeLists.txt b/ufo/CMakeLists.txt
index 30b246d..1124687 100644
--- a/ufo/CMakeLists.txt
+++ b/ufo/CMakeLists.txt
@@ -15,6 +15,7 @@ set(ufocore_SRCS
ufo-graph.c
ufo-group.c
ufo-input-task.c
+ ufo-messenger-iface.c
ufo-node.c
ufo-output-task.c
ufo-plugin-manager.c
@@ -27,6 +28,7 @@ set(ufocore_SRCS
ufo-task-graph.c
ufo-task-node.c
ufo-basic-ops.c
+ ufo-zmq-messenger.c
)
#}}}
@@ -45,6 +47,7 @@ set(ufocore_HDRS
ufo-graph.h
ufo-group.h
ufo-input-task.h
+ ufo-messenger-iface.h
ufo-node.h
ufo-output-task.h
ufo-plugin-manager.h
@@ -57,6 +60,7 @@ set(ufocore_HDRS
ufo-task-graph.h
ufo-task-node.h
ufo-basic-ops.h
+ ufo-zmq-messenger.h
)
#}}}
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index 3d311bb..0e46c6f 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -33,6 +33,8 @@
#include <ufo/ufo-plugin-manager.h>
#include <ufo/ufo-scheduler.h>
#include <ufo/ufo-task-graph.h>
+#include <ufo/ufo-zmq-messenger.h>
+#include <ufo/ufo-messenger-iface.h>
#include "zmq-shim.h"
@@ -40,8 +42,6 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT)
#define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate))
-#define CHECK_ZMQ(r) if (r == -1) g_warning ("%s:%i: zmq_error: %s\n", __FILE__, __LINE__, zmq_strerror (errno));
-
struct _UfoDaemonPrivate {
UfoConfig *config;
UfoPluginManager *manager;
@@ -58,10 +58,10 @@ struct _UfoDaemonPrivate {
gboolean has_started;
GMutex *started_lock;
GCond *started_cond;
+ UfoMessenger *msger;
};
static gpointer run_scheduler (UfoDaemon *daemon);
-static void validate_zmq_listen_address (gchar *addr);
UfoDaemon *
ufo_daemon_new (UfoConfig *config, gchar *listen_address)
@@ -71,8 +71,6 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address)
g_return_val_if_fail (listen_address != NULL, NULL);
g_return_val_if_fail (config != NULL, NULL);
- validate_zmq_listen_address (listen_address);
-
daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL));
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
@@ -80,68 +78,31 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address)
priv->listen_address = listen_address;
priv->manager = ufo_plugin_manager_new (priv->config);
priv->scheduler = ufo_scheduler_new (priv->config, NULL);
- priv->context = zmq_ctx_new ();
- priv->socket = zmq_socket (priv->context, ZMQ_REP);
-
- int err = zmq_bind (priv->socket, listen_address);
- if (err < 0)
- g_critical ("could not bind to address %s", listen_address);
+ priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
return daemon;
}
static void
-validate_zmq_listen_address (gchar *addr)
-{
- if (!g_str_has_prefix (addr, "tcp://"))
- g_critical ("address didn't start with tcp:// scheme, which is required currently");
-
- /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555
- * but not bind to it as it treats it like an interface name (like eth0).
- * We have to use IP addresses instead of DNS names.
- */
- gchar *host = g_strdup (&addr[6]);
- if (!g_ascii_isdigit (host[0]) && host[0] != '*')
- g_warning ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host);
- g_free (host);
-}
-
-static void
-ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags)
-{
- zmq_msg_t reply;
-
- zmq_msg_init_size (&reply, sizeof (UfoMessage));
- memcpy (zmq_msg_data (&reply), msg, sizeof (UfoMessage));
- zmq_msg_send (&reply, socket, flags);
- zmq_msg_close (&reply);
-}
-
-static void
-send_ack (gpointer socket)
-{
- UfoMessage msg;
- msg.type = UFO_MESSAGE_ACK;
- ufo_msg_send (&msg, socket, 0);
-}
-
-static void
handle_get_num_devices (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
-
- UfoMessage msg;
cl_context context;
+ UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
+ cl_uint *num_devices = g_malloc (sizeof (cl_uint));
context = ufo_scheduler_get_context (priv->scheduler);
UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context,
CL_CONTEXT_NUM_DEVICES,
sizeof (cl_uint),
- &msg.d.n_devices,
+ num_devices,
NULL));
- ufo_msg_send (&msg, priv->socket, 0);
+ *(guint16 *) msg->data = (guint16) *num_devices;
+
+ ufo_messenger_send_blocking (priv->msger, msg, 0);
+ ufo_message_free (msg);
}
static UfoNode *
@@ -166,34 +127,30 @@ remove_dummy_if_present (UfoGraph *graph,
}
static gchar *
-read_json (UfoDaemon *daemon)
+read_json (UfoDaemon *daemon, UfoMessage *msg)
{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
-
- zmq_msg_t json_msg;
- gsize size;
gchar *json;
- zmq_msg_init (&json_msg);
- size = (gsize) zmq_msg_recv (&json_msg, priv->socket, 0);
-
- json = g_malloc0 (size + 1);
- memcpy (json, zmq_msg_data (&json_msg), size);
- zmq_msg_close (&json_msg);
+ json = g_malloc0 (msg->data_size + 1);
+ memcpy (json, msg->data, msg->data_size);
return json;
}
static void
-handle_replicate_json (UfoDaemon *daemon)
+handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
gchar *json;
UfoTaskGraph *graph;
GError *error = NULL;
- json = read_json (daemon);
- send_ack (priv->socket);
+ json = read_json (daemon, msg);
+
+ // send ack
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
ufo_task_graph_read_from_data (graph, priv->manager, json, &error);
@@ -203,10 +160,7 @@ handle_replicate_json (UfoDaemon *daemon)
goto replicate_json_free;
}
- g_message ("Start scheduler");
ufo_scheduler_run (priv->scheduler, graph, NULL);
-
- g_message ("Done");
g_object_unref (priv->scheduler);
priv->scheduler = ufo_scheduler_new (priv->config, NULL);
@@ -217,7 +171,7 @@ replicate_json_free:
}
static void
-handle_stream_json (UfoDaemon *daemon)
+handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
gchar *json;
@@ -227,7 +181,11 @@ handle_stream_json (UfoDaemon *daemon)
UfoNode *last;
GError *error = NULL;
- json = read_json (daemon);
+ json = read_json (daemon, msg);
+ // send ack
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
/* Setup local task graph */
priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
@@ -263,79 +221,65 @@ handle_stream_json (UfoDaemon *daemon)
g_thread_create ((GThreadFunc) run_scheduler, daemon, FALSE, NULL);
g_free (json);
- send_ack (priv->socket);
-}
-
-static void
-handle_setup (UfoDaemon *daemon)
-{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- g_message ("Setup requested");
- send_ack (priv->socket);
}
static void
handle_get_structure (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoMessage header;
- UfoInputParam in_param;
- zmq_msg_t data_msg;
+ UfoMessage *response;
- g_message ("Structure requested");
- header.type = UFO_MESSAGE_STRUCTURE;
+ /* TODO move into .h and share between daemon and remote-node */
+ struct _Structure {
+ guint16 n_inputs;
+ guint16 n_dims;
+ } msg_data;
- /* TODO: do not hardcode these */
- header.d.n_inputs = 1;
- in_param.n_dims = 2;
+ /* TODO don't hardcode these */
+ msg_data.n_inputs = 1;
+ msg_data.n_dims = 2;
- zmq_msg_init_size (&data_msg, sizeof (UfoInputParam));
- memcpy (zmq_msg_data (&data_msg), &in_param, sizeof (UfoInputParam));
+ response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure));
+ *(struct _Structure *) (response->data) = msg_data;
- ufo_msg_send (&header, priv->socket, ZMQ_SNDMORE);
- zmq_msg_send (&data_msg, priv->socket, 0);
- zmq_msg_close (&data_msg);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
}
static void
-handle_send_inputs (UfoDaemon *daemon)
+handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoRequisition *requisition;
- zmq_msg_t requisition_msg;
- zmq_msg_t data_msg;
+ UfoRequisition requisition;
gpointer context;
context = ufo_scheduler_get_context (priv->scheduler);
- /* Receive buffer size */
- zmq_msg_init (&requisition_msg);
- zmq_msg_recv (&requisition_msg, priv->socket, 0);
- g_assert (zmq_msg_size (&requisition_msg) >= sizeof (UfoRequisition));
- requisition = zmq_msg_data (&requisition_msg);
+ struct _Header {
+ UfoRequisition requisition;
+ guint64 buffer_size;
+ };
+ gpointer base = request->data;
+ struct _Header *header = (struct _Header *) base;
+
+ /* Receive buffer size */
+ requisition = header->requisition;
if (priv->input == NULL) {
- priv->input = ufo_buffer_new (requisition, context);
+ priv->input = ufo_buffer_new (&requisition, context);
}
else {
- if (ufo_buffer_cmp_dimensions (priv->input, requisition))
- ufo_buffer_resize (priv->input, requisition);
+ if (ufo_buffer_cmp_dimensions (priv->input, &requisition))
+ ufo_buffer_resize (priv->input, &requisition);
}
-
- zmq_msg_close (&requisition_msg);
-
- /* Receive actual buffer */
- zmq_msg_init (&data_msg);
- zmq_msg_recv (&data_msg, priv->socket, 0);
-
memcpy (ufo_buffer_get_host_array (priv->input, NULL),
- zmq_msg_data (&data_msg),
+ base + sizeof (struct _Header),
ufo_buffer_get_size (priv->input));
-
ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
- zmq_msg_close (&data_msg);
- send_ack (priv->socket);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
}
static void
@@ -343,17 +287,15 @@ handle_get_requisition (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoRequisition requisition;
- zmq_msg_t reply_msg;
/* We need to get the requisition from the last node */
- g_message ("Requisition requested");
ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task),
&requisition);
- zmq_msg_init_size (&reply_msg, sizeof (UfoRequisition));
- memcpy (zmq_msg_data (&reply_msg), &requisition, sizeof (UfoRequisition));
- zmq_msg_send (&reply_msg, priv->socket, 0);
- zmq_msg_close (&reply_msg);
+ UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition));
+ memcpy (msg->data, &requisition, msg->data_size);
+ ufo_messenger_send_blocking (priv->msger, msg, NULL);
+ ufo_message_free (msg);
}
static
@@ -361,16 +303,14 @@ void handle_get_result (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoBuffer *buffer;
- zmq_msg_t reply_msg;
gsize size;
buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task));
size = ufo_buffer_get_size (buffer);
- zmq_msg_init_size (&reply_msg, size);
- memcpy (zmq_msg_data (&reply_msg), ufo_buffer_get_host_array (buffer, NULL), size);
- zmq_msg_send (&reply_msg, priv->socket, 0);
- zmq_msg_close (&reply_msg);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size);
+ memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer);
}
@@ -392,7 +332,9 @@ void handle_cleanup (UfoDaemon *daemon)
* We send the ACK early on, because we don't want to let the host wait for
* actually cleaning up (and waiting some time to unref the input task).
*/
- send_ack (priv->socket);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
// TODO check that we don't need to execture this branch wen priv->input is null
if (priv->input_task && priv->input) {
@@ -410,6 +352,18 @@ void handle_cleanup (UfoDaemon *daemon)
unref_and_free ((GObject **) &priv->task_graph);
}
+static void
+handle_terminate (UfoDaemon *daemon)
+{
+ UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
+
+ priv->run = FALSE;
+ ufo_messenger_disconnect (priv->msger);
+}
+
static gpointer
run_scheduler (UfoDaemon *daemon)
{
@@ -430,48 +384,39 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
while (priv->run) {
- zmq_msg_t request;
+ // zmq_msg_t request;
- zmq_msg_init (&request);
+ // zmq_msg_init (&request);
g_mutex_lock (priv->started_lock);
priv->has_started = TRUE;
g_cond_signal (priv->started_cond);
g_mutex_unlock (priv->started_lock);
- gint err = zmq_msg_recv (&request, priv->socket, 0);
+ // gint err = zmq_msg_recv (&request, priv->socket, 0);
/* if daemon is stopped, socket will be closed and msg_recv
* will yield an error - we simply want to return
*/
- if (err < 0) return;
-
- if (zmq_msg_size (&request) < sizeof (UfoMessage)) {
- g_warning ("Message is smaller than expected\n");
- send_ack (priv->socket);
- }
- else {
- UfoMessage *msg;
-
- msg = (UfoMessage *) zmq_msg_data (&request);
+ GError *err = NULL;
+ UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err);
+ if (err != NULL) {
+ } else {
switch (msg->type) {
case UFO_MESSAGE_GET_NUM_DEVICES:
handle_get_num_devices (daemon);
break;
case UFO_MESSAGE_STREAM_JSON:
- handle_stream_json (daemon);
+ handle_stream_json (daemon, msg);
break;
case UFO_MESSAGE_REPLICATE_JSON:
- handle_replicate_json (daemon);
- break;
- case UFO_MESSAGE_SETUP:
- handle_setup (daemon);
+ handle_replicate_json (daemon, msg);
break;
case UFO_MESSAGE_GET_STRUCTURE:
handle_get_structure (daemon);
break;
case UFO_MESSAGE_SEND_INPUTS:
- handle_send_inputs (daemon);
+ handle_send_inputs (daemon, msg);
break;
case UFO_MESSAGE_GET_REQUISITION:
handle_get_requisition (daemon);
@@ -482,11 +427,14 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
case UFO_MESSAGE_CLEANUP:
handle_cleanup (daemon);
break;
+ case UFO_MESSAGE_TERMINATE:
+ handle_terminate (daemon);
+ break;
default:
- g_print ("unhandled case\n");
+ g_message ("Unknown message received\n");
}
}
- zmq_msg_close (&request);
+ ufo_message_free (msg);
}
}
@@ -495,6 +443,9 @@ ufo_daemon_start (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ /* TODO handle error if unable to connect/bind */
+ ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER);
+
priv->run = TRUE;
priv->thread = g_thread_create ((GThreadFunc)ufo_daemon_start_impl, daemon, TRUE, NULL);
g_return_if_fail (priv->thread != NULL);
@@ -511,15 +462,15 @@ void
ufo_daemon_stop (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- priv->run = FALSE;
-
- zmq_close (priv->socket);
- if (priv->context != NULL) {
- zmq_ctx_destroy (priv->context);
- priv->context = NULL;
- g_assert (priv->context == NULL);
- }
+ /* HACK we can't call _disconnect() as this has to be run from the
+ * thread running the daemon - we thus send a TERMINATE message to
+ * that thread
+ */
+ UfoMessenger *tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT);
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0);
+ ufo_messenger_send_blocking (tmp_msger, request, NULL);
g_thread_join (priv->thread);
}
@@ -534,18 +485,13 @@ ufo_daemon_dispose (GObject *object)
g_object_unref (priv->task_graph);
if (priv->config != NULL)
g_object_unref (priv->config);
+ if (priv->msger != NULL)
+ g_object_unref (priv->msger);
if (priv->manager != NULL)
g_object_unref (priv->manager);
if (priv->scheduler != NULL)
g_object_unref (priv->scheduler);
- zmq_close (priv->socket);
-
- if (priv->context != NULL) {
- zmq_ctx_destroy (priv->context);
- priv->context = NULL;
- }
-
G_OBJECT_CLASS (ufo_daemon_parent_class)->dispose (object);
}
diff --git a/ufo/ufo-messenger-iface.c b/ufo/ufo-messenger-iface.c
new file mode 100644
index 0000000..26ed2a8
--- /dev/null
+++ b/ufo/ufo-messenger-iface.c
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+ #include <ufo/ufo-messenger-iface.h>
+
+typedef UfoMessengerIface UfoMessengerInterface;
+
+void
+ufo_message_free (UfoMessage *msg)
+{
+ if (msg == NULL)
+ return;
+ g_free (msg->data);
+ g_free (msg);
+}
+
+UfoMessage *
+ufo_message_new (UfoMessageType type, guint64 data_size)
+{
+ UfoMessage *msg = g_malloc (sizeof (UfoMessage));
+ msg->type = type;
+ msg->data_size = data_size;
+ if (data_size == 0)
+ msg->data = NULL;
+ else
+ msg->data = g_malloc (data_size);
+ return msg;
+}
+
+G_DEFINE_INTERFACE (UfoMessenger, ufo_messenger, G_TYPE_OBJECT)
+
+/**
+ * UfoTaskError:
+ * @UFO_TASK_ERROR_SETUP: Error during setup of a task.
+ */
+GQuark
+ufo_messenger_error_quark ()
+{
+ return g_quark_from_static_string ("ufo-messenger-error-quark");
+}
+
+void
+ufo_messenger_connect (UfoMessenger *msger,
+ gchar *addr,
+ UfoMessengerRole role)
+{
+ UFO_MESSENGER_GET_IFACE (msger)->connect (msger, addr, role);
+}
+
+void
+ufo_messenger_disconnect (UfoMessenger *msger)
+{
+ UFO_MESSENGER_GET_IFACE (msger)->disconnect (msger);
+}
+
+UfoMessage *
+ufo_messenger_send_blocking (UfoMessenger *msger,
+ UfoMessage *request,
+ GError **error)
+{
+ return UFO_MESSENGER_GET_IFACE (msger)->send_blocking (msger, request, error);
+}
+
+UfoMessage *
+ufo_messenger_recv_blocking (UfoMessenger *msger,
+ GError **error)
+{
+ return UFO_MESSENGER_GET_IFACE (msger)->recv_blocking (msger, error);
+}
+
+static void
+ufo_messenger_default_init (UfoMessengerInterface *iface)
+{
+}
diff --git a/ufo/ufo-messenger-iface.h b/ufo/ufo-messenger-iface.h
new file mode 100644
index 0000000..f6d9271
--- /dev/null
+++ b/ufo/ufo-messenger-iface.h
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef UFO_MESSENGER_H
+#define UFO_MESSENGER_H
+
+#if !defined (__UFO_H_INSIDE__) && !defined (UFO_COMPILATION)
+#error "Only <ufo/ufo.h> can be included directly."
+#endif
+
+#include <ufo/ufo-remote-node.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_MESSENGER (ufo_messenger_get_type())
+#define UFO_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_MESSENGER, UfoMessenger))
+#define UFO_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_MESSENGER, UfoMessengerIface))
+#define UFO_IS_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_MESSENGER))
+#define UFO_IS_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_MESSENGER))
+#define UFO_MESSENGER_GET_IFACE(inst) (G_TYPE_INSTANCE_GET_INTERFACE((inst), UFO_TYPE_MESSENGER, UfoMessengerIface))
+
+#define UFO_MESSENGER_ERROR ufo_messenger_error_quark()
+
+typedef struct _UfoMessenger UfoMessenger;
+typedef struct _UfoMessengerIface UfoMessengerIface;
+typedef struct _UfoMessage UfoMessage;
+
+
+/**
+ * UfoMessageType: (skip)
+ * @UFO_MESSAGE_STREAM_JSON: insert
+ * @UFO_MESSAGE_REPLICATE_JSON: insert
+ * @UFO_MESSAGE_GET_NUM_DEVICES: insert
+ * @UFO_MESSAGE_SETUP: insert
+ * @UFO_MESSAGE_GET_STRUCTURE: insert
+ * @UFO_MESSAGE_STRUCTURE: insert
+ * @UFO_MESSAGE_GET_REQUISITION: insert
+ * @UFO_MESSAGE_REQUISITION: insert
+ * @UFO_MESSAGE_SEND_INPUTS: insert
+ * @UFO_MESSAGE_GET_RESULT: insert
+ * @UFO_MESSAGE_RESULT: insert
+ * @UFO_MESSAGE_CLEANUP: insert
+ * @UFO_MESSAGE_ACK: insert
+ */
+typedef enum {
+ UFO_MESSAGE_STREAM_JSON = 0,
+ UFO_MESSAGE_REPLICATE_JSON,
+ UFO_MESSAGE_GET_NUM_DEVICES,
+ UFO_MESSAGE_GET_STRUCTURE,
+ UFO_MESSAGE_STRUCTURE,
+ UFO_MESSAGE_GET_REQUISITION,
+ UFO_MESSAGE_REQUISITION,
+ UFO_MESSAGE_SEND_INPUTS,
+ UFO_MESSAGE_GET_RESULT,
+ UFO_MESSAGE_RESULT,
+ UFO_MESSAGE_CLEANUP,
+ UFO_MESSAGE_TERMINATE,
+ UFO_MESSAGE_ACK
+} UfoMessageType;
+
+/**
+ * UfoMessage: (skip)
+ * @type: Type of the wire message
+ */
+struct _UfoMessage {
+ UfoMessageType type;
+ guint64 data_size;
+ gpointer data;
+};
+
+void ufo_message_free (UfoMessage *msg);
+UfoMessage * ufo_message_new (UfoMessageType type, guint64 data_size);
+
+typedef enum {
+ UFO_MESSENGER_BUFFER_FULL,
+ UFO_MESSENGER_SIZE_MISSMATCH
+} UfoMessengerError;
+
+typedef enum {
+ UFO_MESSENGER_CLIENT,
+ UFO_MESSENGER_SERVER
+} UfoMessengerRole;
+
+struct _UfoMessengerIface {
+ /*< private >*/
+ GTypeInterface parent_iface;
+
+ void (*connect) (UfoMessenger *msger,
+ gchar *addr,
+ UfoMessengerRole role);
+
+ void (*disconnect) (UfoMessenger *msger);
+
+ UfoMessage * (*send_blocking) (UfoMessenger *msger,
+ UfoMessage *request,
+ GError **error);
+
+ UfoMessage * (*recv_blocking) (UfoMessenger *msger,
+ GError **error);
+};
+
+
+void ufo_messenger_connect (UfoMessenger *msger,
+ gchar *addr,
+ UfoMessengerRole role);
+
+void ufo_messenger_disconnect (UfoMessenger *msger);
+
+UfoMessage *ufo_messenger_send_blocking (UfoMessenger *msger,
+ UfoMessage *request,
+ GError **error);
+
+UfoMessage *ufo_messenger_recv_blocking (UfoMessenger *msger,
+ GError **error);
+
+GQuark ufo_messenger_error_quark (void);
+GType ufo_messenger_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c
index e72f6f9..dcf4fdc 100644
--- a/ufo/ufo-remote-node.c
+++ b/ufo/ufo-remote-node.c
@@ -19,6 +19,8 @@
#include <string.h>
#include <ufo/ufo-remote-node.h>
+#include <ufo/ufo-messenger-iface.h>
+#include <ufo/ufo-zmq-messenger.h>
#include "zmq-shim.h"
@@ -26,14 +28,12 @@ G_DEFINE_TYPE (UfoRemoteNode, ufo_remote_node, UFO_TYPE_NODE)
#define UFO_REMOTE_NODE_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_REMOTE_NODE, UfoRemoteNodePrivate))
-static void ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags);
-static void receive_ack (gpointer socket);
-
struct _UfoRemoteNodePrivate {
gpointer context;
gpointer socket;
guint n_inputs;
GMutex *mutex;
+ UfoMessenger *msger;
};
UfoNode *
@@ -44,67 +44,50 @@ ufo_remote_node_new (const gchar *address)
g_return_val_if_fail (address != NULL, NULL);
node = UFO_REMOTE_NODE (g_object_new (UFO_TYPE_REMOTE_NODE, NULL));
- priv = node->priv;
- priv->socket = zmq_socket (priv->context, ZMQ_REQ);
+ priv = UFO_REMOTE_NODE_GET_PRIVATE (node);
- if (zmq_connect (priv->socket, address) == 0) {
- g_message ("Connected remote node to `%s' via socket=%p",
- address,
- priv->socket);
- return UFO_NODE (node);
- }
- else {
- g_warning ("Could not connect to `%s': %s",
- address,
- zmq_strerror (errno));
- g_object_unref (node);
- return NULL;
- }
+ priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+
+ gchar *addr = g_strdup (address);
+ ufo_messenger_connect (priv->msger, addr, UFO_MESSENGER_CLIENT);
+ g_free(addr);
+
+ return UFO_NODE (node);
}
guint
ufo_remote_node_get_num_gpus (UfoRemoteNode *node)
{
- UfoRemoteNodePrivate *priv;
- UfoMessage request;
- UfoMessage result;
- zmq_msg_t reply;
-
g_return_val_if_fail (UFO_IS_REMOTE_NODE (node), 0);
- priv = node->priv;
-
- g_mutex_lock (priv->mutex);
- request.type = UFO_MESSAGE_GET_NUM_DEVICES;
- ufo_msg_send (&request, priv->socket, 0);
+ UfoRemoteNodePrivate *priv;
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_GET_NUM_DEVICES, 0);
- zmq_msg_init (&reply);
- zmq_msg_recv (&reply, priv->socket, 0);
- memcpy (&result, zmq_msg_data (&reply), sizeof (UfoMessage));
- zmq_msg_close (&reply);
+ priv = node->priv;
- g_mutex_unlock (priv->mutex);
+ UfoMessage *result;
+ result = ufo_messenger_send_blocking (priv->msger, request, NULL);
+ guint n_devices = * (guint16 *) result->data;
- return result.d.n_devices;
+ ufo_message_free (request);
+ ufo_message_free (result);
+ g_assert (n_devices < 32);
+ return n_devices;
}
void
ufo_remote_node_request_setup (UfoRemoteNode *node)
{
- UfoRemoteNodePrivate *priv;
- UfoMessage request;
+ // TODO setup isn't in use, remove it
+ //g_assert (FALSE);
- g_return_if_fail (UFO_IS_REMOTE_NODE (node));
-
- priv = node->priv;
- request.type = UFO_MESSAGE_SETUP;
-
- g_mutex_lock (priv->mutex);
+ // g_return_if_fail (UFO_IS_REMOTE_NODE (node));
+ // UfoRemoteNodePrivate *priv = UFO_REMOTE_NODE_GET_PRIVATE (node);
- ufo_msg_send (&request, priv->socket, 0);
- receive_ack (priv->socket);
-
- g_mutex_unlock (priv->mutex);
+ // UfoMessage *request;
+ // request = ufo_message_new (UFO_MESSAGE_SETUP, 0);
+ // ufo_message_send_blocking (request);
+ // ufo_message_free (request);
}
void
@@ -113,36 +96,28 @@ ufo_remote_node_send_json (UfoRemoteNode *node,
const gchar *json)
{
UfoRemoteNodePrivate *priv;
- UfoMessage request;
- gsize size;
- zmq_msg_t json_msg;
+ UfoMessage *request;
+ guint64 size;
g_return_if_fail (UFO_IS_REMOTE_NODE (node));
priv = node->priv;
+ UfoMessageType type;
switch (mode) {
case UFO_REMOTE_MODE_STREAM:
- request.type = UFO_MESSAGE_STREAM_JSON;
+ type = UFO_MESSAGE_STREAM_JSON;
break;
case UFO_REMOTE_MODE_REPLICATE:
- request.type = UFO_MESSAGE_REPLICATE_JSON;
+ type = UFO_MESSAGE_REPLICATE_JSON;
break;
}
- g_mutex_lock (priv->mutex);
-
- ufo_msg_send (&request, priv->socket, ZMQ_SNDMORE);
-
- size = strlen (json);
- zmq_msg_init_size (&json_msg, size);
- memcpy (zmq_msg_data (&json_msg), json, size);
- zmq_msg_send (&json_msg, priv->socket, 0);
- zmq_msg_close (&json_msg);
+ size = (guint64) strlen (json);
+ request = ufo_message_new (type, size);
- receive_ack (priv->socket);
-
- g_mutex_unlock (priv->mutex);
+ memcpy (request->data, json, size);
+ ufo_messenger_send_blocking (priv->msger, request, NULL);
}
void
@@ -152,41 +127,33 @@ ufo_remote_node_get_structure (UfoRemoteNode *node,
UfoTaskMode *mode)
{
UfoRemoteNodePrivate *priv;
- UfoMessage request;
- UfoMessage *header;
- zmq_msg_t header_msg;
- zmq_msg_t payload_msg;
- UfoInputParam *in_param;
-
- g_return_if_fail (UFO_IS_REMOTE_NODE (node));
+ UfoMessage *request, *response;
priv = node->priv;
+
+ struct _Structure {
+ guint16 n_inputs;
+ guint16 n_dims;
+ } msg_data;
+
+ g_return_if_fail (UFO_IS_REMOTE_NODE (node));
*mode = UFO_TASK_MODE_PROCESSOR;
- request.type = UFO_MESSAGE_GET_STRUCTURE;
- g_mutex_lock (priv->mutex);
- ufo_msg_send (&request, priv->socket, 0);
+ request = ufo_message_new (UFO_MESSAGE_GET_STRUCTURE, 0);
+ response = ufo_messenger_send_blocking (priv->msger, request, NULL);
+ g_assert (response->data_size == sizeof (struct _Structure));
- /* Receive header */
- zmq_msg_init (&header_msg);
- zmq_msg_recv (&header_msg, priv->socket, 0);
- header = (UfoMessage *) zmq_msg_data (&header_msg);
+ msg_data = *(struct _Structure *) response->data;
- /* Receive payload */
- zmq_msg_init (&payload_msg);
- zmq_msg_recv (&payload_msg, priv->socket, 0);
- in_param = (UfoInputParam *) zmq_msg_data (&payload_msg);
+ priv->n_inputs = msg_data.n_inputs;
+ *n_inputs = msg_data.n_inputs;
- priv->n_inputs = header->d.n_inputs;
- *n_inputs = header->d.n_inputs;
*in_params = g_new0 (UfoInputParam, 1);
- (*in_params)[0].n_dims = in_param->n_dims;
+ (*in_params)[0].n_dims = msg_data.n_dims;
- zmq_msg_close (&header_msg);
- zmq_msg_close (&payload_msg);
-
- g_mutex_unlock (priv->mutex);
+ ufo_message_free (request);
+ ufo_message_free (response);
}
void
@@ -194,48 +161,48 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node,
UfoBuffer **inputs)
{
UfoRemoteNodePrivate *priv;
- UfoMessage request;
+ UfoMessage *request;
g_return_if_fail (UFO_IS_REMOTE_NODE (node));
priv = node->priv;
- request.type = UFO_MESSAGE_SEND_INPUTS;
-
- g_mutex_lock (priv->mutex);
-
- ufo_msg_send (&request, priv->socket, ZMQ_SNDMORE);
/*
* For each of the input data items send two frames: the first one contains
* the size as an UfoRequisition struct and the second one the raw byte
* data.
*/
- for (guint i = 0; i < priv->n_inputs; i++) {
+ struct _Header {
UfoRequisition requisition;
- zmq_msg_t requisition_msg;
- zmq_msg_t data_msg;
- gsize size;
- gint flags;
+ guint64 buffer_size;
+ };
- ufo_buffer_get_requisition (inputs[i], &requisition);
- size = ufo_buffer_get_size (inputs[i]);
-
- zmq_msg_init_size (&requisition_msg, sizeof (UfoRequisition));
- zmq_msg_init_size (&data_msg, size);
-
- memcpy (zmq_msg_data (&requisition_msg), &requisition, sizeof (UfoRequisition));
- memcpy (zmq_msg_data (&data_msg), ufo_buffer_get_host_array (inputs[i], NULL), size);
+ // determine our total message size
+ guint64 size;
+ for (guint i = 0; i < priv->n_inputs; i++) {
+ guint64 buffer_size = ufo_buffer_get_size (inputs[i]);
+ size += buffer_size;
+ }
+ gpointer buffer = g_malloc (priv->n_inputs * sizeof (struct _Header) + size);
- flags = i == priv->n_inputs - 1 ? 0 : ZMQ_SNDMORE;
- zmq_msg_send (&requisition_msg, priv->socket, ZMQ_SNDMORE);
- zmq_msg_send (&data_msg, priv->socket, flags);
+ gpointer base = buffer;
- zmq_msg_close (&requisition_msg);
- zmq_msg_close (&data_msg);
+ for (guint i = 0; i < priv->n_inputs; i++) {
+ struct _Header *header = g_new0 (struct _Header, 1);
+ ufo_buffer_get_requisition (inputs[i], &header->requisition);
+ header->buffer_size = (guint64) ufo_buffer_get_size (inputs[i]);
+
+ memcpy (base, header, sizeof (struct _Header));
+ base += sizeof (struct _Header);
+ memcpy (base, ufo_buffer_get_host_array (inputs[i], NULL), header->buffer_size);
+ base += header->buffer_size;
}
+ request = ufo_message_new (UFO_MESSAGE_SEND_INPUTS, size);
+ g_free (request->data);
+ request->data = buffer;
+ // send as a single message
+ ufo_messenger_send_blocking (priv->msger, request, NULL);
- receive_ack (priv->socket);
- g_mutex_unlock (priv->mutex);
}
void
@@ -243,31 +210,23 @@ ufo_remote_node_get_result (UfoRemoteNode *node,
UfoBuffer *buffer)
{
UfoRemoteNodePrivate *priv;
- UfoMessage request;
- zmq_msg_t reply_msg;
+ UfoMessage *request, *response;
gpointer host_array;
g_return_if_fail (UFO_IS_REMOTE_NODE (node));
priv = node->priv;
- request.type = UFO_MESSAGE_GET_RESULT;
-
- g_mutex_lock (priv->mutex);
-
- ufo_msg_send (&request, priv->socket, 0);
-
- /* Get the remote data and put it into our buffer */
- zmq_msg_init (&reply_msg);
- zmq_msg_recv (&reply_msg, priv->socket, 0);
+ request = ufo_message_new (UFO_MESSAGE_GET_RESULT, 0);
+ response = ufo_messenger_send_blocking (priv->msger, request, NULL);
ufo_buffer_discard_location (buffer);
host_array = ufo_buffer_get_host_array (buffer, NULL);
- g_assert (ufo_buffer_get_size (buffer) == zmq_msg_size (&reply_msg));
- memcpy (host_array, zmq_msg_data (&reply_msg), ufo_buffer_get_size (buffer));
+ g_assert (ufo_buffer_get_size (buffer) == response->data_size);
- zmq_msg_close (&reply_msg);
+ memcpy (host_array, response->data, ufo_buffer_get_size (buffer));
- g_mutex_unlock (priv->mutex);
+ ufo_message_free (request);
+ ufo_message_free (response);
}
void
@@ -275,75 +234,45 @@ ufo_remote_node_get_requisition (UfoRemoteNode *node,
UfoRequisition *requisition)
{
UfoRemoteNodePrivate *priv;
- UfoMessage request;
- zmq_msg_t reply_msg;
+ UfoMessage *request, *response;
g_return_if_fail (UFO_IS_REMOTE_NODE (node));
priv = node->priv;
- request.type = UFO_MESSAGE_GET_REQUISITION;
-
- g_mutex_lock (priv->mutex);
+ request = ufo_message_new (UFO_MESSAGE_GET_REQUISITION, 0);
+ response = ufo_messenger_send_blocking (priv->msger, request, NULL);
- ufo_msg_send (&request, priv->socket, 0);
+ g_assert (response->data_size == sizeof (UfoRequisition));
+ memcpy (requisition, response->data, sizeof (UfoRequisition));
- zmq_msg_init (&reply_msg);
- zmq_msg_recv (&reply_msg, priv->socket, 0);
- g_assert (zmq_msg_size (&reply_msg) >= sizeof (UfoRequisition));
- memcpy (requisition, zmq_msg_data (&reply_msg), sizeof (UfoRequisition));
- zmq_msg_close (&reply_msg);
-
- g_mutex_unlock (priv->mutex);
+ ufo_message_free(request);
+ ufo_message_free(response);
}
static void
-cleanup_remote (gpointer socket)
+cleanup_remote (UfoRemoteNodePrivate *priv)
{
- UfoMessage request;
-
- request.type = UFO_MESSAGE_CLEANUP;
-
- ufo_msg_send (&request, socket, 0);
- receive_ack (socket);
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_CLEANUP, 0);
+ ufo_messenger_send_blocking (priv->msger, request, NULL);
+ ufo_message_free (request);
}
static void
-ufo_msg_send (UfoMessage *msg,
- gpointer socket,
- gint flags)
+terminate_remote (UfoRemoteNodePrivate *priv)
{
- zmq_msg_t request;
-
- zmq_msg_init_size (&request, sizeof (UfoMessage));
- memcpy (zmq_msg_data (&request), msg, sizeof (UfoMessage));
- zmq_msg_send (&request, socket, flags);
- zmq_msg_close (&request);
-}
-
-static void
-receive_ack (gpointer socket)
-{
- zmq_msg_t reply_msg;
-
- zmq_msg_init (&reply_msg);
- zmq_msg_recv (&reply_msg, socket, 0);
- zmq_msg_close (&reply_msg);
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0);
+ ufo_messenger_send_blocking (priv->msger, request, NULL);
+ ufo_message_free (request);
}
static void
ufo_remote_node_dispose (GObject *object)
{
UfoRemoteNodePrivate *priv;
-
priv = UFO_REMOTE_NODE_GET_PRIVATE (object);
- if (priv->socket != NULL) {
- cleanup_remote (priv->socket);
-
- g_debug ("Close socket=%p", priv->socket);
- zmq_close (priv->socket);
- priv->socket = NULL;
- }
+ cleanup_remote (priv);
+ ufo_messenger_disconnect (priv->msger);
G_OBJECT_CLASS (ufo_remote_node_parent_class)->dispose (object);
}
@@ -355,9 +284,9 @@ ufo_remote_node_finalize (GObject *object)
priv = UFO_REMOTE_NODE_GET_PRIVATE (object);
g_mutex_free (priv->mutex);
-
+
if (priv->context != NULL) {
- g_debug ("Destroy zmq_context=%p", priv->context);
+ g_debug ("RemoteNode destroying zmq_context=%p", priv->context);
zmq_ctx_destroy (priv->context);
priv->context = NULL;
}
diff --git a/ufo/ufo-remote-node.h b/ufo/ufo-remote-node.h
index 1124ddd..c73ec00 100644
--- a/ufo/ufo-remote-node.h
+++ b/ufo/ufo-remote-node.h
@@ -27,6 +27,7 @@
#include <ufo/ufo-node.h>
#include <ufo/ufo-buffer.h>
#include <ufo/ufo-task-iface.h>
+#include <ufo/ufo-messenger-iface.h>
G_BEGIN_DECLS
@@ -40,7 +41,6 @@ G_BEGIN_DECLS
typedef struct _UfoRemoteNode UfoRemoteNode;
typedef struct _UfoRemoteNodeClass UfoRemoteNodeClass;
typedef struct _UfoRemoteNodePrivate UfoRemoteNodePrivate;
-typedef struct _UfoMessage UfoMessage;
/**
* UfoRemoteNode:
@@ -78,50 +78,6 @@ typedef enum {
} UfoRemoteMode;
-/**
- * UfoMessageType: (skip)
- * @UFO_MESSAGE_STREAM_JSON: insert
- * @UFO_MESSAGE_REPLICATE_JSON: insert
- * @UFO_MESSAGE_GET_NUM_DEVICES: insert
- * @UFO_MESSAGE_SETUP: insert
- * @UFO_MESSAGE_GET_STRUCTURE: insert
- * @UFO_MESSAGE_STRUCTURE: insert
- * @UFO_MESSAGE_GET_REQUISITION: insert
- * @UFO_MESSAGE_REQUISITION: insert
- * @UFO_MESSAGE_SEND_INPUTS: insert
- * @UFO_MESSAGE_GET_RESULT: insert
- * @UFO_MESSAGE_RESULT: insert
- * @UFO_MESSAGE_CLEANUP: insert
- * @UFO_MESSAGE_ACK: insert
- */
-typedef enum {
- UFO_MESSAGE_STREAM_JSON = 0,
- UFO_MESSAGE_REPLICATE_JSON,
- UFO_MESSAGE_GET_NUM_DEVICES,
- UFO_MESSAGE_SETUP,
- UFO_MESSAGE_GET_STRUCTURE,
- UFO_MESSAGE_STRUCTURE,
- UFO_MESSAGE_GET_REQUISITION,
- UFO_MESSAGE_REQUISITION,
- UFO_MESSAGE_SEND_INPUTS,
- UFO_MESSAGE_GET_RESULT,
- UFO_MESSAGE_RESULT,
- UFO_MESSAGE_CLEANUP,
- UFO_MESSAGE_ACK
-} UfoMessageType;
-
-/**
- * UfoMessage: (skip)
- * @type: Type of the wire message
- */
-struct _UfoMessage {
- UfoMessageType type;
-
- union {
- guint16 n_inputs;
- guint16 n_devices;
- } d;
-};
UfoNode *ufo_remote_node_new (const gchar *address);
guint ufo_remote_node_get_num_gpus (UfoRemoteNode *node);
diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c
new file mode 100644
index 0000000..83b167f
--- /dev/null
+++ b/ufo/ufo-zmq-messenger.c
@@ -0,0 +1,305 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <ufo/ufo-zmq-messenger.h>
+#include <zmq.h>
+#include <string.h>
+
+
+static void ufo_messenger_interface_init (UfoMessengerIface *iface);
+
+#define UFO_ZMQ_MESSENGER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerPrivate))
+
+G_DEFINE_TYPE_WITH_CODE (UfoZmqMessenger, ufo_zmq_messenger, G_TYPE_OBJECT,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_MESSENGER,
+ ufo_messenger_interface_init))
+
+
+struct _UfoZmqMessengerPrivate {
+ gchar *remote_addr;
+ GMutex *mutex;
+ gpointer zmq_socket;
+ gpointer zmq_ctx;
+ UfoMessengerRole role;
+};
+
+/* C99 allows flexible length structs that we use to map
+* arbitrary frame lengths that are transferred via zmq.
+* Note: Sizes of datatypes should be fixed and equal on all plattforms
+* (i.e. don't use a gsize as it has different size on x86 & x86_64)
+*/
+typedef struct _DataFrame {
+ UfoMessageType type;
+ guint64 data_size;
+ // variable length data field
+ char data[];
+} DataFrame;
+
+UfoZmqMessenger *
+ufo_zmq_messenger_new (void)
+{
+ UfoZmqMessenger *msger;
+ msger = UFO_ZMQ_MESSENGER (g_object_new (UFO_TYPE_ZMQ_MESSENGER, NULL));
+
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+ priv->zmq_ctx = zmq_ctx_new ();
+
+ return msger;
+}
+
+static void
+validate_zmq_listen_address (gchar *addr)
+{
+ if (!g_str_has_prefix (addr, "tcp://"))
+ g_critical ("address didn't start with tcp:// scheme, which is required currently");
+
+ /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555
+ * but not bind to it as it treats it like an interface name (like eth0).
+ * We have to use IP addresses instead of DNS names.
+ */
+ gchar *host = g_strdup (&addr[6]);
+ if (!g_ascii_isdigit (host[0]) && host[0] != '*')
+ g_message ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host);
+ g_free (host);
+}
+
+void
+ufo_zmq_messenger_connect (UfoMessenger *msger, gchar *addr, UfoMessengerRole role)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+ g_mutex_lock (priv->mutex);
+
+ priv->remote_addr = g_strdup (addr);
+ priv->role = role;
+
+ if (role == UFO_MESSENGER_CLIENT) {
+ priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REQ);
+
+ if (zmq_connect (priv->zmq_socket, priv->remote_addr) == 0) {
+ g_message ("Connected to `%s' via socket=%p",
+ priv->remote_addr,
+ priv->zmq_socket);
+ }
+ else {
+ g_warning ("Could not connect to `%s': %s",
+ addr,
+ zmq_strerror (errno));
+ }
+ } else if (role == UFO_MESSENGER_SERVER) {
+ validate_zmq_listen_address (priv->remote_addr);
+ priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REP);
+
+ gint err = zmq_bind (priv->zmq_socket, priv->remote_addr);
+ if (err < 0)
+ g_critical ("could not bind to address %s", priv->remote_addr);
+ }
+
+ g_mutex_unlock (priv->mutex);
+ return;
+}
+
+void
+ufo_zmq_messenger_disconnect (UfoMessenger *msger)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+
+ g_mutex_lock (priv->mutex);
+
+ if (priv->zmq_socket != NULL) {
+ zmq_close (priv->zmq_socket);
+ priv->zmq_socket = NULL;
+
+ // waits for outstanding messages to be flushed
+ zmq_term (priv->zmq_ctx);
+ g_free (priv->remote_addr);
+ }
+
+ g_mutex_unlock (priv->mutex);
+ return;
+}
+
+UfoMessage *
+ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
+ UfoMessage *request_msg,
+ GError **error)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+
+ if (request_msg->type == UFO_MESSAGE_ACK && priv->role == UFO_MESSENGER_CLIENT)
+ g_critical ("Clients can't send ACK messages");
+
+ g_mutex_lock (priv->mutex);
+
+ UfoMessage *result = NULL;
+ zmq_msg_t request;
+
+ gsize frame_size = sizeof (DataFrame) + request_msg->data_size;
+ zmq_msg_init_size (&request, frame_size);
+ DataFrame *frame = (DataFrame *) zmq_msg_data (&request);
+
+ frame->data_size = request_msg->data_size;
+ frame->type = request_msg->type;
+ memcpy (frame->data, request_msg->data, request_msg->data_size);
+
+ gint err = zmq_msg_send (&request, priv->zmq_socket, 0);
+ zmq_msg_close (&request);
+
+ if (err < 0) {
+ g_set_error (error, ufo_messenger_error_quark (), zmq_errno (),
+ "Error sending message via %s: %s",
+ priv->remote_addr, zmq_strerror (zmq_errno ()));
+ goto finalize;
+ }
+
+ /* if this is an ACK message, don't expect a response
+ * (send_blocking is then most likely being called by the server)
+ */
+ if (request_msg->type == UFO_MESSAGE_ACK) {
+ goto finalize;
+ }
+
+ /* we always need to receive as response as ZMQ
+ * requires REQ/REP/REQ/REP/... scheme
+ */
+ zmq_msg_t reply;
+ zmq_msg_init (&reply);
+
+ gint size = zmq_msg_recv (&reply, priv->zmq_socket, 0);
+ if (size < 0) {
+ g_set_error (error, ufo_messenger_error_quark (), zmq_errno(),
+ "Could not receive from %s: %s ", priv->remote_addr,
+ zmq_strerror (zmq_errno ()));
+ goto finalize;
+ }
+
+ DataFrame *resp_frame = (DataFrame *) zmq_msg_data (&reply);
+
+ guint64 expected_size = (guint32) (sizeof (DataFrame) + resp_frame->data_size);
+ if ((guint64) size != expected_size) {
+ g_set_error (error, ufo_messenger_error_quark(),
+ UFO_MESSENGER_SIZE_MISSMATCH,
+ "Received unexpected frame size: %d", size);
+ goto finalize;
+ }
+
+ UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size);
+ memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size);
+
+ zmq_msg_close (&reply);
+ result = reply_msg;
+ goto finalize;
+
+ finalize:
+ g_mutex_unlock (priv->mutex);
+ return result;
+
+}
+
+UfoMessage *
+ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
+ GError **error)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+ g_assert (priv->role == UFO_MESSENGER_SERVER);
+
+ g_mutex_lock (priv->mutex);
+
+ UfoMessage *result;
+ zmq_msg_t reply;
+ zmq_msg_init (&reply);
+ gint size = zmq_msg_recv (&reply, priv->zmq_socket, 0);
+
+ if (size < 0) {
+ zmq_msg_close (&reply);
+ g_set_error (error, ufo_messenger_error_quark(), zmq_errno(),
+ "Could not receive from %s: %s ", priv->remote_addr,
+ zmq_strerror (zmq_errno ()));
+ goto finalize;
+ }
+
+ DataFrame *frame = zmq_msg_data (&reply);
+
+ guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size);
+ if ((guint)size != expected_size) {
+ g_set_error (error, ufo_messenger_error_quark(),
+ UFO_MESSENGER_SIZE_MISSMATCH,
+ "Received unexpected frame size: %d, should be: %d",
+ size, expected_size);
+ goto finalize;
+ }
+
+ UfoMessage *msg = ufo_message_new (frame->type, frame->data_size);
+ memcpy (msg->data, frame->data, frame->data_size);
+
+ zmq_msg_close (&reply);
+ result = msg;
+ goto finalize;
+
+ finalize:
+ g_mutex_unlock (priv->mutex);
+ return result;
+}
+
+static void
+ufo_messenger_interface_init (UfoMessengerIface *iface)
+{
+ iface->connect = ufo_zmq_messenger_connect;
+ iface->disconnect = ufo_zmq_messenger_disconnect;
+ iface->send_blocking = ufo_zmq_messenger_send_blocking;
+ iface->recv_blocking = ufo_zmq_messenger_recv_blocking;
+}
+
+
+static void
+ufo_zmq_messenger_dispose (GObject *object)
+{
+ ufo_zmq_messenger_disconnect (UFO_MESSENGER (object));
+}
+
+static void
+ufo_zmq_messenger_finalize (GObject *object)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (object);
+
+ if (priv->zmq_ctx != NULL) {
+ zmq_ctx_destroy (priv->zmq_ctx);
+ priv->zmq_ctx = NULL;
+ }
+
+ g_mutex_free (priv->mutex);
+}
+
+static void
+ufo_zmq_messenger_class_init (UfoZmqMessengerClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+ oclass->dispose = ufo_zmq_messenger_dispose;
+ oclass->finalize = ufo_zmq_messenger_finalize;
+
+ g_type_class_add_private (klass, sizeof(UfoZmqMessengerPrivate));
+}
+
+static void
+ufo_zmq_messenger_init (UfoZmqMessenger *msger)
+{
+ UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+ priv->zmq_socket = NULL;
+ priv->zmq_ctx = NULL;
+ priv->mutex = g_mutex_new ();
+}
diff --git a/ufo/ufo-zmq-messenger.h b/ufo/ufo-zmq-messenger.h
new file mode 100644
index 0000000..19462cb
--- /dev/null
+++ b/ufo/ufo-zmq-messenger.h
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ZMQ_MESSENGER_H
+#define __UFO_ZMQ_MESSENGER_H
+
+#include <ufo/ufo-remote-node.h>
+#include <ufo/ufo-messenger-iface.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ZMQ_MESSENGER (ufo_zmq_messenger_get_type())
+#define UFO_ZMQ_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessenger))
+#define UFO_IS_ZMQ_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_MESSENGER))
+#define UFO_ZMQ_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerClass))
+#define UFO_IS_ZMQ_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_MESSENGER))
+#define UFO_ZMQ_MESSENGER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerClass))
+
+typedef struct _UfoZmqMessenger UfoZmqMessenger;
+typedef struct _UfoZmqMessengerClass UfoZmqMessengerClass;
+typedef struct _UfoZmqMessengerPrivate UfoZmqMessengerPrivate;
+
+struct _UfoZmqMessenger {
+ /*< private >*/
+ GObject parent_instance;
+
+ UfoZmqMessengerPrivate *priv;
+};
+
+struct _UfoZmqMessengerClass {
+ /*< private >*/
+ GObjectClass parent_class;
+};
+
+UfoZmqMessenger *ufo_zmq_messenger_new (void);
+GType ufo_zmq_messenger_get_type (void);
+
+void ufo_zmq_messenger_connect (UfoMessenger *msger,
+ gchar *addr,
+ UfoMessengerRole role);
+
+void ufo_zmq_messenger_disconnect (UfoMessenger *msg);
+
+UfoMessage *ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
+ UfoMessage *request,
+ GError **error);
+
+UfoMessage *ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
+ GError **error);
+
+G_END_DECLS
+
+#endif
diff --git a/ufo/ufo.h b/ufo/ufo.h
index 6fdc9b3..9584ca2 100644
--- a/ufo/ufo.h
+++ b/ufo/ufo.h
@@ -48,6 +48,8 @@
#include <ufo/ufo-task-iface.h>
#include <ufo/ufo-task-node.h>
#include <ufo/ufo-basic-ops.h>
+#include <ufo/ufo-messenger-iface.h>
+#include <ufo/ufo-zmq-messenger.h>
#undef __UFO_H_INSIDE__