summaryrefslogtreecommitdiff
path: root/ufo/ufo-daemon.c
diff options
context:
space:
mode:
authorMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-04-21 09:31:11 +0200
committerMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-04-21 09:33:44 +0200
commit4be83c379a00919b9026b3a6476299c7bf4ca77f (patch)
treecda6d806f4bcb20851c5254623a6ab69de631861 /ufo/ufo-daemon.c
parentb0174d6770a03e77af7682eca91613b197eb6f73 (diff)
Rework message handling
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r--ufo/ufo-daemon.c205
1 files changed, 101 insertions, 104 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index f2d0602..7bd4a27 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -71,10 +71,33 @@ struct _UfoDaemonPrivate {
gboolean has_stopped;
GCond *started_cond;
GCond *stopped_cond;
- UfoMessenger *msger;
+ UfoMessenger *messenger;
};
static gpointer run_scheduler (UfoDaemon *daemon);
+static void handle_get_num_devices (UfoDaemon *, UfoMessage *);
+static void handle_stream_json (UfoDaemon *, UfoMessage *);
+static void handle_replicate_json (UfoDaemon *, UfoMessage *);
+static void handle_get_structure (UfoDaemon *, UfoMessage *);
+static void handle_send_inputs (UfoDaemon *, UfoMessage *);
+static void handle_get_requisition (UfoDaemon *, UfoMessage *);
+static void handle_get_result (UfoDaemon *, UfoMessage *);
+static void handle_cleanup (UfoDaemon *, UfoMessage *);
+static void handle_terminate (UfoDaemon *, UfoMessage *);
+
+typedef void (*RequestHandler) (UfoDaemon *daemon, UfoMessage *request);
+
+static RequestHandler handlers[] = {
+ handle_stream_json,
+ handle_replicate_json,
+ handle_get_num_devices,
+ handle_get_structure,
+ handle_get_requisition,
+ handle_send_inputs,
+ handle_get_result,
+ handle_cleanup,
+ handle_terminate,
+};
UfoDaemon *
ufo_daemon_new (const gchar *listen_address)
@@ -91,37 +114,36 @@ ufo_daemon_new (const gchar *listen_address)
priv->manager = ufo_plugin_manager_new ();
#ifdef WITH_MPI
- priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
+ priv->messenger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
- priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ priv->messenger = UFO_MESSENGER (ufo_zmq_messenger_new ());
#endif
return daemon;
}
static void
-handle_get_num_devices (UfoDaemon *daemon)
+handle_get_num_devices (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv;
- UfoMessage *msg;
+ UfoMessage *reply;
cl_uint num_devices;
cl_context context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
- msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
+ reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
context = ufo_resources_get_context (priv->resources);
UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL));
- *(guint16 *) msg->data = (guint16) num_devices;
+ *(guint16 *) reply->data = (guint16) num_devices;
- ufo_messenger_send_blocking (priv->msger, msg, 0);
- ufo_message_free (msg);
+ ufo_messenger_send_blocking (priv->messenger, reply, 0);
+ ufo_message_free (reply);
}
static UfoNode *
-remove_dummy_if_present (UfoGraph *graph,
- UfoNode *first)
+remove_dummy_if_present (UfoGraph *graph, UfoNode *first)
{
UfoNode *real = first;
@@ -141,30 +163,30 @@ remove_dummy_if_present (UfoGraph *graph,
}
static gchar *
-read_json (UfoDaemon *daemon, UfoMessage *msg)
+read_json (UfoDaemon *daemon, UfoMessage *message)
{
gchar *json;
- json = g_malloc0 (msg->data_size + 1);
- memcpy (json, msg->data, msg->data_size);
+ json = g_malloc0 (message->data_size + 1);
+ memcpy (json, message->data, message->data_size);
return json;
}
static void
-handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg)
+handle_replicate_json (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
gchar *json;
UfoTaskGraph *graph;
GError *error = NULL;
- json = read_json (daemon, msg);
+ json = read_json (daemon, request);
- // send ack
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ /* send ack */
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
ufo_task_graph_read_from_data (graph, priv->manager, json, &error);
@@ -185,7 +207,7 @@ replicate_json_free:
}
static void
-handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
+handle_stream_json (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv;
gchar *json;
@@ -196,12 +218,12 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
GError *error = NULL;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
- json = read_json (daemon, msg);
+ json = read_json (daemon, request);
/* send ack */
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
/* Setup local task graph */
priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
@@ -235,26 +257,26 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
}
static void
-handle_get_structure (UfoDaemon *daemon)
+handle_get_structure (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoMessage *response;
+ UfoMessage *reply;
/* TODO move into .h and share between daemon and remote-node */
struct _Structure {
guint16 n_inputs;
guint16 n_dims;
- } msg_data;
+ } message_data;
/* TODO don't hardcode these */
- msg_data.n_inputs = 1;
- msg_data.n_dims = 2;
+ message_data.n_inputs = 1;
+ message_data.n_dims = 2;
- response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure));
- *(struct _Structure *) (response->data) = msg_data;
+ reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure));
+ *(struct _Structure *) (reply->data) = message_data;
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
}
static void
@@ -292,13 +314,13 @@ handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
}
static void
-handle_get_requisition (UfoDaemon *daemon)
+handle_get_requisition (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoRequisition requisition;
@@ -307,14 +329,14 @@ handle_get_requisition (UfoDaemon *daemon)
ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task),
&requisition);
- UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition));
- memcpy (msg->data, &requisition, msg->data_size);
- ufo_messenger_send_blocking (priv->msger, msg, NULL);
- ufo_message_free (msg);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition));
+ memcpy (reply->data, &requisition, reply->data_size);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
}
static
-void handle_get_result (UfoDaemon *daemon)
+void handle_get_result (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
UfoBuffer *buffer;
@@ -323,9 +345,9 @@ void handle_get_result (UfoDaemon *daemon)
buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task));
size = ufo_buffer_get_size (buffer);
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size);
- memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, size);
+ memcpy (reply->data, ufo_buffer_get_host_array (buffer, NULL), size);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer);
}
@@ -339,7 +361,7 @@ unref_and_free (GObject **object)
}
static
-void handle_cleanup (UfoDaemon *daemon)
+void handle_cleanup (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
@@ -347,9 +369,9 @@ void handle_cleanup (UfoDaemon *daemon)
* We send the ACK early on, because we don't want to let the host wait for
* actually cleaning up (and waiting some time to unref the input task).
*/
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0);
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
/* TODO: check that we don't need to execute this branch wen priv->input is null */
if (priv->input_task && priv->input) {
@@ -365,24 +387,24 @@ void handle_cleanup (UfoDaemon *daemon)
unref_and_free ((GObject **) &priv->output_task);
unref_and_free ((GObject **) &priv->task_graph);
-
}
static void
-handle_terminate (UfoDaemon *daemon)
+handle_terminate (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
- ufo_messenger_send_blocking (priv->msger, response, NULL);
- ufo_message_free (response);
+ UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0);
+
+ ufo_messenger_send_blocking (priv->messenger, reply, NULL);
+ ufo_message_free (reply);
if (priv->scheduler_thread != NULL) {
- g_message ("waiting for scheduler to finish");
+ g_message ("Waiting for scheduler to finish ...");
g_thread_join (priv->scheduler_thread);
- g_message ("scheduler finished!");
+ g_message ("Done.");
}
- ufo_messenger_disconnect (priv->msger);
+ ufo_messenger_disconnect (priv->messenger);
}
static gpointer
@@ -390,12 +412,11 @@ run_scheduler (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- g_message ("Start scheduler");
+ g_message ("Run scheduler ...");
priv->scheduler = ufo_scheduler_new ();
ufo_base_scheduler_set_arch (priv->scheduler, priv->arch);
ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL);
-
- g_message ("Done");
+ g_message ("Done.");
g_object_unref (priv->scheduler);
priv->scheduler = ufo_scheduler_new ();
@@ -429,48 +450,24 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
while (wait_for_messages) {
GError *err = NULL;
- UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err);
+ UfoMessage *message = ufo_messenger_recv_blocking (priv->messenger, &err);
if (err != NULL) {
- /* If daemon is stopped, socket will be closed and msg_recv will
+ /* If daemon is stopped, socket will be closed and message_recv will
* yield an error - we stop. */
wait_for_messages = FALSE;
}
else {
- switch (msg->type) {
- case UFO_MESSAGE_GET_NUM_DEVICES:
- handle_get_num_devices (daemon);
- break;
- case UFO_MESSAGE_STREAM_JSON:
- handle_stream_json (daemon, msg);
- break;
- case UFO_MESSAGE_REPLICATE_JSON:
- handle_replicate_json (daemon, msg);
- break;
- case UFO_MESSAGE_GET_STRUCTURE:
- handle_get_structure (daemon);
- break;
- case UFO_MESSAGE_SEND_INPUTS:
- handle_send_inputs (daemon, msg);
- break;
- case UFO_MESSAGE_GET_REQUISITION:
- handle_get_requisition (daemon);
- break;
- case UFO_MESSAGE_GET_RESULT:
- handle_get_result (daemon);
- break;
- case UFO_MESSAGE_CLEANUP:
- handle_cleanup (daemon);
- break;
- case UFO_MESSAGE_TERMINATE:
- handle_terminate (daemon);
- wait_for_messages = FALSE;
- break;
- default:
- g_message ("Unknown message received\n");
- }
+ if (message->type >= UFO_MESSAGE_INVALID_REQUEST)
+ g_error ("Invalid request");
+ else
+ handlers[message->type](daemon, message);
+
+ if (message->type == UFO_MESSAGE_TERMINATE)
+ wait_for_messages = FALSE;
}
- ufo_message_free (msg);
+
+ ufo_message_free (message);
}
/* tell calling thread we have stopped */
@@ -493,7 +490,7 @@ ufo_daemon_start (UfoDaemon *daemon, GError **error)
return;
}
- ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER, &tmp_error);
+ ufo_messenger_connect (priv->messenger, priv->listen_address, UFO_MESSENGER_SERVER, &tmp_error);
if (tmp_error != NULL) {
g_propagate_error (error, tmp_error);
@@ -527,14 +524,14 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error)
* - we thus send a TERMINATE message to that thread
*/
- UfoMessenger *tmp_msger;
+ UfoMessenger *tmp_messenger;
#ifdef WITH_MPI
- tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
+ tmp_messenger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
- tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
+ tmp_messenger = UFO_MESSENGER (ufo_zmq_messenger_new ());
#endif
- ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT, &tmp_error);
+ ufo_messenger_connect (tmp_messenger, priv->listen_address, UFO_MESSENGER_CLIENT, &tmp_error);
if (tmp_error != NULL) {
g_propagate_error (error, tmp_error);
@@ -542,7 +539,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error)
}
UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0);
- ufo_messenger_send_blocking (tmp_msger, request, NULL);
+ ufo_messenger_send_blocking (tmp_messenger, request, NULL);
g_thread_join (priv->thread);
@@ -576,8 +573,8 @@ ufo_daemon_dispose (GObject *object)
if (priv->task_graph)
g_object_unref (priv->task_graph);
- if (priv->msger != NULL)
- g_object_unref (priv->msger);
+ if (priv->messenger != NULL)
+ g_object_unref (priv->messenger);
if (priv->manager != NULL)
g_object_unref (priv->manager);