diff options
author | Timo Dörr <timo@latecrew.de> | 2013-10-04 17:18:41 +0200 |
---|---|---|
committer | Timo Dörr <timo@latecrew.de> | 2013-10-23 14:08:07 +0200 |
commit | 921d9015736205bb03a02985f6046d931cdd26c9 (patch) | |
tree | eeb00d06512f51c2140c5d342d40eb7da4ef19e2 /ufo/ufo-daemon.c | |
parent | 89fc1edf44ca48086c193d5ad64d6a0f7ee12660 (diff) |
Abstract away communication into interface
* Create generic interface ufo_messenger_iface
* Move all zmq_* stuff into ufo_zmq_messenger that implements that
interface
* also fixes a bug that happened when closing the zmq_socket from
another thread by introducing a special TERMINATE message that
will kill ufo-daemon
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r-- | ufo/ufo-daemon.c | 270 |
1 files changed, 108 insertions, 162 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index 3d311bb..0e46c6f 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -33,6 +33,8 @@ #include <ufo/ufo-plugin-manager.h> #include <ufo/ufo-scheduler.h> #include <ufo/ufo-task-graph.h> +#include <ufo/ufo-zmq-messenger.h> +#include <ufo/ufo-messenger-iface.h> #include "zmq-shim.h" @@ -40,8 +42,6 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT) #define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate)) -#define CHECK_ZMQ(r) if (r == -1) g_warning ("%s:%i: zmq_error: %s\n", __FILE__, __LINE__, zmq_strerror (errno)); - struct _UfoDaemonPrivate { UfoConfig *config; UfoPluginManager *manager; @@ -58,10 +58,10 @@ struct _UfoDaemonPrivate { gboolean has_started; GMutex *started_lock; GCond *started_cond; + UfoMessenger *msger; }; static gpointer run_scheduler (UfoDaemon *daemon); -static void validate_zmq_listen_address (gchar *addr); UfoDaemon * ufo_daemon_new (UfoConfig *config, gchar *listen_address) @@ -71,8 +71,6 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address) g_return_val_if_fail (listen_address != NULL, NULL); g_return_val_if_fail (config != NULL, NULL); - validate_zmq_listen_address (listen_address); - daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL)); UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); @@ -80,68 +78,31 @@ ufo_daemon_new (UfoConfig *config, gchar *listen_address) priv->listen_address = listen_address; priv->manager = ufo_plugin_manager_new (priv->config); priv->scheduler = ufo_scheduler_new (priv->config, NULL); - priv->context = zmq_ctx_new (); - priv->socket = zmq_socket (priv->context, ZMQ_REP); - - int err = zmq_bind (priv->socket, listen_address); - if (err < 0) - g_critical ("could not bind to address %s", listen_address); + priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); return daemon; } static void -validate_zmq_listen_address (gchar *addr) -{ - if (!g_str_has_prefix (addr, "tcp://")) - g_critical ("address didn't start with tcp:// scheme, which is required currently"); - - /* Pitfall: zmq will silently accept hostnames like tcp://localhost:5555 - * but not bind to it as it treats it like an interface name (like eth0). - * We have to use IP addresses instead of DNS names. - */ - gchar *host = g_strdup (&addr[6]); - if (!g_ascii_isdigit (host[0]) && host[0] != '*') - g_warning ("treating address %s as interface device name. Use IP address if supplying a host was intended.", host); - g_free (host); -} - -static void -ufo_msg_send (UfoMessage *msg, gpointer socket, gint flags) -{ - zmq_msg_t reply; - - zmq_msg_init_size (&reply, sizeof (UfoMessage)); - memcpy (zmq_msg_data (&reply), msg, sizeof (UfoMessage)); - zmq_msg_send (&reply, socket, flags); - zmq_msg_close (&reply); -} - -static void -send_ack (gpointer socket) -{ - UfoMessage msg; - msg.type = UFO_MESSAGE_ACK; - ufo_msg_send (&msg, socket, 0); -} - -static void handle_get_num_devices (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - - UfoMessage msg; cl_context context; + UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); + cl_uint *num_devices = g_malloc (sizeof (cl_uint)); context = ufo_scheduler_get_context (priv->scheduler); UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), - &msg.d.n_devices, + num_devices, NULL)); - ufo_msg_send (&msg, priv->socket, 0); + *(guint16 *) msg->data = (guint16) *num_devices; + + ufo_messenger_send_blocking (priv->msger, msg, 0); + ufo_message_free (msg); } static UfoNode * @@ -166,34 +127,30 @@ remove_dummy_if_present (UfoGraph *graph, } static gchar * -read_json (UfoDaemon *daemon) +read_json (UfoDaemon *daemon, UfoMessage *msg) { - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - - zmq_msg_t json_msg; - gsize size; gchar *json; - zmq_msg_init (&json_msg); - size = (gsize) zmq_msg_recv (&json_msg, priv->socket, 0); - - json = g_malloc0 (size + 1); - memcpy (json, zmq_msg_data (&json_msg), size); - zmq_msg_close (&json_msg); + json = g_malloc0 (msg->data_size + 1); + memcpy (json, msg->data, msg->data_size); return json; } static void -handle_replicate_json (UfoDaemon *daemon) +handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); gchar *json; UfoTaskGraph *graph; GError *error = NULL; - json = read_json (daemon); - send_ack (priv->socket); + json = read_json (daemon, msg); + + // send ack + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); ufo_task_graph_read_from_data (graph, priv->manager, json, &error); @@ -203,10 +160,7 @@ handle_replicate_json (UfoDaemon *daemon) goto replicate_json_free; } - g_message ("Start scheduler"); ufo_scheduler_run (priv->scheduler, graph, NULL); - - g_message ("Done"); g_object_unref (priv->scheduler); priv->scheduler = ufo_scheduler_new (priv->config, NULL); @@ -217,7 +171,7 @@ replicate_json_free: } static void -handle_stream_json (UfoDaemon *daemon) +handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); gchar *json; @@ -227,7 +181,11 @@ handle_stream_json (UfoDaemon *daemon) UfoNode *last; GError *error = NULL; - json = read_json (daemon); + json = read_json (daemon, msg); + // send ack + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); /* Setup local task graph */ priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); @@ -263,79 +221,65 @@ handle_stream_json (UfoDaemon *daemon) g_thread_create ((GThreadFunc) run_scheduler, daemon, FALSE, NULL); g_free (json); - send_ack (priv->socket); -} - -static void -handle_setup (UfoDaemon *daemon) -{ - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - g_message ("Setup requested"); - send_ack (priv->socket); } static void handle_get_structure (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoMessage header; - UfoInputParam in_param; - zmq_msg_t data_msg; + UfoMessage *response; - g_message ("Structure requested"); - header.type = UFO_MESSAGE_STRUCTURE; + /* TODO move into .h and share between daemon and remote-node */ + struct _Structure { + guint16 n_inputs; + guint16 n_dims; + } msg_data; - /* TODO: do not hardcode these */ - header.d.n_inputs = 1; - in_param.n_dims = 2; + /* TODO don't hardcode these */ + msg_data.n_inputs = 1; + msg_data.n_dims = 2; - zmq_msg_init_size (&data_msg, sizeof (UfoInputParam)); - memcpy (zmq_msg_data (&data_msg), &in_param, sizeof (UfoInputParam)); + response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure)); + *(struct _Structure *) (response->data) = msg_data; - ufo_msg_send (&header, priv->socket, ZMQ_SNDMORE); - zmq_msg_send (&data_msg, priv->socket, 0); - zmq_msg_close (&data_msg); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); } static void -handle_send_inputs (UfoDaemon *daemon) +handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoRequisition *requisition; - zmq_msg_t requisition_msg; - zmq_msg_t data_msg; + UfoRequisition requisition; gpointer context; context = ufo_scheduler_get_context (priv->scheduler); - /* Receive buffer size */ - zmq_msg_init (&requisition_msg); - zmq_msg_recv (&requisition_msg, priv->socket, 0); - g_assert (zmq_msg_size (&requisition_msg) >= sizeof (UfoRequisition)); - requisition = zmq_msg_data (&requisition_msg); + struct _Header { + UfoRequisition requisition; + guint64 buffer_size; + }; + gpointer base = request->data; + struct _Header *header = (struct _Header *) base; + + /* Receive buffer size */ + requisition = header->requisition; if (priv->input == NULL) { - priv->input = ufo_buffer_new (requisition, context); + priv->input = ufo_buffer_new (&requisition, context); } else { - if (ufo_buffer_cmp_dimensions (priv->input, requisition)) - ufo_buffer_resize (priv->input, requisition); + if (ufo_buffer_cmp_dimensions (priv->input, &requisition)) + ufo_buffer_resize (priv->input, &requisition); } - - zmq_msg_close (&requisition_msg); - - /* Receive actual buffer */ - zmq_msg_init (&data_msg); - zmq_msg_recv (&data_msg, priv->socket, 0); - memcpy (ufo_buffer_get_host_array (priv->input, NULL), - zmq_msg_data (&data_msg), + base + sizeof (struct _Header), ufo_buffer_get_size (priv->input)); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); - zmq_msg_close (&data_msg); - send_ack (priv->socket); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); } static void @@ -343,17 +287,15 @@ handle_get_requisition (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoRequisition requisition; - zmq_msg_t reply_msg; /* We need to get the requisition from the last node */ - g_message ("Requisition requested"); ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task), &requisition); - zmq_msg_init_size (&reply_msg, sizeof (UfoRequisition)); - memcpy (zmq_msg_data (&reply_msg), &requisition, sizeof (UfoRequisition)); - zmq_msg_send (&reply_msg, priv->socket, 0); - zmq_msg_close (&reply_msg); + UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition)); + memcpy (msg->data, &requisition, msg->data_size); + ufo_messenger_send_blocking (priv->msger, msg, NULL); + ufo_message_free (msg); } static @@ -361,16 +303,14 @@ void handle_get_result (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoBuffer *buffer; - zmq_msg_t reply_msg; gsize size; buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task)); size = ufo_buffer_get_size (buffer); - zmq_msg_init_size (&reply_msg, size); - memcpy (zmq_msg_data (&reply_msg), ufo_buffer_get_host_array (buffer, NULL), size); - zmq_msg_send (&reply_msg, priv->socket, 0); - zmq_msg_close (&reply_msg); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size); + memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size); + ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer); } @@ -392,7 +332,9 @@ void handle_cleanup (UfoDaemon *daemon) * We send the ACK early on, because we don't want to let the host wait for * actually cleaning up (and waiting some time to unref the input task). */ - send_ack (priv->socket); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); // TODO check that we don't need to execture this branch wen priv->input is null if (priv->input_task && priv->input) { @@ -410,6 +352,18 @@ void handle_cleanup (UfoDaemon *daemon) unref_and_free ((GObject **) &priv->task_graph); } +static void +handle_terminate (UfoDaemon *daemon) +{ + UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->msger, response, NULL); + ufo_message_free (response); + + priv->run = FALSE; + ufo_messenger_disconnect (priv->msger); +} + static gpointer run_scheduler (UfoDaemon *daemon) { @@ -430,48 +384,39 @@ ufo_daemon_start_impl (UfoDaemon *daemon) UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); while (priv->run) { - zmq_msg_t request; + // zmq_msg_t request; - zmq_msg_init (&request); + // zmq_msg_init (&request); g_mutex_lock (priv->started_lock); priv->has_started = TRUE; g_cond_signal (priv->started_cond); g_mutex_unlock (priv->started_lock); - gint err = zmq_msg_recv (&request, priv->socket, 0); + // gint err = zmq_msg_recv (&request, priv->socket, 0); /* if daemon is stopped, socket will be closed and msg_recv * will yield an error - we simply want to return */ - if (err < 0) return; - - if (zmq_msg_size (&request) < sizeof (UfoMessage)) { - g_warning ("Message is smaller than expected\n"); - send_ack (priv->socket); - } - else { - UfoMessage *msg; - - msg = (UfoMessage *) zmq_msg_data (&request); + GError *err = NULL; + UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err); + if (err != NULL) { + } else { switch (msg->type) { case UFO_MESSAGE_GET_NUM_DEVICES: handle_get_num_devices (daemon); break; case UFO_MESSAGE_STREAM_JSON: - handle_stream_json (daemon); + handle_stream_json (daemon, msg); break; case UFO_MESSAGE_REPLICATE_JSON: - handle_replicate_json (daemon); - break; - case UFO_MESSAGE_SETUP: - handle_setup (daemon); + handle_replicate_json (daemon, msg); break; case UFO_MESSAGE_GET_STRUCTURE: handle_get_structure (daemon); break; case UFO_MESSAGE_SEND_INPUTS: - handle_send_inputs (daemon); + handle_send_inputs (daemon, msg); break; case UFO_MESSAGE_GET_REQUISITION: handle_get_requisition (daemon); @@ -482,11 +427,14 @@ ufo_daemon_start_impl (UfoDaemon *daemon) case UFO_MESSAGE_CLEANUP: handle_cleanup (daemon); break; + case UFO_MESSAGE_TERMINATE: + handle_terminate (daemon); + break; default: - g_print ("unhandled case\n"); + g_message ("Unknown message received\n"); } } - zmq_msg_close (&request); + ufo_message_free (msg); } } @@ -495,6 +443,9 @@ ufo_daemon_start (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + /* TODO handle error if unable to connect/bind */ + ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER); + priv->run = TRUE; priv->thread = g_thread_create ((GThreadFunc)ufo_daemon_start_impl, daemon, TRUE, NULL); g_return_if_fail (priv->thread != NULL); @@ -511,15 +462,15 @@ void ufo_daemon_stop (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - priv->run = FALSE; - - zmq_close (priv->socket); - if (priv->context != NULL) { - zmq_ctx_destroy (priv->context); - priv->context = NULL; - g_assert (priv->context == NULL); - } + /* HACK we can't call _disconnect() as this has to be run from the + * thread running the daemon - we thus send a TERMINATE message to + * that thread + */ + UfoMessenger *tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT); + UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0); + ufo_messenger_send_blocking (tmp_msger, request, NULL); g_thread_join (priv->thread); } @@ -534,18 +485,13 @@ ufo_daemon_dispose (GObject *object) g_object_unref (priv->task_graph); if (priv->config != NULL) g_object_unref (priv->config); + if (priv->msger != NULL) + g_object_unref (priv->msger); if (priv->manager != NULL) g_object_unref (priv->manager); if (priv->scheduler != NULL) g_object_unref (priv->scheduler); - zmq_close (priv->socket); - - if (priv->context != NULL) { - zmq_ctx_destroy (priv->context); - priv->context = NULL; - } - G_OBJECT_CLASS (ufo_daemon_parent_class)->dispose (object); } |