diff options
author | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2015-04-09 15:54:11 +0200 |
---|---|---|
committer | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2015-04-21 09:32:50 +0200 |
commit | b0174d6770a03e77af7682eca91613b197eb6f73 (patch) | |
tree | 60d70329d2a69a3c58d3af7c50a30fd4b82f4b89 | |
parent | cc0fd83d80acb52df30fa7b90ee475f232c6c01d (diff) |
Fix #88: communication broken
No better summary line, too many problems ...
-rw-r--r-- | ufo/ufo-daemon.c | 171 | ||||
-rw-r--r-- | ufo/ufo-messenger-iface.c | 1 | ||||
-rw-r--r-- | ufo/ufo-remote-node.c | 7 | ||||
-rw-r--r-- | ufo/ufo-task-graph.c | 22 | ||||
-rw-r--r-- | ufo/ufo-zmq-messenger.c | 75 |
5 files changed, 124 insertions, 152 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index d982131..f2d0602 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -51,27 +51,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT) #define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate)) struct _UfoDaemonPrivate { - UfoPluginManager *manager; - UfoTaskGraph *task_graph; - UfoBaseScheduler *scheduler; - GThread *scheduler_thread; - UfoMessenger *msger; - - UfoNode **input_tasks; - UfoNode *output_task; - UfoBuffer **inputs; - - gchar *listen_address; - gpointer socket; - GThread *thread; - GMutex *startstop_lock; - GMutex *started_lock; - GMutex *stopped_lock; - GCond *started_cond; - GCond *stopped_cond; - gboolean has_started; - gboolean has_stopped; - guint num_inputs; + UfoPluginManager *manager; + UfoResources *resources; + UfoArchGraph *arch; + UfoTaskGraph *task_graph; + UfoBaseScheduler *scheduler; + GThread *scheduler_thread; + gpointer socket; + UfoNode *input_task; + UfoNode *output_task; + UfoBuffer *input; + cl_context context; + gchar *listen_address; + GThread *thread; + GMutex *startstop_lock; + GMutex *started_lock; + GMutex *stopped_lock; + gboolean has_started; + gboolean has_stopped; + GCond *started_cond; + GCond *stopped_cond; + UfoMessenger *msger; }; static gpointer run_scheduler (UfoDaemon *daemon); @@ -79,24 +79,23 @@ static gpointer run_scheduler (UfoDaemon *daemon); UfoDaemon * ufo_daemon_new (const gchar *listen_address) { + UfoDaemonPrivate *priv; UfoDaemon *daemon; g_return_val_if_fail (listen_address != NULL, NULL); daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL)); + priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); priv->listen_address = g_strdup (listen_address); priv->manager = ufo_plugin_manager_new (); - priv->scheduler = ufo_scheduler_new (); + #ifdef WITH_MPI priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); -#else - /* TODO: we should return a GError in the constructor */ - g_warning ("No messenger backend available!"); #endif + return daemon; } @@ -105,14 +104,12 @@ handle_get_num_devices (UfoDaemon *daemon) { UfoDaemonPrivate *priv; UfoMessage *msg; - UfoResources *resources; cl_uint num_devices; cl_context context; priv = UFO_DAEMON_GET_PRIVATE (daemon); msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); - resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler)); - context = ufo_resources_get_context (resources); + context = ufo_resources_get_context (priv->resources); UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL)); @@ -201,7 +198,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) priv = UFO_DAEMON_GET_PRIVATE (daemon); json = read_json (daemon, msg); - /* Send ack */ + /* send ack */ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); @@ -227,22 +224,11 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first); - priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first)); - priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode)); - priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer)); + priv->input_task = ufo_input_task_new (); priv->output_task = ufo_output_task_new (2); - for (guint i = 0; i < priv->num_inputs; i++) { - priv->input_tasks[i] = ufo_input_task_new(); - ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), - priv->input_tasks[i], first, - GINT_TO_POINTER (i)); - } - - ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), - last, priv->output_task, - GINT_TO_POINTER (0)); - + ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), priv->input_task, first, GINT_TO_POINTER (0)); + ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), last, priv->output_task, GINT_TO_POINTER (0)); priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL); g_free (json); @@ -275,43 +261,36 @@ static void handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv; - UfoArchGraph *arch; - UfoResources *resources; UfoRequisition requisition; gpointer context; priv = UFO_DAEMON_GET_PRIVATE (daemon); + context = ufo_resources_get_context (priv->resources); - /* TODO: We should store the context */ - arch = ufo_base_scheduler_get_arch (priv->scheduler); - resources = ufo_arch_graph_get_resources (arch); - context = ufo_resources_get_context (resources); - - struct _Header { + struct Header { UfoRequisition requisition; guint64 buffer_size; }; char *base = request->data; - struct _Header *header = (struct _Header *) base; + struct Header *header = (struct Header *) base; + /* Receive buffer size */ requisition = header->requisition; - for (guint i = 0; i < priv->num_inputs; i++) { - if (priv->inputs[i] == NULL) { - priv->inputs[i] = ufo_buffer_new (&requisition, context); - } - else { - if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition)) - ufo_buffer_resize (priv->inputs[i], &requisition); - } + if (priv->input == NULL) { + priv->input = ufo_buffer_new (&requisition, context); + } + else { + if (ufo_buffer_cmp_dimensions (priv->input, &requisition)) + ufo_buffer_resize (priv->input, &requisition); + } - memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL), - base + sizeof (struct _Header), - ufo_buffer_get_size (priv->inputs[i])); + memcpy (ufo_buffer_get_host_array (priv->input, NULL), + base + sizeof (struct Header), + ufo_buffer_get_size (priv->input)); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]); - } + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); @@ -372,21 +351,21 @@ void handle_cleanup (UfoDaemon *daemon) ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); - /* TODO check that we don't need to execture this branch wen priv->input is null */ - for (guint i = 0; i < priv->num_inputs; i++) { - if (priv->input_tasks[i] && priv->inputs[i]) { - ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i])); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), - priv->inputs[i]); + /* TODO: check that we don't need to execute this branch wen priv->input is null */ + if (priv->input_task && priv->input) { + ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task)); - g_usleep (1.5 * G_USEC_PER_SEC); - unref_and_free ((GObject **) &priv->input_tasks[i]); - unref_and_free ((GObject **) &priv->inputs[i]); - } + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), + priv->input); + + g_usleep (1.5 * G_USEC_PER_SEC); + unref_and_free ((GObject **) &priv->input_task); + unref_and_free ((GObject **) &priv->input); } unref_and_free ((GObject **) &priv->output_task); unref_and_free ((GObject **) &priv->task_graph); + } static void @@ -397,7 +376,7 @@ handle_terminate (UfoDaemon *daemon) ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); - if(priv->scheduler_thread != NULL) { + if (priv->scheduler_thread != NULL) { g_message ("waiting for scheduler to finish"); g_thread_join (priv->scheduler_thread); g_message ("scheduler finished!"); @@ -410,19 +389,35 @@ static gpointer run_scheduler (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + + g_message ("Start scheduler"); + priv->scheduler = ufo_scheduler_new (); + ufo_base_scheduler_set_arch (priv->scheduler, priv->arch); ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL); - g_object_unref (priv->scheduler); + g_message ("Done"); + g_object_unref (priv->scheduler); priv->scheduler = ufo_scheduler_new (); + return NULL; } static void ufo_daemon_start_impl (UfoDaemon *daemon) { - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - g_debug ("UfoDaemon started on address %s", priv->listen_address); + UfoDaemonPrivate *priv; + GError *error = NULL; + + priv = UFO_DAEMON_GET_PRIVATE (daemon); + priv->resources = ufo_resources_new (&error); + + if (error != NULL) { + g_warning ("%s\n", error->message); + return; + } + + priv->arch = UFO_ARCH_GRAPH (ufo_arch_graph_new (priv->resources, NULL)); /* tell the calling thread that we have started */ g_mutex_lock (priv->started_lock); @@ -431,16 +426,17 @@ ufo_daemon_start_impl (UfoDaemon *daemon) g_mutex_unlock (priv->started_lock); gboolean wait_for_messages = TRUE; - while (wait_for_messages) { + while (wait_for_messages) { GError *err = NULL; UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err); + if (err != NULL) { - /* if daemon is stopped, socket will be closed and msg_recv - * will yield an error - we stop - */ + /* If daemon is stopped, socket will be closed and msg_recv will + * yield an error - we stop. */ wait_for_messages = FALSE; - } else { + } + else { switch (msg->type) { case UFO_MESSAGE_GET_NUM_DEVICES: handle_get_num_devices (daemon); @@ -531,8 +527,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error) * - we thus send a TERMINATE message to that thread */ - UfoMessenger *tmp_msger = NULL; - + UfoMessenger *tmp_msger; #ifdef WITH_MPI tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ @@ -621,6 +616,8 @@ ufo_daemon_init (UfoDaemon *self) { UfoDaemonPrivate *priv; self->priv = priv = UFO_DAEMON_GET_PRIVATE (self); + + priv->scheduler = NULL; priv->startstop_lock = g_mutex_new (); priv->started_lock = g_mutex_new (); priv->stopped_lock = g_mutex_new (); diff --git a/ufo/ufo-messenger-iface.c b/ufo/ufo-messenger-iface.c index b02f584..e34d701 100644 --- a/ufo/ufo-messenger-iface.c +++ b/ufo/ufo-messenger-iface.c @@ -43,6 +43,7 @@ ufo_message_new (UfoMessageType type, guint64 data_size) UfoMessage *msg = g_malloc (sizeof (UfoMessage)); msg->type = type; msg->data_size = data_size; + if (data_size == 0) msg->data = NULL; else diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c index 9f24ae2..13b998e 100644 --- a/ufo/ufo-remote-node.c +++ b/ufo/ufo-remote-node.c @@ -193,7 +193,7 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node, // determine our total message size guint64 size = 0; - + for (guint i = 0; i < priv->n_inputs; i++) { guint64 buffer_size = ufo_buffer_get_size (inputs[i]); size += buffer_size; @@ -219,7 +219,6 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node, request->data = buffer; // send as a single message ufo_messenger_send_blocking (priv->msger, request, NULL); - } void @@ -291,7 +290,7 @@ ufo_remote_node_terminate (UfoRemoteNode *node) ufo_messenger_send_blocking (priv->msger, request, NULL); ufo_messenger_disconnect (priv->msger); - return; + return; } static void @@ -330,6 +329,6 @@ ufo_remote_node_init (UfoRemoteNode *self) { UfoRemoteNodePrivate *priv; self->priv = priv = UFO_REMOTE_NODE_GET_PRIVATE (self); - priv->n_inputs = 0; + priv->n_inputs = 1; priv->terminated = FALSE; } diff --git a/ufo/ufo-task-graph.c b/ufo/ufo-task-graph.c index 4aad7a1..b2f7fbc 100644 --- a/ufo/ufo-task-graph.c +++ b/ufo/ufo-task-graph.c @@ -18,6 +18,7 @@ */ #include <string.h> +#include <stdio.h> #include <json-glib/json-glib.h> #include <ufo/ufo-task-graph.h> #include <ufo/ufo-task-node.h> @@ -336,35 +337,25 @@ build_remote_graph (UfoTaskGraph *remote_graph, static void create_remote_tasks (UfoTaskGraph *task_graph, UfoTaskGraph *remote_graph, - GList *first, + UfoTaskNode *first, UfoTaskNode *last, UfoRemoteNode *remote) { UfoTaskGraphPrivate *priv; UfoTaskNode *task; gchar *json; - GList *pred; - guint port = 0; - priv = task_graph->priv; json = ufo_task_graph_get_json_data (remote_graph, NULL); + priv = task_graph->priv; ufo_remote_node_send_json (remote, UFO_REMOTE_MODE_STREAM, json); task = UFO_TASK_NODE (ufo_remote_task_new ()); priv->remote_tasks = g_list_append (priv->remote_tasks, task); ufo_task_node_set_proc_node (task, UFO_NODE (remote)); - /* Setting the remote's # of inputs to the # of inputs of the corresponding - * task */ - GList *roots = ufo_graph_get_roots (UFO_GRAPH (remote_graph)); - guint root_num_inputs = ufo_task_get_num_inputs (UFO_TASK (roots->data)); - ufo_remote_node_set_num_inputs (remote, root_num_inputs); - - g_list_for (first->data, pred) { - ufo_task_graph_connect_nodes_full (task_graph, pred->data, task, port++); - } - + ufo_task_graph_connect_nodes (task_graph, first, task); ufo_task_graph_connect_nodes (task_graph, task, last); + g_free (json); } @@ -381,6 +372,7 @@ expand_remotes (UfoTaskGraph *task_graph, first = g_list_first (path); last = g_list_last (path); + remote_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); node = build_remote_graph (remote_graph, first, last); @@ -392,7 +384,7 @@ expand_remotes (UfoTaskGraph *task_graph, g_list_for (remotes, it) { create_remote_tasks (task_graph, remote_graph, - first, last->data, it->data); + first->data, last->data, it->data); } g_object_unref (remote_graph); diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c index ec86e0c..da87193 100644 --- a/ufo/ufo-zmq-messenger.c +++ b/ufo/ufo-zmq-messenger.c @@ -40,11 +40,12 @@ struct _UfoZmqMessengerPrivate { UfoMessengerRole role; }; -/* C99 allows flexible length structs that we use to map -* arbitrary frame lengths that are transferred via zmq. -* Note: Sizes of datatypes should be fixed and equal on all plattforms -* (i.e. don't use a gsize as it has different size on x86 & x86_64) -*/ +/* + * C99 allows flexible length structs that we use to map arbitrary frame lengths + * that are transferred via zmq. Note: Sizes of datatypes should be fixed and + * equal on all plattforms (i.e. don't use a gsize as it has different size on + * x86 & x86_64). + */ typedef struct _DataFrame { UfoMessageType type; guint64 data_size; @@ -137,7 +138,7 @@ ufo_zmq_messenger_disconnect (UfoMessenger *msger) zmq_close (priv->zmq_socket); priv->zmq_socket = NULL; - // waits for outstanding messages to be flushed + /* waits for outstanding messages to be flushed */ zmq_term (priv->zmq_ctx); g_free (priv->remote_addr); } @@ -146,14 +147,6 @@ ufo_zmq_messenger_disconnect (UfoMessenger *msger) return; } -/** - * ufo_zmq_messenger_send_blocking: (skip) - * @msger: #UfoMessenger - * @request_msg: Request message - * @error: Location for an error or %NULL - * - * Send message in blocking way. - */ static UfoMessage * ufo_zmq_messenger_send_blocking (UfoMessenger *msger, UfoMessage *request_msg, @@ -188,21 +181,23 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger, goto finalize; } - /* if this is an ACK message, don't expect a response - * (send_blocking is then most likely being called by the server) + /* + * If this is an ACK message, don't expect a response (send_blocking is then + * most likely being called by the server). */ - if (request_msg->type == UFO_MESSAGE_ACK) { + if (request_msg->type == UFO_MESSAGE_ACK) goto finalize; - } - /* we always need to receive as response as ZMQ - * requires REQ/REP/REQ/REP/... scheme + /* + * We always need to receive as response as ZMQ requires REQ/REP/REQ/REP/... + * scheme. */ zmq_msg_t reply; zmq_msg_init (&reply); err = zmq_msg_recv (&reply, priv->zmq_socket, 0); gint size = zmq_msg_size (&reply); + if (err < 0) { g_set_error (error, ufo_messenger_error_quark (), zmq_errno(), "Could not receive from %s: %s ", priv->remote_addr, @@ -223,36 +218,26 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger, UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size); memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size); - //if (frame->type != 5 && frame->type != 7 && frame->type!=8) - //g_message ("Type: %i \tData_size: %i",(int) frame->type, (int) resp_frame->data_size); - zmq_msg_close (&reply); result = reply_msg; - goto finalize; - - finalize: - g_mutex_unlock (priv->mutex); - return result; +finalize: + g_mutex_unlock (priv->mutex); + return result; } -/** - * ufo_zmq_messenger_recv_blocking: (skip) - * @msger: A #UfoMessenger - * @error: Location for an error or %NULL - * - * Receive message in blocking way. - */ static UfoMessage * ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, GError **error) { - UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); + UfoZmqMessengerPrivate *priv; + UfoMessage *result = NULL; + + priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger); g_assert (priv->role == UFO_MESSENGER_SERVER); g_mutex_lock (priv->mutex); - UfoMessage *result = NULL; zmq_msg_t reply; zmq_msg_init (&reply); gint err = zmq_msg_recv (&reply, priv->zmq_socket, 0); @@ -268,7 +253,8 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, DataFrame *frame = zmq_msg_data (&reply); guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size); - if ((guint)size != expected_size) { + + if ((guint) size != expected_size) { g_set_error (error, ufo_messenger_error_quark(), UFO_MESSENGER_SIZE_MISSMATCH, "Received unexpected frame size: %d, should be: %d", @@ -276,16 +262,13 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, goto finalize; } - UfoMessage *msg = ufo_message_new (frame->type, frame->data_size); - memcpy (msg->data, frame->data, frame->data_size); - + result = ufo_message_new (frame->type, frame->data_size); + memcpy (result->data, frame->data, frame->data_size); zmq_msg_close (&reply); - result = msg; - goto finalize; - finalize: - g_mutex_unlock (priv->mutex); - return result; +finalize: + g_mutex_unlock (priv->mutex); + return result; } static void |