diff options
author | Timo Dörr <timo@latecrew.de> | 2013-10-04 17:18:41 +0200 |
---|---|---|
committer | Timo Dörr <timo@latecrew.de> | 2013-10-23 14:08:07 +0200 |
commit | 921d9015736205bb03a02985f6046d931cdd26c9 (patch) | |
tree | eeb00d06512f51c2140c5d342d40eb7da4ef19e2 | |
parent | 89fc1edf44ca48086c193d5ad64d6a0f7ee12660 (diff) |
Abstract away communication into interface
* Create generic interface ufo_messenger_iface
* Move all zmq_* stuff into ufo_zmq_messenger that implements that
interface
* also fixes a bug that happened when closing the zmq_socket from
another thread by introducing a special TERMINATE message that
will kill ufo-daemon
-rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
-rw-r--r-- | tests/test-remote-node.c | 21 | ||||
-rw-r--r-- | tests/test-suite.c | 3 | ||||
-rw-r--r-- | tests/test-suite.h | 1 | ||||
-rw-r--r-- | tests/test-zmq-messenger.c | 110 | ||||
-rw-r--r-- | tools/ufod.c | 1 | ||||
-rw-r--r-- | ufo/CMakeLists.txt | 4 | ||||
-rw-r--r-- | ufo/ufo-daemon.c | 270 | ||||
-rw-r--r-- | ufo/ufo-messenger-iface.c | 90 | ||||
-rw-r--r-- | ufo/ufo-messenger-iface.h | 137 | ||||
-rw-r--r-- | ufo/ufo-remote-node.c | 285 | ||||
-rw-r--r-- | ufo/ufo-remote-node.h | 46 | ||||
-rw-r--r-- | ufo/ufo-zmq-messenger.c | 305 | ||||
-rw-r--r-- | ufo/ufo-zmq-messenger.h | 70 | ||||
-rw-r--r-- | ufo/ufo.h | 2 |
15 files changed, 957 insertions, 389 deletions
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 254bfa7..22a9813 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,6 +7,7 @@ set(TEST_SRCS test-graph.c test-profiler.c test-remote-node.c + test-zmq-messenger.c ) set(SUITE_BIN "test-suite") diff --git a/tests/test-remote-node.c b/tests/test-remote-node.c index 59bd485..8642988 100644 --- a/tests/test-remote-node.c +++ b/tests/test-remote-node.c @@ -19,7 +19,6 @@ #include <string.h> #include <ufo.h> -#include <zmq.h> #include "test-suite.h" typedef struct { @@ -44,8 +43,8 @@ static void teardown (Fixture *fixture, gconstpointer data) { g_object_unref (fixture->remote_node); - ufo_daemon_stop (fixture->daemon); + g_object_unref (fixture->daemon); } @@ -58,9 +57,27 @@ test_remote_node_get_num_cpus (Fixture *fixture, g_assert (n_gpus > 0); } +static void +test_remote_node_get_structure (Fixture *fixture, + gconstpointer unused) +{ + UfoTaskMode mode; + UfoInputParam *in_params; + guint n_inputs; + ufo_remote_node_get_structure (fixture->remote_node, &n_inputs, &in_params, &mode); + g_message ("received n_inputs == %d", n_inputs); + g_assert (n_inputs == 1); + g_message ("received n_dims == %d", in_params->n_dims); + g_assert (in_params->n_dims == 2); + +} + void test_add_remote_node (void) { + g_test_add ("/remotenode/get_structure", + Fixture, NULL, + setup, test_remote_node_get_structure, teardown); g_test_add ("/remotenode/get_num_cpus", Fixture, NULL, setup, test_remote_node_get_num_cpus, teardown); diff --git a/tests/test-suite.c b/tests/test-suite.c index e6c813a..76a5eab 100644 --- a/tests/test-suite.c +++ b/tests/test-suite.c @@ -26,6 +26,7 @@ ignore_log (const gchar *domain, const gchar *message, gpointer data) { + // g_print ("%s\n",message); } int main(int argc, char *argv[]) @@ -41,8 +42,8 @@ int main(int argc, char *argv[]) test_add_config (); test_add_graph (); test_add_profiler (); + test_add_zmq_messenger (); test_add_remote_node (); - g_test_run(); return 0; diff --git a/tests/test-suite.h b/tests/test-suite.h index cc54ddb..afe699a 100644 --- a/tests/test-suite.h +++ b/tests/test-suite.h @@ -6,5 +6,6 @@ void test_add_config (void); void test_add_graph (void); void test_add_profiler (void); void test_add_remote_node (void); +void test_add_zmq_messenger (void); #endif diff --git a/tests/test-zmq-messenger.c b/tests/test-zmq-messenger.c new file mode 100644 index 0000000..84576f7 --- /dev/null +++ b/tests/test-zmq-messenger.c @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <ufo/ufo.h> +#include "test-suite.h" + +typedef struct { + gchar *addr; +} Fixture; + +static void +setup (Fixture *fixture, gconstpointer data) +{ + fixture->addr = g_strdup ("tcp://127.0.0.1:5555"); +} + +static void +teardown (Fixture *fixture, gconstpointer data) +{ + g_free (fixture->addr); +} + +static void send_num_devices_request (gpointer unused) +{ + UfoMessenger *msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + gchar *addr = g_strdup ("tcp://127.0.0.1:5555"); + ufo_messenger_connect (msger, addr, UFO_MESSENGER_CLIENT); + + guint x = 0; + while (x++ < 10) { + UfoMessage *request = ufo_message_new (UFO_MESSAGE_GET_NUM_DEVICES, 0); + UfoMessage *response; + + response = ufo_messenger_send_blocking (msger, request, NULL); + + guint16 num_devices = *(guint16 *) response->data; + g_assert (num_devices == x); + + ufo_message_free (request); + ufo_message_free (response); + } + ufo_zmq_messenger_disconnect (msger); + g_object_unref (msger); +} + +static void handle_num_devices (gpointer unused) +{ + UfoMessenger *msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + gchar *addr = g_strdup ("tcp://127.0.0.1:5555"); + ufo_messenger_connect (msger, addr, UFO_MESSENGER_SERVER); + + guint16 x = 0; + GError *err = NULL; + while (x++ < 10) { + UfoMessage *msg = ufo_messenger_recv_blocking (UFO_MESSENGER (msger), &err); + if (err != NULL) + g_critical ("%s", err->message); + + UfoMessage *resp; + switch (msg->type) { + case UFO_MESSAGE_GET_NUM_DEVICES: + resp = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); + *(guint16 *)resp->data = x; + ufo_zmq_messenger_send_blocking (msger, resp, NULL); + ufo_message_free (resp); + break; + default: + g_critical ("Unexpected message type: %d", msg->type); + break; + } + ufo_message_free (msg); + }; + + ufo_zmq_messenger_disconnect (msger); + g_object_unref (msger); +} + +static void test_zmq_messenger (Fixture *fixture, gconstpointer unused) +{ + GThread *server = g_thread_create ((GThreadFunc) handle_num_devices, NULL, TRUE, NULL); + GThread *client = g_thread_create ((GThreadFunc) send_num_devices_request, NULL, TRUE, NULL); + + g_thread_join (client); + g_thread_join (server); +} + + +void +test_add_zmq_messenger (void) +{ + g_test_add ("/zmq_messenger/test_messenger", + Fixture, NULL, + setup, test_zmq_messenger, teardown); +} diff --git a/tools/ufod.c b/tools/ufod.c index 1de2dd1..d7f29b7 100644 --- a/tools/ufod.c +++ b/tools/ufod.c @@ -24,7 +24,6 @@ #else #include <CL/cl.h> #endif -#include <zmq.h> #include <signal.h> #include <stdlib.h> #include <string.h> diff --git a/ufo/CMakeLists.txt b/ufo/CMakeLists.txt index 30b246d..1124687 100644 --- a/ufo/CMakeLists.txt +++ b/ufo/CMakeLists.txt @@ -15,6 +15,7 @@ set(ufocore_SRCS ufo-graph.c ufo-group.c ufo-input-task.c + ufo-messenger-iface.c ufo-node.c ufo-output-task.c ufo-plugin-manager.c @@ -27,6 +28,7 @@ set(ufocore_SRCS ufo-task-graph.c ufo-task-node.c ufo-basic-ops.c + ufo-zmq-messenger.c ) #}}} @@ -45,6 +47,7 @@ set(ufocore_HDRS ufo-graph.h ufo-group.h ufo-input-task.h + ufo-messenger-iface.h ufo-node.h ufo-output-task.h ufo-plugin-manager.h @@ -57,6 +60,7 @@ set(ufocore_HDRS ufo-task-graph.h ufo-task-node.h ufo-basic-ops.h + ufo-zmq-messenger.h ) #}}} diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index 3d311bb..0e46c6f 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -33,6 +33,8 @@ #include <ufo/ufo-plugin-manager.h> #include <ufo/ufo-scheduler.h> #include <ufo/ufo-task-graph.h> +#include <ufo/ufo-zmq-messenger.h> +#include <ufo/ufo-messenger-iface.h> #include "zmq-shim.h" @@ -40,8 +42,6 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT) #define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate)) -#define CHECK_ZMQ(r) if (r == -1) g_warning ("%s:%i: zmq_error: %s\n", __FILE__, __LINE__, zmq_strerror (errno)); - struct _UfoDaemonPrivate { UfoConfig *config; UfoPluginManager *manager; @@ -58,10 +58,10 @@ struct _UfoDaemonPrivate { gboolean has_started; GMutex *started_lock; GCond *started_cond; + UfoMessenger *msger; }; static gpointer run_scheduler (UfoDaemon *daemon); -static void validate_zmq_listen_address (gchar *addr); UfoDaemon * ufo_daemon_new (UfoConfig *config, gchar *listen_address) @@ -71,8 +71,6 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address) g_return_val_if_fail (listen_address != NULL, NULL); g_return_val_if_fail (config != NULL, NULL); - validate_zmq_listen_address (listen_address); - daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL)); UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); @@ -80,68 +78,31 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address) priv->listen_address = listen_address; priv->manager = ufo_plugin_manager_new (priv->config); priv->scheduler = ufo_scheduler_new (priv->config, NULL); - priv->context = zmq_ctx_new (); - priv->socket = zmq_socket (priv->context, ZMQ_REP); - - int err = zmq_bind (priv->socket, listen_address); - if (err < 0) - g_critical ("could not bind to address %s", listen_address); + priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); return daemon; } static void -validate_zmq_listen_address (gchar *addr) -{ - if (!g_str_has_prefix (addr, "tcp://")) - g_critical ("address didn't start with tcp:// scheme, which is required currently"); - - /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555 - * but not bind to it as it treats it like an interface name (like eth0). - * We have to use IP addresses instead of DNS names. - */ - gchar *host = g_strdup (&addr[6]); - if (!g_ascii_isdigit (host[0]) && host[0] != '*') - g_warning ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host); - g_free (host); -} - -static void -ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags) -{ - zmq_msg_t reply; - - zmq_msg_init_size (&reply, sizeof (UfoMessage)); - memcpy (zmq_msg_data (&reply), msg, sizeof (UfoMessage)); - zmq_msg_send (&reply, socket, flags); - zmq_msg_close (&reply); -} - -static void -send_ack (gpointer socket) -{ - UfoMessage msg; - msg.type = UFO_MESSAGE_ACK; - ufo_msg_send (&msg, socket, 0); -} - -static void handle_get_num_devices (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - - UfoMessage msg; cl_context context; + UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); + cl_uint *num_devices = g_malloc (sizeof (cl_uint)); context = ufo_scheduler_get_context (priv->scheduler); UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), - &msg.d.n_devices, + num_devices, NULL)); - ufo_msg_send (&msg, priv->socket, 0); + *(guint16 *) msg->data = (guint16) *num_devices; + + ufo_messenger_send_blocking (priv->msger, msg, 0); + ufo_message_free (msg); } static UfoNode * @@ -166,34 +127,30 @@ remove_dummy_if_present (UfoGraph *graph, } static gchar * -read_json (UfoDaemon *daemon) +read_json (UfoDaemon *daemon, UfoMessage *msg) { - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - - zmq_msg_t json_msg; - gsize size; gchar *json; - zmq_msg_init (&json_msg); - size = (gsize) zmq_msg_recv (&json_msg, priv->socket, 0); - - json = g_malloc0 (size + 1); - memcpy (json, zmq_msg_data (&json_msg), size); - zmq_msg_close (&json_msg); + json = g_malloc0 (msg->data_size + 1); + memcpy (json, msg->data, msg->data_size); return json; } static void -handle_replicate_json (UfoDaemon *daemon) +handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); gchar *json; UfoTaskGraph *graph; GError *error = NULL; - json = read_json (daemon); - send_ack (priv->socket); + json = read_json (daemon, msg); + + // send ack + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); ufo_task_graph_read_from_data (graph, priv->manager, json, &error); @@ -203,10 +160,7 @@ handle_replicate_json (UfoDaemon *daemon) goto replicate_json_free; } - g_message ("Start scheduler"); ufo_scheduler_run (priv->scheduler, graph, NULL); - - g_message ("Done"); g_object_unref (priv->scheduler); priv->scheduler = ufo_scheduler_new (priv->config, NULL); @@ -217,7 +171,7 @@ replicate_json_free: } static void -handle_stream_json (UfoDaemon *daemon) +handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); gchar *json; @@ -227,7 +181,11 @@ handle_stream_json (UfoDaemon *daemon) UfoNode *last; GError *error = NULL; - json = read_json (daemon); + json = read_json (daemon, msg); + // send ack + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); /* Setup local task graph */ priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); @@ -263,79 +221,65 @@ handle_stream_json (UfoDaemon *daemon) g_thread_create ((GThreadFunc) run_scheduler, daemon, FALSE, NULL); g_free (json); - send_ack (priv->socket); -} - -static void -handle_setup (UfoDaemon *daemon) -{ - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - g_message ("Setup requested"); - send_ack (priv->socket); } static void handle_get_structure (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoMessage header; - UfoInputParam in_param; - zmq_msg_t data_msg; + UfoMessage *response; - g_message ("Structure requested"); - header.type = UFO_MESSAGE_STRUCTURE; + /* TODO move into .h and share between daemon and remote-node */ + struct _Structure { + guint16 n_inputs; + guint16 n_dims; + } msg_data; - /* TODO: do not hardcode these */ - header.d.n_inputs = 1; - in_param.n_dims = 2; + /* TODO don't hardcode these */ + msg_data.n_inputs = 1; + msg_data.n_dims = 2; - zmq_msg_init_size (&data_msg, sizeof (UfoInputParam)); - memcpy (zmq_msg_data (&data_msg), &in_param, sizeof (UfoInputParam)); + response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure)); + *(struct _Structure *) (response->data) = msg_data; - ufo_msg_send (&header, priv->socket, ZMQ_SNDMORE); - zmq_msg_send (&data_msg, priv->socket, 0); - zmq_msg_close (&data_msg); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); } static void -handle_send_inputs (UfoDaemon *daemon) +handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoRequisition *requisition; - zmq_msg_t requisition_msg; - zmq_msg_t data_msg; + UfoRequisition requisition; gpointer context; context = ufo_scheduler_get_context (priv->scheduler); - /* Receive buffer size */ - zmq_msg_init (&requisition_msg); - zmq_msg_recv (&requisition_msg, priv->socket, 0); - g_assert (zmq_msg_size (&requisition_msg) >= sizeof (UfoRequisition)); - requisition = zmq_msg_data (&requisition_msg); + struct _Header { + UfoRequisition requisition; + guint64 buffer_size; + }; + gpointer base = request->data; + struct _Header *header = (struct _Header *) base; + + /* Receive buffer size */ + requisition = header->requisition; if (priv->input == NULL) { - priv->input = ufo_buffer_new (requisition, context); + priv->input = ufo_buffer_new (&requisition, context); } else { - if (ufo_buffer_cmp_dimensions (priv->input, requisition)) - ufo_buffer_resize (priv->input, requisition); + if (ufo_buffer_cmp_dimensions (priv->input, &requisition)) + ufo_buffer_resize (priv->input, &requisition); } - - zmq_msg_close (&requisition_msg); - - /* Receive actual buffer */ - zmq_msg_init (&data_msg); - zmq_msg_recv (&data_msg, priv->socket, 0); - memcpy (ufo_buffer_get_host_array (priv->input, NULL), - zmq_msg_data (&data_msg), + base + sizeof (struct _Header), ufo_buffer_get_size (priv->input)); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); - zmq_msg_close (&data_msg); - send_ack (priv->socket); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); } static void @@ -343,17 +287,15 @@ handle_get_requisition (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoRequisition requisition; - zmq_msg_t reply_msg; /* We need to get the requisition from the last node */ - g_message ("Requisition requested"); ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task), &requisition); - zmq_msg_init_size (&reply_msg, sizeof (UfoRequisition)); - memcpy (zmq_msg_data (&reply_msg), &requisition, sizeof (UfoRequisition)); - zmq_msg_send (&reply_msg, priv->socket, 0); - zmq_msg_close (&reply_msg); + UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition)); + memcpy (msg->data, &requisition, msg->data_size); + ufo_messenger_send_blocking (priv->msger, msg, NULL); + ufo_message_free (msg); } static @@ -361,16 +303,14 @@ void handle_get_result (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoBuffer *buffer; - zmq_msg_t reply_msg; gsize size; buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task)); size = ufo_buffer_get_size (buffer); - zmq_msg_init_size (&reply_msg, size); - memcpy (zmq_msg_data (&reply_msg), ufo_buffer_get_host_array (buffer, NULL), size); - zmq_msg_send (&reply_msg, priv->socket, 0); - zmq_msg_close (&reply_msg); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size); + memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size); + ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer); } @@ -392,7 +332,9 @@ void handle_cleanup (UfoDaemon *daemon) * We send the ACK early on, because we don't want to let the host wait for * actually cleaning up (and waiting some time to unref the input task). */ - send_ack (priv->socket); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); // TODO check that we don't need to execture this branch wen priv->input is null if (priv->input_task && priv->input) { @@ -410,6 +352,18 @@ void handle_cleanup (UfoDaemon *daemon) unref_and_free ((GObject **) &priv->task_graph); } +static void +handle_terminate (UfoDaemon *daemon) +{ + UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); + + priv->run = FALSE; + ufo_messenger_disconnect (priv->msger); +} + static gpointer run_scheduler (UfoDaemon *daemon) { @@ -430,48 +384,39 @@ ufo_daemon_start_impl (UfoDaemon *daemon) UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); while (priv->run) { - zmq_msg_t request; + // zmq_msg_t request; - zmq_msg_init (&request); + // zmq_msg_init (&request); g_mutex_lock (priv->started_lock); priv->has_started = TRUE; g_cond_signal (priv->started_cond); g_mutex_unlock (priv->started_lock); - gint err = zmq_msg_recv (&request, priv->socket, 0); + // gint err = zmq_msg_recv (&request, priv->socket, 0); /* if daemon is stopped, socket will be closed and msg_recv * will yield an error - we simply want to return */ - if (err < 0) return; - - if (zmq_msg_size (&request) < sizeof (UfoMessage)) { - g_warning ("Message is smaller than expected\n"); - send_ack (priv->socket); - } - else { - UfoMessage *msg; - - msg = (UfoMessage *) zmq_msg_data (&request); + GError *err = NULL; + UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err); + if (err != NULL) { + } else { switch (msg->type) { case UFO_MESSAGE_GET_NUM_DEVICES: handle_get_num_devices (daemon); break; case UFO_MESSAGE_STREAM_JSON: - handle_stream_json (daemon); + handle_stream_json (daemon, msg); break; case UFO_MESSAGE_REPLICATE_JSON: - handle_replicate_json (daemon); - break; - case UFO_MESSAGE_SETUP: - handle_setup (daemon); + handle_replicate_json (daemon, msg); break; case UFO_MESSAGE_GET_STRUCTURE: handle_get_structure (daemon); break; case UFO_MESSAGE_SEND_INPUTS: - handle_send_inputs (daemon); + handle_send_inputs (daemon, msg); break; case UFO_MESSAGE_GET_REQUISITION: handle_get_requisition (daemon); @@ -482,11 +427,14 @@ ufo_daemon_start_impl (UfoDaemon *daemon) case UFO_MESSAGE_CLEANUP: handle_cleanup (daemon); break; + case UFO_MESSAGE_TERMINATE: + handle_terminate (daemon); + break; default: - g_print ("unhandled case\n"); + g_message ("Unknown message received\n"); } } - zmq_msg_close (&request); + ufo_message_free (msg); } } @@ -495,6 +443,9 @@ ufo_daemon_start (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + /* TODO handle error if unable to connect/bind */ + ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER); + priv->run = TRUE; priv->thread = g_thread_create ((GThreadFunc)ufo_daemon_start_impl, daemon, TRUE, NULL); g_return_if_fail (priv->thread != NULL); @@ -511,15 +462,15 @@ void ufo_daemon_stop (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - priv->run = FALSE; - - zmq_close (priv->socket); - if (priv->context != NULL) { - zmq_ctx_destroy (priv->context); - priv->context = NULL; - g_assert (priv->context == NULL); - } + /* HACK we can't call _disconnect() as this has to be run from the + * thread running the daemon - we thus send a TERMINATE message to + * that thread + */ + UfoMessenger *tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT); + UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0); + ufo_messenger_send_blocking (tmp_msger, request, NULL); g_thread_join (priv->thread); } @@ -534,18 +485,13 @@ ufo_daemon_dispose (GObject *object) g_object_unref (priv->task_graph); if (priv->config != NULL) g_object_unref (priv->config); + if (priv->msger != NULL) + g_object_unref (priv->msger); if (priv->manager != NULL) g_object_unref (priv->manager); if (priv->scheduler != NULL) g_object_unref (priv->scheduler); - zmq_close (priv->socket); - - if (priv->context != NULL) { - zmq_ctx_destroy (priv->context); - priv->context = NULL; - } - G_OBJECT_CLASS (ufo_daemon_parent_class)->dispose (object); } diff --git a/ufo/ufo-messenger-iface.c b/ufo/ufo-messenger-iface.c new file mode 100644 index 0000000..26ed2a8 --- /dev/null +++ b/ufo/ufo-messenger-iface.c @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + + #include <ufo/ufo-messenger-iface.h> + +typedef UfoMessengerIface UfoMessengerInterface; + +void +ufo_message_free (UfoMessage *msg) +{ + if (msg == NULL) + return; + g_free (msg->data); + g_free (msg); +} + +UfoMessage * +ufo_message_new (UfoMessageType type, guint64 data_size) +{ + UfoMessage *msg = g_malloc (sizeof (UfoMessage)); + msg->type = type; + msg->data_size = data_size; + if (data_size == 0) + msg->data = NULL; + else + msg->data = g_malloc (data_size); + return msg; +} + +G_DEFINE_INTERFACE (UfoMessenger, ufo_messenger, G_TYPE_OBJECT) + +/** + * UfoTaskError: + * @UFO_TASK_ERROR_SETUP: Error during setup of a task. + */ +GQuark +ufo_messenger_error_quark () +{ + return g_quark_from_static_string ("ufo-messenger-error-quark"); +} + +void +ufo_messenger_connect (UfoMessenger *msger, + gchar *addr, + UfoMessengerRole role) +{ + UFO_MESSENGER_GET_IFACE (msger)->connect (msger, addr, role); +} + +void +ufo_messenger_disconnect (UfoMessenger *msger) +{ + UFO_MESSENGER_GET_IFACE (msger)->disconnect (msger); +} + +UfoMessage * +ufo_messenger_send_blocking (UfoMessenger *msger, + UfoMessage *request, + GError **error) +{ + return UFO_MESSENGER_GET_IFACE (msger)->send_blocking (msger, request, error); +} + +UfoMessage * +ufo_messenger_recv_blocking (UfoMessenger *msger, + GError **error) +{ + return UFO_MESSENGER_GET_IFACE (msger)->recv_blocking (msger, error); +} + +static void +ufo_messenger_default_init (UfoMessengerInterface *iface) +{ +} diff --git a/ufo/ufo-messenger-iface.h b/ufo/ufo-messenger-iface.h new file mode 100644 index 0000000..f6d9271 --- /dev/null +++ b/ufo/ufo-messenger-iface.h @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef UFO_MESSENGER_H +#define UFO_MESSENGER_H + +#if !defined (__UFO_H_INSIDE__) && !defined (UFO_COMPILATION) +#error "Only <ufo/ufo.h> can be included directly." +#endif + +#include <ufo/ufo-remote-node.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_MESSENGER (ufo_messenger_get_type()) +#define UFO_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_MESSENGER, UfoMessenger)) +#define UFO_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_MESSENGER, UfoMessengerIface)) +#define UFO_IS_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_MESSENGER)) +#define UFO_IS_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_MESSENGER)) +#define UFO_MESSENGER_GET_IFACE(inst) (G_TYPE_INSTANCE_GET_INTERFACE((inst), UFO_TYPE_MESSENGER, UfoMessengerIface)) + +#define UFO_MESSENGER_ERROR ufo_messenger_error_quark() + +typedef struct _UfoMessenger UfoMessenger; +typedef struct _UfoMessengerIface UfoMessengerIface; +typedef struct _UfoMessage UfoMessage; + + +/** + * UfoMessageType: (skip) + * @UFO_MESSAGE_STREAM_JSON: insert + * @UFO_MESSAGE_REPLICATE_JSON: insert + * @UFO_MESSAGE_GET_NUM_DEVICES: insert + * @UFO_MESSAGE_SETUP: insert + * @UFO_MESSAGE_GET_STRUCTURE: insert + * @UFO_MESSAGE_STRUCTURE: insert + * @UFO_MESSAGE_GET_REQUISITION: insert + * @UFO_MESSAGE_REQUISITION: insert + * @UFO_MESSAGE_SEND_INPUTS: insert + * @UFO_MESSAGE_GET_RESULT: insert + * @UFO_MESSAGE_RESULT: insert + * @UFO_MESSAGE_CLEANUP: insert + * @UFO_MESSAGE_ACK: insert + */ +typedef enum { + UFO_MESSAGE_STREAM_JSON = 0, + UFO_MESSAGE_REPLICATE_JSON, + UFO_MESSAGE_GET_NUM_DEVICES, + UFO_MESSAGE_GET_STRUCTURE, + UFO_MESSAGE_STRUCTURE, + UFO_MESSAGE_GET_REQUISITION, + UFO_MESSAGE_REQUISITION, + UFO_MESSAGE_SEND_INPUTS, + UFO_MESSAGE_GET_RESULT, + UFO_MESSAGE_RESULT, + UFO_MESSAGE_CLEANUP, + UFO_MESSAGE_TERMINATE, + UFO_MESSAGE_ACK +} UfoMessageType; + +/** + * UfoMessage: (skip) + * @type: Type of the wire message + */ +struct _UfoMessage { + UfoMessageType type; + guint64 data_size; + gpointer data; +}; + +void ufo_message_free (UfoMessage *msg); +UfoMessage * ufo_message_new (UfoMessageType type, guint64 data_size); + +typedef enum { + UFO_MESSENGER_BUFFER_FULL, + UFO_MESSENGER_SIZE_MISSMATCH +} UfoMessengerError; + +typedef enum { + UFO_MESSENGER_CLIENT, + UFO_MESSENGER_SERVER +} UfoMessengerRole; + +struct _UfoMessengerIface { + /*< private >*/ + GTypeInterface parent_iface; + + void (*connect) (UfoMessenger *msger, + gchar *addr, + UfoMessengerRole role); + + void (*disconnect) (UfoMessenger *msger); + + UfoMessage * (*send_blocking) (UfoMessenger *msger, + UfoMessage *request, + GError **error); + + UfoMessage * (*recv_blocking) (UfoMessenger *msger, + GError **error); +}; + + +void ufo_messenger_connect (UfoMessenger *msger, + gchar *addr, + UfoMessengerRole role); + +void ufo_messenger_disconnect (UfoMessenger *msger); + +UfoMessage *ufo_messenger_send_blocking (UfoMessenger *msger, + UfoMessage *request, + GError **error); + +UfoMessage *ufo_messenger_recv_blocking (UfoMessenger *msger, + GError **error); + +GQuark ufo_messenger_error_quark (void); +GType ufo_messenger_get_type (void); + +G_END_DECLS + +#endif diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c index e72f6f9..dcf4fdc 100644 --- a/ufo/ufo-remote-node.c +++ b/ufo/ufo-remote-node.c @@ -19,6 +19,8 @@ #include <string.h> #include <ufo/ufo-remote-node.h> +#include <ufo/ufo-messenger-iface.h> +#include <ufo/ufo-zmq-messenger.h> #include "zmq-shim.h" @@ -26,14 +28,12 @@ G_DEFINE_TYPE (UfoRemoteNode, ufo_remote_node, UFO_TYPE_NODE) #define UFO_REMOTE_NODE_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_REMOTE_NODE, UfoRemoteNodePrivate)) -static void ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags); -static void receive_ack (gpointer socket); - struct _UfoRemoteNodePrivate { gpointer context; gpointer socket; guint n_inputs; GMutex *mutex; + UfoMessenger *msger; }; UfoNode * @@ -44,67 +44,50 @@ ufo_remote_node_new (const gchar *address) g_return_val_if_fail (address != NULL, NULL); node = UFO_REMOTE_NODE (g_object_new (UFO_TYPE_REMOTE_NODE, NULL)); - priv = node->priv; - priv->socket = zmq_socket (priv->context, ZMQ_REQ); + priv = UFO_REMOTE_NODE_GET_PRIVATE (node); - if (zmq_connect (priv->socket, address) == 0) { - g_message ("Connected remote node to `%s' via socket=%p", - address, - priv->socket); - return UFO_NODE (node); - } - else { - g_warning ("Could not connect to `%s': %s", - address, - zmq_strerror (errno)); - g_object_unref (node); - return NULL; - } + priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + + gchar *addr = g_strdup (address); + ufo_messenger_connect (priv->msger, addr, UFO_MESSENGER_CLIENT); + g_free(addr); + + return UFO_NODE (node); } guint ufo_remote_node_get_num_gpus (UfoRemoteNode *node) { - UfoRemoteNodePrivate *priv; - UfoMessage request; - UfoMessage result; - zmq_msg_t reply; - g_return_val_if_fail (UFO_IS_REMOTE_NODE (node), 0); - priv = node->priv; - - g_mutex_lock (priv->mutex); - request.type = UFO_MESSAGE_GET_NUM_DEVICES; - ufo_msg_send (&request, priv->socket, 0); + UfoRemoteNodePrivate *priv; + UfoMessage *request = ufo_message_new (UFO_MESSAGE_GET_NUM_DEVICES, 0); - zmq_msg_init (&reply); - zmq_msg_recv (&reply, priv->socket, 0); - memcpy (&result, zmq_msg_data (&reply), sizeof (UfoMessage)); - zmq_msg_close (&reply); + priv = node->priv; - g_mutex_unlock (priv->mutex); + UfoMessage *result; + result = ufo_messenger_send_blocking (priv->msger, request, NULL); + guint n_devices = * (guint16 *) result->data; - return result.d.n_devices; + ufo_message_free (request); + ufo_message_free (result); + g_assert (n_devices < 32); + return n_devices; } void ufo_remote_node_request_setup (UfoRemoteNode *node) { - UfoRemoteNodePrivate *priv; - UfoMessage request; + // TODO setup isn't in use, remove it + //g_assert (FALSE); - g_return_if_fail (UFO_IS_REMOTE_NODE (node)); - - priv = node->priv; - request.type = UFO_MESSAGE_SETUP; - - g_mutex_lock (priv->mutex); + // g_return_if_fail (UFO_IS_REMOTE_NODE (node)); + // UfoRemoteNodePrivate *priv = UFO_REMOTE_NODE_GET_PRIVATE (node); - ufo_msg_send (&request, priv->socket, 0); - receive_ack (priv->socket); - - g_mutex_unlock (priv->mutex); + // UfoMessage *request; + // request = ufo_message_new (UFO_MESSAGE_SETUP, 0); + // ufo_message_send_blocking (request); + // ufo_message_free (request); } void @@ -113,36 +96,28 @@ ufo_remote_node_send_json (UfoRemoteNode *node, const gchar *json) { UfoRemoteNodePrivate *priv; - UfoMessage request; - gsize size; - zmq_msg_t json_msg; + UfoMessage *request; + guint64 size; g_return_if_fail (UFO_IS_REMOTE_NODE (node)); priv = node->priv; + UfoMessageType type; switch (mode) { case UFO_REMOTE_MODE_STREAM: - request.type = UFO_MESSAGE_STREAM_JSON; + type = UFO_MESSAGE_STREAM_JSON; break; case UFO_REMOTE_MODE_REPLICATE: - request.type = UFO_MESSAGE_REPLICATE_JSON; + type = UFO_MESSAGE_REPLICATE_JSON; break; } - g_mutex_lock (priv->mutex); - - ufo_msg_send (&request, priv->socket, ZMQ_SNDMORE); - - size = strlen (json); - zmq_msg_init_size (&json_msg, size); - memcpy (zmq_msg_data (&json_msg), json, size); - zmq_msg_send (&json_msg, priv->socket, 0); - zmq_msg_close (&json_msg); + size = (guint64) strlen (json); + request = ufo_message_new (type, size); - receive_ack (priv->socket); - - g_mutex_unlock (priv->mutex); + memcpy (request->data, json, size); + ufo_messenger_send_blocking (priv->msger, request, NULL); } void @@ -152,41 +127,33 @@ ufo_remote_node_get_structure (UfoRemoteNode *node, UfoTaskMode *mode) { UfoRemoteNodePrivate *priv; - UfoMessage request; - UfoMessage *header; - zmq_msg_t header_msg; - zmq_msg_t payload_msg; - UfoInputParam *in_param; - - g_return_if_fail (UFO_IS_REMOTE_NODE (node)); + UfoMessage *request, *response; priv = node->priv; + + struct _Structure { + guint16 n_inputs; + guint16 n_dims; + } msg_data; + + g_return_if_fail (UFO_IS_REMOTE_NODE (node)); *mode = UFO_TASK_MODE_PROCESSOR; - request.type = UFO_MESSAGE_GET_STRUCTURE; - g_mutex_lock (priv->mutex); - ufo_msg_send (&request, priv->socket, 0); + request = ufo_message_new (UFO_MESSAGE_GET_STRUCTURE, 0); + response = ufo_messenger_send_blocking (priv->msger, request, NULL); + g_assert (response->data_size == sizeof (struct _Structure)); - /* Receive header */ - zmq_msg_init (&header_msg); - zmq_msg_recv (&header_msg, priv->socket, 0); - header = (UfoMessage *) zmq_msg_data (&header_msg); + msg_data = *(struct _Structure *) response->data; - /* Receive payload */ - zmq_msg_init (&payload_msg); - zmq_msg_recv (&payload_msg, priv->socket, 0); - in_param = (UfoInputParam *) zmq_msg_data (&payload_msg); + priv->n_inputs = msg_data.n_inputs; + *n_inputs = msg_data.n_inputs; - priv->n_inputs = header->d.n_inputs; - *n_inputs = header->d.n_inputs; *in_params = g_new0 (UfoInputParam, 1); - (*in_params)[0].n_dims = in_param->n_dims; + (*in_params)[0].n_dims = msg_data.n_dims; - zmq_msg_close (&header_msg); - zmq_msg_close (&payload_msg); - - g_mutex_unlock (priv->mutex); + ufo_message_free (request); + ufo_message_free (response); } void @@ -194,48 +161,48 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node, UfoBuffer **inputs) { UfoRemoteNodePrivate *priv; - UfoMessage request; + UfoMessage *request; g_return_if_fail (UFO_IS_REMOTE_NODE (node)); priv = node->priv; - request.type = UFO_MESSAGE_SEND_INPUTS; - - g_mutex_lock (priv->mutex); - - ufo_msg_send (&request, priv->socket, ZMQ_SNDMORE); /* * For each of the input data items send two frames: the first one contains * the size as an UfoRequisition struct and the second one the raw byte * data. */ - for (guint i = 0; i < priv->n_inputs; i++) { + struct _Header { UfoRequisition requisition; - zmq_msg_t requisition_msg; - zmq_msg_t data_msg; - gsize size; - gint flags; + guint64 buffer_size; + }; - ufo_buffer_get_requisition (inputs[i], &requisition); - size = ufo_buffer_get_size (inputs[i]); - - zmq_msg_init_size (&requisition_msg, sizeof (UfoRequisition)); - zmq_msg_init_size (&data_msg, size); - - memcpy (zmq_msg_data (&requisition_msg), &requisition, sizeof (UfoRequisition)); - memcpy (zmq_msg_data (&data_msg), ufo_buffer_get_host_array (inputs[i], NULL), size); + // determine our total message size + guint64 size; + for (guint i = 0; i < priv->n_inputs; i++) { + guint64 buffer_size = ufo_buffer_get_size (inputs[i]); + size += buffer_size; + } + gpointer buffer = g_malloc (priv->n_inputs * sizeof (struct _Header) + size); - flags = i == priv->n_inputs - 1 ? 0 : ZMQ_SNDMORE; - zmq_msg_send (&requisition_msg, priv->socket, ZMQ_SNDMORE); - zmq_msg_send (&data_msg, priv->socket, flags); + gpointer base = buffer; - zmq_msg_close (&requisition_msg); - zmq_msg_close (&data_msg); + for (guint i = 0; i < priv->n_inputs; i++) { + struct _Header *header = g_new0 (struct _Header, 1); + ufo_buffer_get_requisition (inputs[i], &header->requisition); + header->buffer_size = (guint64) ufo_buffer_get_size (inputs[i]); + + memcpy (base, header, sizeof (struct _Header)); + base += sizeof (struct _Header); + memcpy (base, ufo_buffer_get_host_array (inputs[i], NULL), header->buffer_size); + base += header->buffer_size; } + request = ufo_message_new (UFO_MESSAGE_SEND_INPUTS, size); + g_free (request->data); + request->data = buffer; + // send as a single message + ufo_messenger_send_blocking (priv->msger, request, NULL); - receive_ack (priv->socket); - g_mutex_unlock (priv->mutex); } void @@ -243,31 +210,23 @@ ufo_remote_node_get_result (UfoRemoteNode *node, UfoBuffer *buffer) { UfoRemoteNodePrivate *priv; - UfoMessage request; - zmq_msg_t reply_msg; + UfoMessage *request, *response; gpointer host_array; g_return_if_fail (UFO_IS_REMOTE_NODE (node)); priv = node->priv; - request.type = UFO_MESSAGE_GET_RESULT; - - g_mutex_lock (priv->mutex); - - ufo_msg_send (&request, priv->socket, 0); - - /* Get the remote data and put it into our buffer */ - zmq_msg_init (&reply_msg); - zmq_msg_recv (&reply_msg, priv->socket, 0); + request = ufo_message_new (UFO_MESSAGE_GET_RESULT, 0); + response = ufo_messenger_send_blocking (priv->msger, request, NULL); ufo_buffer_discard_location (buffer); host_array = ufo_buffer_get_host_array (buffer, NULL); - g_assert (ufo_buffer_get_size (buffer) == zmq_msg_size (&reply_msg)); - memcpy (host_array, zmq_msg_data (&reply_msg), ufo_buffer_get_size (buffer)); + g_assert (ufo_buffer_get_size (buffer) == response->data_size); - zmq_msg_close (&reply_msg); + memcpy (host_array, response->data, ufo_buffer_get_size (buffer)); - g_mutex_unlock (priv->mutex); + ufo_message_free (request); + ufo_message_free (response); } void @@ -275,75 +234,45 @@ ufo_remote_node_get_requisition (UfoRemoteNode *node, UfoRequisition *requisition) { UfoRemoteNodePrivate *priv; - UfoMessage request; - zmq_msg_t reply_msg; + UfoMessage *request, *response; g_return_if_fail (UFO_IS_REMOTE_NODE (node)); priv = node->priv; - request.type = UFO_MESSAGE_GET_REQUISITION; - - g_mutex_lock (priv->mutex); + request = ufo_message_new (UFO_MESSAGE_GET_REQUISITION, 0); + response = ufo_messenger_send_blocking (priv->msger, request, NULL); - ufo_msg_send (&request, priv->socket, 0); + g_assert (response->data_size == sizeof (UfoRequisition)); + memcpy (requisition, response->data, sizeof (UfoRequisition)); - zmq_msg_init (&reply_msg); - zmq_msg_recv (&reply_msg, priv->socket, 0); - g_assert (zmq_msg_size (&reply_msg) >= sizeof (UfoRequisition)); - memcpy (requisition, zmq_msg_data (&reply_msg), sizeof (UfoRequisition)); - zmq_msg_close (&reply_msg); - - g_mutex_unlock (priv->mutex); + ufo_message_free(request); + ufo_message_free(response); } static void -cleanup_remote (gpointer socket) +cleanup_remote (UfoRemoteNodePrivate *priv) { - UfoMessage request; - - request.type = UFO_MESSAGE_CLEANUP; - - ufo_msg_send (&request, socket, 0); - receive_ack (socket); + UfoMessage *request = ufo_message_new (UFO_MESSAGE_CLEANUP, 0); + ufo_messenger_send_blocking (priv->msger, request, NULL); + ufo_message_free (request); } static void -ufo_msg_send (UfoMessage *msg, - gpointer socket, - gint flags) +terminate_remote (UfoRemoteNodePrivate *priv) { - zmq_msg_t request; - - zmq_msg_init_size (&request, sizeof (UfoMessage)); - memcpy (zmq_msg_data (&request), msg, sizeof (UfoMessage)); - zmq_msg_send (&request, socket, flags); - zmq_msg_close (&request); -} - -static void -receive_ack (gpointer socket) -{ - zmq_msg_t reply_msg; - - zmq_msg_init (&reply_msg); - zmq_msg_recv (&reply_msg, socket, 0); - zmq_msg_close (&reply_msg); + UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0); + ufo_messenger_send_blocking (priv->msger, request, NULL); + ufo_message_free (request); } static void ufo_remote_node_dispose (GObject *object) { UfoRemoteNodePrivate *priv; - priv = UFO_REMOTE_NODE_GET_PRIVATE (object); - if (priv->socket != NULL) { - cleanup_remote (priv->socket); - - g_debug ("Close socket=%p", priv->socket); - zmq_close (priv->socket); - priv->socket = NULL; - } + cleanup_remote (priv); + ufo_messenger_disconnect (priv->msger); G_OBJECT_CLASS (ufo_remote_node_parent_class)->dispose (object); } @@ -355,9 +284,9 @@ ufo_remote_node_finalize (GObject *object) priv = UFO_REMOTE_NODE_GET_PRIVATE (object); g_mutex_free (priv->mutex); - + if (priv->context != NULL) { - g_debug ("Destroy zmq_context=%p", priv->context); + g_debug ("RemoteNode destroying zmq_context=%p", priv->context); zmq_ctx_destroy (priv->context); priv->context = NULL; } diff --git a/ufo/ufo-remote-node.h b/ufo/ufo-remote-node.h index 1124ddd..c73ec00 100644 --- a/ufo/ufo-remote-node.h +++ b/ufo/ufo-remote-node.h @@ -27,6 +27,7 @@ #include <ufo/ufo-node.h> #include <ufo/ufo-buffer.h> #include <ufo/ufo-task-iface.h> +#include <ufo/ufo-messenger-iface.h> G_BEGIN_DECLS @@ -40,7 +41,6 @@ G_BEGIN_DECLS typedef struct _UfoRemoteNode UfoRemoteNode; typedef struct _UfoRemoteNodeClass UfoRemoteNodeClass; typedef struct _UfoRemoteNodePrivate UfoRemoteNodePrivate; -typedef struct _UfoMessage UfoMessage; /** * UfoRemoteNode: @@ -78,50 +78,6 @@ typedef enum { } UfoRemoteMode; -/** - * UfoMessageType: (skip) - * @UFO_MESSAGE_STREAM_JSON: insert - * @UFO_MESSAGE_REPLICATE_JSON: insert - * @UFO_MESSAGE_GET_NUM_DEVICES: insert - * @UFO_MESSAGE_SETUP: insert - * @UFO_MESSAGE_GET_STRUCTURE: insert - * @UFO_MESSAGE_STRUCTURE: insert - * @UFO_MESSAGE_GET_REQUISITION: insert - * @UFO_MESSAGE_REQUISITION: insert - * @UFO_MESSAGE_SEND_INPUTS: insert - * @UFO_MESSAGE_GET_RESULT: insert - * @UFO_MESSAGE_RESULT: insert - * @UFO_MESSAGE_CLEANUP: insert - * @UFO_MESSAGE_ACK: insert - */ -typedef enum { - UFO_MESSAGE_STREAM_JSON = 0, - UFO_MESSAGE_REPLICATE_JSON, - UFO_MESSAGE_GET_NUM_DEVICES, - UFO_MESSAGE_SETUP, - UFO_MESSAGE_GET_STRUCTURE, - UFO_MESSAGE_STRUCTURE, - UFO_MESSAGE_GET_REQUISITION, - UFO_MESSAGE_REQUISITION, - UFO_MESSAGE_SEND_INPUTS, - UFO_MESSAGE_GET_RESULT, - UFO_MESSAGE_RESULT, - UFO_MESSAGE_CLEANUP, - UFO_MESSAGE_ACK -} UfoMessageType; - -/** - * UfoMessage: (skip) - * @type: Type of the wire message - */ -struct _UfoMessage { - UfoMessageType type; - - union { - guint16 n_inputs; - guint16 n_devices; - } d; -}; UfoNode *ufo_remote_node_new (const gchar *address); guint ufo_remote_node_get_num_gpus (UfoRemoteNode *node); diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c new file mode 100644 index 0000000..83b167f --- /dev/null +++ b/ufo/ufo-zmq-messenger.c @@ -0,0 +1,305 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <ufo/ufo-zmq-messenger.h> +#include <zmq.h> +#include <string.h> + + +static void ufo_messenger_interface_init (UfoMessengerIface *iface); + +#define UFO_ZMQ_MESSENGER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerPrivate)) + +G_DEFINE_TYPE_WITH_CODE (UfoZmqMessenger, ufo_zmq_messenger, G_TYPE_OBJECT, + G_IMPLEMENT_INTERFACE (UFO_TYPE_MESSENGER, + ufo_messenger_interface_init)) + + +struct _UfoZmqMessengerPrivate { + gchar *remote_addr; + GMutex *mutex; + gpointer zmq_socket; + gpointer zmq_ctx; + UfoMessengerRole role; +}; + +/* C99 allows flexible length structs that we use to map +* arbitrary frame lengths that are transferred via zmq. +* Note: Sizes of datatypes should be fixed and equal on all plattforms +* (i.e. don't use a gsize as it has different size on x86 & x86_64) +*/ +typedef struct _DataFrame { + UfoMessageType type; + guint64 data_size; + // variable length data field + char data[]; +} DataFrame; + +UfoZmqMessenger * +ufo_zmq_messenger_new (void) +{ + UfoZmqMessenger *msger; + msger = UFO_ZMQ_MESSENGER (g_object_new (UFO_TYPE_ZMQ_MESSENGER, NULL)); + + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + priv->zmq_ctx = zmq_ctx_new (); + + return msger; +} + +static void +validate_zmq_listen_address (gchar *addr) +{ + if (!g_str_has_prefix (addr, "tcp://")) + g_critical ("address didn't start with tcp:// scheme, which is required currently"); + + /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555 + * but not bind to it as it treats it like an interface name (like eth0). + * We have to use IP addresses instead of DNS names. + */ + gchar *host = g_strdup (&addr[6]); + if (!g_ascii_isdigit (host[0]) && host[0] != '*') + g_message ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host); + g_free (host); +} + +void +ufo_zmq_messenger_connect (UfoMessenger *msger, gchar *addr, UfoMessengerRole role) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + g_mutex_lock (priv->mutex); + + priv->remote_addr = g_strdup (addr); + priv->role = role; + + if (role == UFO_MESSENGER_CLIENT) { + priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REQ); + + if (zmq_connect (priv->zmq_socket, priv->remote_addr) == 0) { + g_message ("Connected to `%s' via socket=%p", + priv->remote_addr, + priv->zmq_socket); + } + else { + g_warning ("Could not connect to `%s': %s", + addr, + zmq_strerror (errno)); + } + } else if (role == UFO_MESSENGER_SERVER) { + validate_zmq_listen_address (priv->remote_addr); + priv->zmq_socket = zmq_socket (priv->zmq_ctx, ZMQ_REP); + + gint err = zmq_bind (priv->zmq_socket, priv->remote_addr); + if (err < 0) + g_critical ("could not bind to address %s", priv->remote_addr); + } + + g_mutex_unlock (priv->mutex); + return; +} + +void +ufo_zmq_messenger_disconnect (UfoMessenger *msger) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + + g_mutex_lock (priv->mutex); + + if (priv->zmq_socket != NULL) { + zmq_close (priv->zmq_socket); + priv->zmq_socket = NULL; + + // waits for outstanding messages to be flushed + zmq_term (priv->zmq_ctx); + g_free (priv->remote_addr); + } + + g_mutex_unlock (priv->mutex); + return; +} + +UfoMessage * +ufo_zmq_messenger_send_blocking (UfoMessenger *msger, + UfoMessage *request_msg, + GError **error) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + + if (request_msg->type == UFO_MESSAGE_ACK && priv->role == UFO_MESSENGER_CLIENT) + g_critical ("Clients can't send ACK messages"); + + g_mutex_lock (priv->mutex); + + UfoMessage *result = NULL; + zmq_msg_t request; + + gsize frame_size = sizeof (DataFrame) + request_msg->data_size; + zmq_msg_init_size (&request, frame_size); + DataFrame *frame = (DataFrame *) zmq_msg_data (&request); + + frame->data_size = request_msg->data_size; + frame->type = request_msg->type; + memcpy (frame->data, request_msg->data, request_msg->data_size); + + gint err = zmq_msg_send (&request, priv->zmq_socket, 0); + zmq_msg_close (&request); + + if (err < 0) { + g_set_error (error, ufo_messenger_error_quark (), zmq_errno (), + "Error sending message via %s: %s", + priv->remote_addr, zmq_strerror (zmq_errno ())); + goto finalize; + } + + /* if this is an ACK message, don't expect a response + * (send_blocking is then most likely being called by the server) + */ + if (request_msg->type == UFO_MESSAGE_ACK) { + goto finalize; + } + + /* we always need to receive as response as ZMQ + * requires REQ/REP/REQ/REP/... scheme + */ + zmq_msg_t reply; + zmq_msg_init (&reply); + + gint size = zmq_msg_recv (&reply, priv->zmq_socket, 0); + if (size < 0) { + g_set_error (error, ufo_messenger_error_quark (), zmq_errno(), + "Could not receive from %s: %s ", priv->remote_addr, + zmq_strerror (zmq_errno ())); + goto finalize; + } + + DataFrame *resp_frame = (DataFrame *) zmq_msg_data (&reply); + + guint64 expected_size = (guint32) (sizeof (DataFrame) + resp_frame->data_size); + if ((guint64) size != expected_size) { + g_set_error (error, ufo_messenger_error_quark(), + UFO_MESSENGER_SIZE_MISSMATCH, + "Received unexpected frame size: %d", size); + goto finalize; + } + + UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size); + memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size); + + zmq_msg_close (&reply); + result = reply_msg; + goto finalize; + + finalize: + g_mutex_unlock (priv->mutex); + return result; + +} + +UfoMessage * +ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, + GError **error) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + g_assert (priv->role == UFO_MESSENGER_SERVER); + + g_mutex_lock (priv->mutex); + + UfoMessage *result; + zmq_msg_t reply; + zmq_msg_init (&reply); + gint size = zmq_msg_recv (&reply, priv->zmq_socket, 0); + + if (size < 0) { + zmq_msg_close (&reply); + g_set_error (error, ufo_messenger_error_quark(), zmq_errno(), + "Could not receive from %s: %s ", priv->remote_addr, + zmq_strerror (zmq_errno ())); + goto finalize; + } + + DataFrame *frame = zmq_msg_data (&reply); + + guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size); + if ((guint)size != expected_size) { + g_set_error (error, ufo_messenger_error_quark(), + UFO_MESSENGER_SIZE_MISSMATCH, + "Received unexpected frame size: %d, should be: %d", + size, expected_size); + goto finalize; + } + + UfoMessage *msg = ufo_message_new (frame->type, frame->data_size); + memcpy (msg->data, frame->data, frame->data_size); + + zmq_msg_close (&reply); + result = msg; + goto finalize; + + finalize: + g_mutex_unlock (priv->mutex); + return result; +} + +static void +ufo_messenger_interface_init (UfoMessengerIface *iface) +{ + iface->connect = ufo_zmq_messenger_connect; + iface->disconnect = ufo_zmq_messenger_disconnect; + iface->send_blocking = ufo_zmq_messenger_send_blocking; + iface->recv_blocking = ufo_zmq_messenger_recv_blocking; +} + + +static void +ufo_zmq_messenger_dispose (GObject *object) +{ + ufo_zmq_messenger_disconnect (UFO_MESSENGER (object)); +} + +static void +ufo_zmq_messenger_finalize (GObject *object) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (object); + + if (priv->zmq_ctx != NULL) { + zmq_ctx_destroy (priv->zmq_ctx); + priv->zmq_ctx = NULL; + } + + g_mutex_free (priv->mutex); +} + +static void +ufo_zmq_messenger_class_init (UfoZmqMessengerClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + oclass->dispose = ufo_zmq_messenger_dispose; + oclass->finalize = ufo_zmq_messenger_finalize; + + g_type_class_add_private (klass, sizeof(UfoZmqMessengerPrivate)); +} + +static void +ufo_zmq_messenger_init (UfoZmqMessenger *msger) +{ + UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + priv->zmq_socket = NULL; + priv->zmq_ctx = NULL; + priv->mutex = g_mutex_new (); +} diff --git a/ufo/ufo-zmq-messenger.h b/ufo/ufo-zmq-messenger.h new file mode 100644 index 0000000..19462cb --- /dev/null +++ b/ufo/ufo-zmq-messenger.h @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ZMQ_MESSENGER_H +#define __UFO_ZMQ_MESSENGER_H + +#include <ufo/ufo-remote-node.h> +#include <ufo/ufo-messenger-iface.h> +#include <glib-object.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_ZMQ_MESSENGER (ufo_zmq_messenger_get_type()) +#define UFO_ZMQ_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessenger)) +#define UFO_IS_ZMQ_MESSENGER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_MESSENGER)) +#define UFO_ZMQ_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerClass)) +#define UFO_IS_ZMQ_MESSENGER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_MESSENGER)) +#define UFO_ZMQ_MESSENGER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_MESSENGER, UfoZmqMessengerClass)) + +typedef struct _UfoZmqMessenger UfoZmqMessenger; +typedef struct _UfoZmqMessengerClass UfoZmqMessengerClass; +typedef struct _UfoZmqMessengerPrivate UfoZmqMessengerPrivate; + +struct _UfoZmqMessenger { + /*< private >*/ + GObject parent_instance; + + UfoZmqMessengerPrivate *priv; +}; + +struct _UfoZmqMessengerClass { + /*< private >*/ + GObjectClass parent_class; +}; + +UfoZmqMessenger *ufo_zmq_messenger_new (void); +GType ufo_zmq_messenger_get_type (void); + +void ufo_zmq_messenger_connect (UfoMessenger *msger, + gchar *addr, + UfoMessengerRole role); + +void ufo_zmq_messenger_disconnect (UfoMessenger *msg); + +UfoMessage *ufo_zmq_messenger_send_blocking (UfoMessenger *msger, + UfoMessage *request, + GError **error); + +UfoMessage *ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, + GError **error); + +G_END_DECLS + +#endif @@ -48,6 +48,8 @@ #include <ufo/ufo-task-iface.h> #include <ufo/ufo-task-node.h> #include <ufo/ufo-basic-ops.h> +#include <ufo/ufo-messenger-iface.h> +#include <ufo/ufo-zmq-messenger.h> #undef __UFO_H_INSIDE__ |