diff options
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r-- | ufo/ufo-daemon.c | 171 |
1 files changed, 84 insertions, 87 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index d982131..f2d0602 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -51,27 +51,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT) #define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate)) struct _UfoDaemonPrivate { - UfoPluginManager *manager; - UfoTaskGraph *task_graph; - UfoBaseScheduler *scheduler; - GThread *scheduler_thread; - UfoMessenger *msger; - - UfoNode **input_tasks; - UfoNode *output_task; - UfoBuffer **inputs; - - gchar *listen_address; - gpointer socket; - GThread *thread; - GMutex *startstop_lock; - GMutex *started_lock; - GMutex *stopped_lock; - GCond *started_cond; - GCond *stopped_cond; - gboolean has_started; - gboolean has_stopped; - guint num_inputs; + UfoPluginManager *manager; + UfoResources *resources; + UfoArchGraph *arch; + UfoTaskGraph *task_graph; + UfoBaseScheduler *scheduler; + GThread *scheduler_thread; + gpointer socket; + UfoNode *input_task; + UfoNode *output_task; + UfoBuffer *input; + cl_context context; + gchar *listen_address; + GThread *thread; + GMutex *startstop_lock; + GMutex *started_lock; + GMutex *stopped_lock; + gboolean has_started; + gboolean has_stopped; + GCond *started_cond; + GCond *stopped_cond; + UfoMessenger *msger; }; static gpointer run_scheduler (UfoDaemon *daemon); @@ -79,24 +79,23 @@ static gpointer run_scheduler (UfoDaemon *daemon); UfoDaemon * ufo_daemon_new (const gchar *listen_address) { + UfoDaemonPrivate *priv; UfoDaemon *daemon; g_return_val_if_fail (listen_address != NULL, NULL); daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL)); + priv = UFO_DAEMON_GET_PRIVATE (daemon); - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); priv->listen_address = g_strdup (listen_address); priv->manager = ufo_plugin_manager_new (); - priv->scheduler = ufo_scheduler_new (); + #ifdef WITH_MPI priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ()); -#else - /* TODO: we should return a GError in the constructor */ - g_warning ("No messenger backend available!"); #endif + return daemon; } @@ -105,14 +104,12 @@ handle_get_num_devices (UfoDaemon *daemon) { UfoDaemonPrivate *priv; UfoMessage *msg; - UfoResources *resources; cl_uint num_devices; cl_context context; priv = UFO_DAEMON_GET_PRIVATE (daemon); msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); - resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler)); - context = ufo_resources_get_context (resources); + context = ufo_resources_get_context (priv->resources); UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL)); @@ -201,7 +198,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) priv = UFO_DAEMON_GET_PRIVATE (daemon); json = read_json (daemon, msg); - /* Send ack */ + /* send ack */ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); @@ -227,22 +224,11 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first); - priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first)); - priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode)); - priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer)); + priv->input_task = ufo_input_task_new (); priv->output_task = ufo_output_task_new (2); - for (guint i = 0; i < priv->num_inputs; i++) { - priv->input_tasks[i] = ufo_input_task_new(); - ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), - priv->input_tasks[i], first, - GINT_TO_POINTER (i)); - } - - ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), - last, priv->output_task, - GINT_TO_POINTER (0)); - + ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), priv->input_task, first, GINT_TO_POINTER (0)); + ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), last, priv->output_task, GINT_TO_POINTER (0)); priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL); g_free (json); @@ -275,43 +261,36 @@ static void handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) { UfoDaemonPrivate *priv; - UfoArchGraph *arch; - UfoResources *resources; UfoRequisition requisition; gpointer context; priv = UFO_DAEMON_GET_PRIVATE (daemon); + context = ufo_resources_get_context (priv->resources); - /* TODO: We should store the context */ - arch = ufo_base_scheduler_get_arch (priv->scheduler); - resources = ufo_arch_graph_get_resources (arch); - context = ufo_resources_get_context (resources); - - struct _Header { + struct Header { UfoRequisition requisition; guint64 buffer_size; }; char *base = request->data; - struct _Header *header = (struct _Header *) base; + struct Header *header = (struct Header *) base; + /* Receive buffer size */ requisition = header->requisition; - for (guint i = 0; i < priv->num_inputs; i++) { - if (priv->inputs[i] == NULL) { - priv->inputs[i] = ufo_buffer_new (&requisition, context); - } - else { - if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition)) - ufo_buffer_resize (priv->inputs[i], &requisition); - } + if (priv->input == NULL) { + priv->input = ufo_buffer_new (&requisition, context); + } + else { + if (ufo_buffer_cmp_dimensions (priv->input, &requisition)) + ufo_buffer_resize (priv->input, &requisition); + } - memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL), - base + sizeof (struct _Header), - ufo_buffer_get_size (priv->inputs[i])); + memcpy (ufo_buffer_get_host_array (priv->input, NULL), + base + sizeof (struct Header), + ufo_buffer_get_size (priv->input)); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]); - } + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); @@ -372,21 +351,21 @@ void handle_cleanup (UfoDaemon *daemon) ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); - /* TODO check that we don't need to execture this branch wen priv->input is null */ - for (guint i = 0; i < priv->num_inputs; i++) { - if (priv->input_tasks[i] && priv->inputs[i]) { - ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i])); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), - priv->inputs[i]); + /* TODO: check that we don't need to execute this branch wen priv->input is null */ + if (priv->input_task && priv->input) { + ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task)); - g_usleep (1.5 * G_USEC_PER_SEC); - unref_and_free ((GObject **) &priv->input_tasks[i]); - unref_and_free ((GObject **) &priv->inputs[i]); - } + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), + priv->input); + + g_usleep (1.5 * G_USEC_PER_SEC); + unref_and_free ((GObject **) &priv->input_task); + unref_and_free ((GObject **) &priv->input); } unref_and_free ((GObject **) &priv->output_task); unref_and_free ((GObject **) &priv->task_graph); + } static void @@ -397,7 +376,7 @@ handle_terminate (UfoDaemon *daemon) ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); - if(priv->scheduler_thread != NULL) { + if (priv->scheduler_thread != NULL) { g_message ("waiting for scheduler to finish"); g_thread_join (priv->scheduler_thread); g_message ("scheduler finished!"); @@ -410,19 +389,35 @@ static gpointer run_scheduler (UfoDaemon *daemon) { UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); + + g_message ("Start scheduler"); + priv->scheduler = ufo_scheduler_new (); + ufo_base_scheduler_set_arch (priv->scheduler, priv->arch); ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL); - g_object_unref (priv->scheduler); + g_message ("Done"); + g_object_unref (priv->scheduler); priv->scheduler = ufo_scheduler_new (); + return NULL; } static void ufo_daemon_start_impl (UfoDaemon *daemon) { - UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); - g_debug ("UfoDaemon started on address %s", priv->listen_address); + UfoDaemonPrivate *priv; + GError *error = NULL; + + priv = UFO_DAEMON_GET_PRIVATE (daemon); + priv->resources = ufo_resources_new (&error); + + if (error != NULL) { + g_warning ("%s\n", error->message); + return; + } + + priv->arch = UFO_ARCH_GRAPH (ufo_arch_graph_new (priv->resources, NULL)); /* tell the calling thread that we have started */ g_mutex_lock (priv->started_lock); @@ -431,16 +426,17 @@ ufo_daemon_start_impl (UfoDaemon *daemon) g_mutex_unlock (priv->started_lock); gboolean wait_for_messages = TRUE; - while (wait_for_messages) { + while (wait_for_messages) { GError *err = NULL; UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err); + if (err != NULL) { - /* if daemon is stopped, socket will be closed and msg_recv - * will yield an error - we stop - */ + /* If daemon is stopped, socket will be closed and msg_recv will + * yield an error - we stop. */ wait_for_messages = FALSE; - } else { + } + else { switch (msg->type) { case UFO_MESSAGE_GET_NUM_DEVICES: handle_get_num_devices (daemon); @@ -531,8 +527,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error) * - we thus send a TERMINATE message to that thread */ - UfoMessenger *tmp_msger = NULL; - + UfoMessenger *tmp_msger; #ifdef WITH_MPI tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); #elif WITH_ZMQ @@ -621,6 +616,8 @@ ufo_daemon_init (UfoDaemon *self) { UfoDaemonPrivate *priv; self->priv = priv = UFO_DAEMON_GET_PRIVATE (self); + + priv->scheduler = NULL; priv->startstop_lock = g_mutex_new (); priv->started_lock = g_mutex_new (); priv->stopped_lock = g_mutex_new (); |