diff options
author | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2015-04-21 09:31:11 +0200 |
---|---|---|
committer | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2015-04-21 09:33:44 +0200 |
commit | 4be83c379a00919b9026b3a6476299c7bf4ca77f (patch) | |
tree | cda6d806f4bcb20851c5254623a6ab69de631861 /ufo/ufo-daemon.c | |
parent | b0174d6770a03e77af7682eca91613b197eb6f73 (diff) |
Rework message handling
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r-- | ufo/ufo-daemon.c | 205 |
1 files changed, 101 insertions, 104 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index f2d0602..7bd4a27 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -71,10 +71,33 @@ struct _UfoDaemonPrivate { gboolean has_stopped; GCond *started_cond; GCond *stopped_cond; - UfoMessenger *msger; + UfoMessenger *messenger; }; static gpointer run_scheduler (UfoDaemon *daemon); +static void handle_get_num_devices (UfoDaemon *, UfoMessage *); +static void handle_stream_json (UfoDaemon *, UfoMessage *); +static void handle_replicate_json (UfoDaemon *, UfoMessage *); +static void handle_get_structure (UfoDaemon *, UfoMessage *); +static void handle_send_inputs (UfoDaemon *, UfoMessage *); +static void handle_get_requisition (UfoDaemon *, UfoMessage *); +static void handle_get_result (UfoDaemon *, UfoMessage *); +static void handle_cleanup (UfoDaemon *, UfoMessage *); +static void handle_terminate (UfoDaemon *, UfoMessage *); + +typedef void (*RequestHandler) (UfoDaemon *daemon, UfoMessage *request); + +static RequestHandler handlers[] = { + handle_stream_json, + handle_replicate_json, + handle_get_num_devices, + handle_get_structure, + handle_get_requisition, + handle_send_inputs, + handle_get_result, + handle_cleanup, + handle_terminate, +}; UfoDaemon * ufo_daemon_new (const gchar *listen_address) @@ -91,37 +114,36 @@ ufo_daemon_new (const gchar *listen_address) priv->manager = ufo_plugin_manager_new (); #ifdef WITH_MPI - priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); + priv->messenger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ - priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + priv->messenger = UFO_MESSENGER (ufo_zmq_messenger_new ()); #endif return daemon; } static void -handle_get_num_devices (UfoDaemon *daemon) +handle_get_num_devices (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv; - UfoMessage *msg; + UfoMessage *reply; cl_uint num_devices; cl_context context; priv = UFO_DAEMON_GET_PRIVATE (daemon); - msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); + reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); context = ufo_resources_get_context (priv->resources); UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL)); - *(guint16 *) msg->data = (guint16) num_devices; + *(guint16 *) reply->data = (guint16) num_devices; - ufo_messenger_send_blocking (priv->msger, msg, 0); - ufo_message_free (msg); + ufo_messenger_send_blocking (priv->messenger, reply, 0); + ufo_message_free (reply); } static UfoNode * -remove_dummy_if_present (UfoGraph *graph, - UfoNode *first) +remove_dummy_if_present (UfoGraph *graph, UfoNode *first) { UfoNode *real = first; @@ -141,30 +163,30 @@ remove_dummy_if_present (UfoGraph *graph, } static gchar * -read_json (UfoDaemon *daemon, UfoMessage *msg) +read_json (UfoDaemon *daemon, UfoMessage *message) { gchar *json; - json = g_malloc0 (msg->data_size + 1); - memcpy (json, msg->data, msg->data_size); + json = g_malloc0 (message->data_size + 1); + memcpy (json, message->data, message->data_size); return json; } static void -handle_replicate_json (UfoDaemon *daemon, UfoMessage *msg) +handle_replicate_json (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); gchar *json; UfoTaskGraph *graph; GError *error = NULL; - json = read_json (daemon, msg); + json = read_json (daemon, request); - // send ack - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + /* send ack */ + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); ufo_task_graph_read_from_data (graph, priv->manager, json, &error); @@ -185,7 +207,7 @@ replicate_json_free: } static void -handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) +handle_stream_json (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv; gchar *json; @@ -196,12 +218,12 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) GError *error = NULL; priv = UFO_DAEMON_GET_PRIVATE (daemon); - json = read_json (daemon, msg); + json = read_json (daemon, request); /* send ack */ - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); /* Setup local task graph */ priv->task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); @@ -235,26 +257,26 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) } static void -handle_get_structure (UfoDaemon *daemon) +handle_get_structure (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoMessage *response; + UfoMessage *reply; /* TODO move into .h and share between daemon and remote-node */ struct _Structure { guint16 n_inputs; guint16 n_dims; - } msg_data; + } message_data; /* TODO don't hardcode these */ - msg_data.n_inputs = 1; - msg_data.n_dims = 2; + message_data.n_inputs = 1; + message_data.n_dims = 2; - response = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure)); - *(struct _Structure *) (response->data) = msg_data; + reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (struct _Structure)); + *(struct _Structure *) (reply->data) = message_data; - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); } static void @@ -292,13 +314,13 @@ handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); } static void -handle_get_requisition (UfoDaemon *daemon) +handle_get_requisition (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoRequisition requisition; @@ -307,14 +329,14 @@ handle_get_requisition (UfoDaemon *daemon) ufo_output_task_get_output_requisition (UFO_OUTPUT_TASK (priv->output_task), &requisition); - UfoMessage *msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition)); - memcpy (msg->data, &requisition, msg->data_size); - ufo_messenger_send_blocking (priv->msger, msg, NULL); - ufo_message_free (msg); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, sizeof (UfoRequisition)); + memcpy (reply->data, &requisition, reply->data_size); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); } static -void handle_get_result (UfoDaemon *daemon) +void handle_get_result (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); UfoBuffer *buffer; @@ -323,9 +345,9 @@ void handle_get_result (UfoDaemon *daemon) buffer = ufo_output_task_get_output_buffer (UFO_OUTPUT_TASK (priv->output_task)); size = ufo_buffer_get_size (buffer); - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, size); - memcpy (response->data, ufo_buffer_get_host_array (buffer, NULL), size); - ufo_messenger_send_blocking (priv->msger, response, NULL); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, size); + memcpy (reply->data, ufo_buffer_get_host_array (buffer, NULL), size); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); ufo_output_task_release_output_buffer (UFO_OUTPUT_TASK (priv->output_task), buffer); } @@ -339,7 +361,7 @@ unref_and_free (GObject **object) } static -void handle_cleanup (UfoDaemon *daemon) +void handle_cleanup (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); @@ -347,9 +369,9 @@ void handle_cleanup (UfoDaemon *daemon) * We send the ACK early on, because we don't want to let the host wait for * actually cleaning up (and waiting some time to unref the input task). */ - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0); + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); /* TODO: check that we don't need to execute this branch wen priv->input is null */ if (priv->input_task && priv->input) { @@ -365,24 +387,24 @@ void handle_cleanup (UfoDaemon *daemon) unref_and_free ((GObject **) &priv->output_task); unref_and_free ((GObject **) &priv->task_graph); - } static void -handle_terminate (UfoDaemon *daemon) +handle_terminate (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); - ufo_messenger_send_blocking (priv->msger, response, NULL); - ufo_message_free (response); + UfoMessage *reply = ufo_message_new (UFO_MESSAGE_ACK, 0); + + ufo_messenger_send_blocking (priv->messenger, reply, NULL); + ufo_message_free (reply); if (priv->scheduler_thread != NULL) { - g_message ("waiting for scheduler to finish"); + g_message ("Waiting for scheduler to finish ..."); g_thread_join (priv->scheduler_thread); - g_message ("scheduler finished!"); + g_message ("Done."); } - ufo_messenger_disconnect (priv->msger); + ufo_messenger_disconnect (priv->messenger); } static gpointer @@ -390,12 +412,11 @@ run_scheduler (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - g_message ("Start scheduler"); + g_message ("Run scheduler ..."); priv->scheduler = ufo_scheduler_new (); ufo_base_scheduler_set_arch (priv->scheduler, priv->arch); ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL); - - g_message ("Done"); + g_message ("Done."); g_object_unref (priv->scheduler); priv->scheduler = ufo_scheduler_new (); @@ -429,48 +450,24 @@ ufo_daemon_start_impl (UfoDaemon *daemon) while (wait_for_messages) { GError *err = NULL; - UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err); + UfoMessage *message = ufo_messenger_recv_blocking (priv->messenger, &err); if (err != NULL) { - /* If daemon is stopped, socket will be closed and msg_recv will + /* If daemon is stopped, socket will be closed and message_recv will * yield an error - we stop. */ wait_for_messages = FALSE; } else { - switch (msg->type) { - case UFO_MESSAGE_GET_NUM_DEVICES: - handle_get_num_devices (daemon); - break; - case UFO_MESSAGE_STREAM_JSON: - handle_stream_json (daemon, msg); - break; - case UFO_MESSAGE_REPLICATE_JSON: - handle_replicate_json (daemon, msg); - break; - case UFO_MESSAGE_GET_STRUCTURE: - handle_get_structure (daemon); - break; - case UFO_MESSAGE_SEND_INPUTS: - handle_send_inputs (daemon, msg); - break; - case UFO_MESSAGE_GET_REQUISITION: - handle_get_requisition (daemon); - break; - case UFO_MESSAGE_GET_RESULT: - handle_get_result (daemon); - break; - case UFO_MESSAGE_CLEANUP: - handle_cleanup (daemon); - break; - case UFO_MESSAGE_TERMINATE: - handle_terminate (daemon); - wait_for_messages = FALSE; - break; - default: - g_message ("Unknown message received\n"); - } + if (message->type >= UFO_MESSAGE_INVALID_REQUEST) + g_error ("Invalid request"); + else + handlers[message->type](daemon, message); + + if (message->type == UFO_MESSAGE_TERMINATE) + wait_for_messages = FALSE; } - ufo_message_free (msg); + + ufo_message_free (message); } /* tell calling thread we have stopped */ @@ -493,7 +490,7 @@ ufo_daemon_start (UfoDaemon *daemon, GError **error) return; } - ufo_messenger_connect (priv->msger, priv->listen_address, UFO_MESSENGER_SERVER, &tmp_error); + ufo_messenger_connect (priv->messenger, priv->listen_address, UFO_MESSENGER_SERVER, &tmp_error); if (tmp_error != NULL) { g_propagate_error (error, tmp_error); @@ -527,14 +524,14 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error) * - we thus send a TERMINATE message to that thread */ - UfoMessenger *tmp_msger; + UfoMessenger *tmp_messenger; #ifdef WITH_MPI - tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); + tmp_messenger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ - tmp_msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); + tmp_messenger = UFO_MESSENGER (ufo_zmq_messenger_new ()); #endif - ufo_messenger_connect (tmp_msger, priv->listen_address, UFO_MESSENGER_CLIENT, &tmp_error); + ufo_messenger_connect (tmp_messenger, priv->listen_address, UFO_MESSENGER_CLIENT, &tmp_error); if (tmp_error != NULL) { g_propagate_error (error, tmp_error); @@ -542,7 +539,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error) } UfoMessage *request = ufo_message_new (UFO_MESSAGE_TERMINATE, 0); - ufo_messenger_send_blocking (tmp_msger, request, NULL); + ufo_messenger_send_blocking (tmp_messenger, request, NULL); g_thread_join (priv->thread); @@ -576,8 +573,8 @@ ufo_daemon_dispose (GObject *object) if (priv->task_graph) g_object_unref (priv->task_graph); - if (priv->msger != NULL) - g_object_unref (priv->msger); + if (priv->messenger != NULL) + g_object_unref (priv->messenger); if (priv->manager != NULL) g_object_unref (priv->manager); |