summaryrefslogtreecommitdiff
path: root/ufo/ufo-daemon.c
diff options
context:
space:
mode:
authorTimo Dörr <timo@latecrew.de>2013-10-04 17:18:41 +0200
committerTimo Dörr <timo@latecrew.de>2013-10-23 14:08:07 +0200
commit921d9015736205bb03a02985f6046d931cdd26c9 (patch)
treeeeb00d06512f51c2140c5d342d40eb7da4ef19e2 /ufo/ufo-daemon.c
parent89fc1edf44ca48086c193d5ad64d6a0f7ee12660 (diff)
Abstract away communication into interface
* Create generic interface ufo_messenger_iface * Move all zmq_* stuff into ufo_zmq_messenger that implements that interface * also fixes a bug that happened when closing the zmq_socket from another thread by introducing a special TERMINATE message that will kill ufo-daemon
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r--ufo/ufo-daemon.c270
1 files changed, 108 insertions, 162 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index 3d311bb..0e46c6f 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -33,6 +33,8 @@
#include <ufo/ufo-plugin-manager.h>
#include <ufo/ufo-scheduler.h>
#include <ufo/ufo-task-graph.h>
+#include <ufo/ufo-zmq-messenger.h>
+#include <ufo/ufo-messenger-iface.h>
#include "zmq-shim.h"
@@ -40,8 +42,6 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT)
#define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate))
-#define CHECK_ZMQ(r) if (r == -1) g_warning ("%s:%i: zmq_error: %s\n", __FILE__, __LINE__, zmq_strerror (errno));
-
struct _UfoDaemonPrivate {
UfoConfig *config;
UfoPluginManager *manager;
@@ -58,10 +58,10 @@ struct _UfoDaemonPrivate {
gboolean has_started;
GMutex *started_lock;
GCond *started_cond;
+ UfoMessenger *msger;
};
static gpointer run_scheduler (UfoDaemon *daemon);
-static void validate_zmq_listen_address (gchar *addr);
UfoDaemon *
ufo_daemon_new (UfoConfig *config, gchar *listen_address)
@@ -71,8 +71,6 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address)
g_return_val_if_fail (listen_address != NULL, NULL);
g_return_val_if_fail (config != NULL, NULL);
- validate_zmq_listen_address (listen_address);
-
daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL));
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
@@ -80,68 +78,31 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address)
priv->listen_address = listen_address;
priv->manager = ufo_plugin_manager_new (priv->config);
priv->scheduler = ufo_scheduler_new (priv->config, NULL);
- priv->context = zmq_ctx_new ();
- priv->socket = zmq_socket (priv->context, ZMQ_REP);
-
- int err = zmq_bind (priv->socket, listen_address);
- if (err < 0)
- g_critical ("could not bind to address %s", listen_address);
+ priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
return daemon;
}
static void
-validate_zmq_listen_address (gchar *addr)
-{
- if (!g_str_has_prefix (addr, "tcp://"))
- g_critical ("address didn't start with tcp:// scheme, which is required currently");
-
- /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555
- * but not bind to it as it treats it like an interface name (like eth0).
- * We have to use IP addresses instead of DNS names.
- */
- gchar *host = g_strdup (&addr[6]);
- if (!g_ascii_isdigit (host[0]) && host[0] != '*')
- g_warning ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host);
- g_free (host);
-}
-
-static void
-ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags)
-{
- zmq_msg_t reply;
-
- zmq_msg_init_size (&reply, sizeof (UfoMessage));
- memcpy (zmq_msg_data (&reply), msg, sizeof (UfoMessage));
- zmq_msg_send (&reply, socket, flags);
- zmq_msg_close (&reply);
-}
-
-static void
-send_ack (gpointer socket)
-{
- UfoMessage msg;
- msg.type = UFO_MESSAGE_ACK;
- ufo_msg_send (&msg, socket, 0);
-}
-
-static void
handle_get_num_devices (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
-
- UfoMessage msg;
cl_context context;
+ UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
+ cl_uint *num_devices = g_malloc (sizeof (cl_uint));
context = ufo_scheduler_get_context (priv->scheduler);
UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context,
CL_CONTEXT_NUM_DEVICES,
sizeof (cl_uint),
- &msg.d.n_devices,
+ num_devices,
NULL));
- ufo_msg_send (&msg, priv->socket, 0);
+ *(guint16 *) msg->data = (guint16) *num_devices;
+
+ ufo_messenger_send_blocking (priv->msger, msg, 0);
+ ufo_message_free (msg);
}
static UfoNode *
@@ -166,34 +127,30 @@ remove_dummy_if_present (UfoGraph *graph,
}
static gchar *
-read_json (UfoDaemon *daemon)
+read_json (UfoDaemon *daemon, UfoMessage *msg)
{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
-
- zmq_msg_t json_msg;
- gsize size;
gchar *json;
- zmq_msg_init (&json_msg);
- size = (gsize) zmq_msg_recv (&json_msg, priv->socket, 0);
-
- json = g_malloc0 (size + 1);
- memcpy (json, zmq_msg_data (&json_msg), size);
- zmq_msg_close (&json_msg);
+ json = g_malloc0 (msg->data_size + 1);
+ memcpy (json, msg->data, msg->data_size);
return json;
}
static void
-handle_replicate_json (UfoDaemon *daemon)
+handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
gchar *json;
UfoTaskGraph *graph;
GError *error = NULL;
- json = read_json (daemon);
- send_ack (priv->socket);
+ json = read_json (daemon, msg);
+
+ // send ack
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
ufo_task_graph_read_from_data (graph, priv->manager, json, &error);
@@ -203,10 +160,7 @@ handle_replicate_json (UfoDaemon *daemon)
goto replicate_json_free;
}
- g_message ("Start scheduler");
ufo_scheduler_run (priv->scheduler, graph, NULL);
-
- g_message ("Done");
g_object_unref (priv->scheduler);
priv->scheduler = ufo_scheduler_new (priv->config, NULL);
@@ -217,7 +171,7 @@ replicate_json_free:
}
static void
-handle_stream_json (UfoDaemon *daemon)
+handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
gchar *json;
@@ -227,7 +181,11 @@ handle_stream_json (UfoDaemon *daemon)
UfoNode *last;
GError *error = NULL;
- json = read_json (daemon);
+ json = read_json (daemon, msg);
+ // send ack
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
/* Setup local task graph */
priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
@@ -263,79 +221,65 @@ handle_stream_json (UfoDaemon *daemon)
g_thread_create ((GThreadFunc) run_scheduler, daemon, FALSE, NULL);
g_free (json);
- send_ack (priv->socket);
-}
-
-static void
-handle_setup (UfoDaemon *daemon)
-{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- g_message ("Setup requested");
- send_ack (priv->socket);
}
static void
handle_get_structure (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoMessage header;
- UfoInputParam in_param;
- zmq_msg_t data_msg;
+ UfoMessage *response;
- g_message ("Structure requested");
- header.type = UFO_MESSAGE_STRUCTURE;
+ /* TODO move into .h and share between daemon and remote-node */
+ struct _Structure {
+ guint16 n_inputs;
+ guint16 n_dims;
+ } msg_data;
- /* TODO: do not hardcode these */
- header.d.n_inputs = 1;
- in_param.n_dims = 2;
+ /* TODO don't hardcode these */
+ msg_data.n_inputs = 1;
+ msg_data.n_dims = 2;
- zmq_msg_init_size (&data_msg, sizeof (UfoInputParam));
- memcpy (zmq_msg_data (&data_msg), &in_param, sizeof (UfoInputParam));
+ response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure));
+ *(struct _Structure *) (response->data) = msg_data;
- ufo_msg_send (&header, priv->socket, ZMQ_SNDMORE);
- zmq_msg_send (&data_msg, priv->socket, 0);
- zmq_msg_close (&data_msg);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
}
static void
-handle_send_inputs (UfoDaemon *daemon)
+handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoRequisition *requisition;
- zmq_msg_t requisition_msg;
- zmq_msg_t data_msg;
+ UfoRequisition requisition;
gpointer context;
context = ufo_scheduler_get_context (priv->scheduler);
- /* Receive buffer size */
- zmq_msg_init (&requisition_msg);
- zmq_msg_recv (&requisition_msg, priv->socket, 0);
- g_assert (zmq_msg_size (&requisition_msg) >= sizeof (UfoRequisition));
- requisition = zmq_msg_data (&requisition_msg);
+ struct _Header {
+ UfoRequisition requisition;
+ guint64 buffer_size;
+ };
+ gpointer base = request->data;
+ struct _Header *header = (struct _Header *) base;
+
+ /* Receive buffer size */
+ requisition = header->requisition;
if (priv->input == NULL) {
- priv->input = ufo_buffer_new (requisition, context);
+ priv->input = ufo_buffer_new (&requisition, context);
}
else {
- if (ufo_buffer_cmp_dimensions (priv->input, requisition))
- ufo_buffer_resize (priv->input, requisition);
+ if (ufo_buffer_cmp_dimensions (priv->input, &requisition))
+ ufo_buffer_resize (priv->input, &requisition);
}
-
- zmq_msg_close (&requisition_msg);
-
- /* Receive actual buffer */
- zmq_msg_init (&data_msg);
- zmq_msg_recv (&data_msg, priv->socket, 0);
-
memcpy (ufo_buffer_get_host_array (priv->input, NULL),
- zmq_msg_data (&data_msg),
+ base + sizeof (struct _Header),
ufo_buffer_get_size (priv->input));
-
ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
- zmq_msg_close (&data_msg);
- send_ack (priv->socket);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
}
static void
@@ -343,17 +287,15 @@ handle_get_requisition (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoRequisition requisition;
- zmq_msg_t reply_msg;
/* We need to get the requisition from the last node */
- g_message ("Requisition requested");
ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task),
&requisition);
- zmq_msg_init_size (&reply_msg, sizeof (UfoRequisition));
- memcpy (zmq_msg_data (&reply_msg), &requisition, sizeof (UfoRequisition));
- zmq_msg_send (&reply_msg, priv->socket, 0);
- zmq_msg_close (&reply_msg);
+ UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition));
+ memcpy (msg->data, &requisition, msg->data_size);
+ ufo_messenger_send_blocking (priv->msger, msg, NULL);
+ ufo_message_free (msg);
}
static
@@ -361,16 +303,14 @@ void handle_get_result (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoBuffer *buffer;
- zmq_msg_t reply_msg;
gsize size;
buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task));
size = ufo_buffer_get_size (buffer);
- zmq_msg_init_size (&reply_msg, size);
- memcpy (zmq_msg_data (&reply_msg), ufo_buffer_get_host_array (buffer, NULL), size);
- zmq_msg_send (&reply_msg, priv->socket, 0);
- zmq_msg_close (&reply_msg);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size);
+ memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer);
}
@@ -392,7 +332,9 @@ void handle_cleanup (UfoDaemon *daemon)
* We send the ACK early on, because we don't want to let the host wait for
* actually cleaning up (and waiting some time to unref the input task).
*/
- send_ack (priv->socket);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
// TODO check that we don't need to execture this branch wen priv->input is null
if (priv->input_task && priv->input) {
@@ -410,6 +352,18 @@ void handle_cleanup (UfoDaemon *daemon)
unref_and_free ((GObject **) &priv->task_graph);
}
+static void
+handle_terminate (UfoDaemon *daemon)
+{
+ UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->msger, response, NULL);
+ ufo_message_free (response);
+
+ priv->run = FALSE;
+ ufo_messenger_disconnect (priv->msger);
+}
+
static gpointer
run_scheduler (UfoDaemon *daemon)
{
@@ -430,48 +384,39 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
while (priv->run) {
- zmq_msg_t request;
+ // zmq_msg_t request;
- zmq_msg_init (&request);
+ // zmq_msg_init (&request);
g_mutex_lock (priv->started_lock);
priv->has_started = TRUE;
g_cond_signal (priv->started_cond);
g_mutex_unlock (priv->started_lock);
- gint err = zmq_msg_recv (&request, priv->socket, 0);
+ // gint err = zmq_msg_recv (&request, priv->socket, 0);
/* if daemon is stopped, socket will be closed and msg_recv
* will yield an error - we simply want to return
*/
- if (err < 0) return;
-
- if (zmq_msg_size (&request) < sizeof (UfoMessage)) {
- g_warning ("Message is smaller than expected\n");
- send_ack (priv->socket);
- }
- else {
- UfoMessage *msg;
-
- msg = (UfoMessage *) zmq_msg_data (&request);
+ GError *err = NULL;
+ UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err);
+ if (err != NULL) {
+ } else {
switch (msg->type) {
case UFO_MESSAGE_GET_NUM_DEVICES:
handle_get_num_devices (daemon);
break;
case UFO_MESSAGE_STREAM_JSON:
- handle_stream_json (daemon);
+ handle_stream_json (daemon, msg);
break;
case UFO_MESSAGE_REPLICATE_JSON:
- handle_replicate_json (daemon);
- break;
- case UFO_MESSAGE_SETUP:
- handle_setup (daemon);
+ handle_replicate_json (daemon, msg);
break;
case UFO_MESSAGE_GET_STRUCTURE:
handle_get_structure (daemon);
break;
case UFO_MESSAGE_SEND_INPUTS:
- handle_send_inputs (daemon);
+ handle_send_inputs (daemon, msg);
break;
case UFO_MESSAGE_GET_REQUISITION:
handle_get_requisition (daemon);
@@ -482,11 +427,14 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
case UFO_MESSAGE_CLEANUP:
handle_cleanup (daemon);
break;
+ case UFO_MESSAGE_TERMINATE:
+ handle_terminate (daemon);
+ break;
default:
- g_print ("unhandled case\n");
+ g_message ("Unknown message received\n");
}
}
- zmq_msg_close (&request);
+ ufo_message_free (msg);
}
}
@@ -495,6 +443,9 @@ ufo_daemon_start (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ /* TODO handle error if unable to connect/bind */
+ ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER);
+
priv->run = TRUE;
priv->thread = g_thread_create ((GThreadFunc)ufo_daemon_start_impl, daemon, TRUE, NULL);
g_return_if_fail (priv->thread != NULL);
@@ -511,15 +462,15 @@ void
ufo_daemon_stop (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- priv->run = FALSE;
-
- zmq_close (priv->socket);
- if (priv->context != NULL) {
- zmq_ctx_destroy (priv->context);
- priv->context = NULL;
- g_assert (priv->context == NULL);
- }
+ /* HACK we can't call _disconnect() as this has to be run from the
+ * thread running the daemon - we thus send a TERMINATE message to
+ * that thread
+ */
+ UfoMessenger *tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT);
+ UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0);
+ ufo_messenger_send_blocking (tmp_msger, request, NULL);
g_thread_join (priv->thread);
}
@@ -534,18 +485,13 @@ ufo_daemon_dispose (GObject *object)
g_object_unref (priv->task_graph);
if (priv->config != NULL)
g_object_unref (priv->config);
+ if (priv->msger != NULL)
+ g_object_unref (priv->msger);
if (priv->manager != NULL)
g_object_unref (priv->manager);
if (priv->scheduler != NULL)
g_object_unref (priv->scheduler);
- zmq_close (priv->socket);
-
- if (priv->context != NULL) {
- zmq_ctx_destroy (priv->context);
- priv->context = NULL;
- }
-
G_OBJECT_CLASS (ufo_daemon_parent_class)->dispose (object);
}