diff options
author | Sven Werchner <Sven.Werchner@t-online.de> | 2014-08-20 12:02:19 +0200 |
---|---|---|
committer | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2015-02-11 09:11:46 +0100 |
commit | cc965010374473cce62ab99a1fcce3189bac522a (patch) | |
tree | cc3338a6dbbabae647f9212ce8e8c766f150edbc | |
parent | 586d981a453d405758c7d52006ad3f9aeecac455 (diff) |
Allow multi input tasks for remote processing
-rw-r--r-- | ufo/ufo-daemon.c | 116 | ||||
-rw-r--r-- | ufo/ufo-graph.c | 2 | ||||
-rw-r--r-- | ufo/ufo-remote-node.c | 12 | ||||
-rw-r--r-- | ufo/ufo-remote-node.h | 2 | ||||
-rw-r--r-- | ufo/ufo-scheduler.c | 18 | ||||
-rw-r--r-- | ufo/ufo-task-graph.c | 24 | ||||
-rw-r--r-- | ufo/ufo-zmq-messenger.c | 4 |
7 files changed, 103 insertions, 75 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c index d278137..f624d58 100644 --- a/ufo/ufo-daemon.c +++ b/ufo/ufo-daemon.c @@ -46,25 +46,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT) #define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate)) struct _UfoDaemonPrivate { - UfoPluginManager *manager; - UfoTaskGraph *task_graph; - UfoBaseScheduler *scheduler; - GThread *scheduler_thread; - gpointer socket; - UfoNode *input_task; - UfoNode *output_task; - UfoBuffer *input; - cl_context context; - gchar *listen_address; - GThread *thread; - GMutex *startstop_lock; - GMutex *started_lock; - GMutex *stopped_lock; - gboolean has_started; - gboolean has_stopped; - GCond *started_cond; - GCond *stopped_cond; - UfoMessenger *msger; + UfoPluginManager *manager; + UfoTaskGraph *task_graph; + UfoBaseScheduler *scheduler; + GThread *scheduler_thread; + UfoMessenger *msger; + + UfoNode **input_tasks; + UfoNode *output_task; + UfoBuffer **inputs; + + gchar *listen_address; + gpointer socket; + GThread *thread; + GMutex *startstop_lock; + GMutex *started_lock; + GMutex *stopped_lock; + GCond *started_cond; + GCond *stopped_cond; + gboolean has_started; + gboolean has_stopped; + guint num_inputs; }; static gpointer run_scheduler (UfoDaemon *daemon); @@ -96,22 +98,17 @@ handle_get_num_devices (UfoDaemon *daemon) UfoDaemonPrivate *priv; UfoMessage *msg; UfoResources *resources; - cl_uint *num_devices; + cl_uint num_devices; cl_context context; priv = UFO_DAEMON_GET_PRIVATE (daemon); msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16)); - num_devices = g_malloc (sizeof (cl_uint)); resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler)); context = ufo_resources_get_context (resources); - UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, - CL_CONTEXT_NUM_DEVICES, - sizeof (cl_uint), - num_devices, - NULL)); + UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL)); - *(guint16 *) msg->data = (guint16) *num_devices; + *(guint16 *) msg->data = (guint16) num_devices; ufo_messenger_send_blocking (priv->msger, msg, 0); ufo_message_free (msg); @@ -196,7 +193,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) priv = UFO_DAEMON_GET_PRIVATE (daemon); json = read_json (daemon, msg); - // send ack + /* Send ack */ UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); @@ -222,17 +219,23 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg) first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first); - priv->input_task = ufo_input_task_new (); + priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first)); + priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode)); + priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer)); priv->output_task = ufo_output_task_new (2); - ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), - priv->input_task, first, - GINT_TO_POINTER (0)); + for (guint i = 0; i < priv->num_inputs; i++) { + priv->input_tasks[i] = ufo_input_task_new(); + ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), + priv->input_tasks[i], first, + GINT_TO_POINTER (i)); + } ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), last, priv->output_task, GINT_TO_POINTER (0)); + priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL); g_free (json); } @@ -284,22 +287,23 @@ handle_send_inputs (UfoDaemon *daemon, UfoMessage *request) char *base = request->data; struct _Header *header = (struct _Header *) base; - /* Receive buffer size */ requisition = header->requisition; - if (priv->input == NULL) { - priv->input = ufo_buffer_new (&requisition, context); - } - else { - if (ufo_buffer_cmp_dimensions (priv->input, &requisition)) - ufo_buffer_resize (priv->input, &requisition); - } + for (guint i = 0; i < priv->num_inputs; i++) { + if (priv->inputs[i] == NULL) { + priv->inputs[i] = ufo_buffer_new (&requisition, context); + } + else { + if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition)) + ufo_buffer_resize (priv->inputs[i], &requisition); + } - memcpy (ufo_buffer_get_host_array (priv->input, NULL), - base + sizeof (struct _Header), - ufo_buffer_get_size (priv->input)); + memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL), + base + sizeof (struct _Header), + ufo_buffer_get_size (priv->inputs[i])); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input); + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]); + } UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0); ufo_messenger_send_blocking (priv->msger, response, NULL); @@ -360,21 +364,21 @@ void handle_cleanup (UfoDaemon *daemon) ufo_messenger_send_blocking (priv->msger, response, NULL); ufo_message_free (response); - // TODO check that we don't need to execture this branch wen priv->input is null - if (priv->input_task && priv->input) { - ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task)); + /* TODO check that we don't need to execture this branch wen priv->input is null */ + for (guint i = 0; i < priv->num_inputs; i++) { + if (priv->input_tasks[i] && priv->inputs[i]) { + ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i])); + ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), + priv->inputs[i]); - ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), - priv->input); - - g_usleep (1.5 * G_USEC_PER_SEC); - unref_and_free ((GObject **) &priv->input_task); - unref_and_free ((GObject **) &priv->input); + g_usleep (1.5 * G_USEC_PER_SEC); + unref_and_free ((GObject **) &priv->input_tasks[i]); + unref_and_free ((GObject **) &priv->inputs[i]); + } } unref_and_free ((GObject **) &priv->output_task); unref_and_free ((GObject **) &priv->task_graph); - } static void @@ -412,7 +416,7 @@ ufo_daemon_start_impl (UfoDaemon *daemon) UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon); g_debug ("UfoDaemon started on address %s", priv->listen_address); - // tell the calling thread that we have started + /* tell the calling thread that we have started */ g_mutex_lock (priv->started_lock); priv->has_started = TRUE; g_cond_signal (priv->started_cond); @@ -465,7 +469,7 @@ ufo_daemon_start_impl (UfoDaemon *daemon) ufo_message_free (msg); } - // tell calling thread we have stopped + /* tell calling thread we have stopped */ g_mutex_lock (priv->stopped_lock); priv->has_stopped = TRUE; g_cond_signal (priv->stopped_cond); diff --git a/ufo/ufo-graph.c b/ufo/ufo-graph.c index 5a7d53d..ff5b4e7 100644 --- a/ufo/ufo-graph.c +++ b/ufo/ufo-graph.c @@ -220,7 +220,7 @@ ufo_graph_get_nodes_filtered (UfoGraph *graph, g_list_for (priv->nodes, it) { UfoNode *node = UFO_NODE (it->data); - if (func (node, user_data)) + if (func (node, user_data)) result = g_list_append (result, node); } diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c index 8ad525a..1eaf508 100644 --- a/ufo/ufo-remote-node.c +++ b/ufo/ufo-remote-node.c @@ -139,7 +139,13 @@ ufo_remote_node_send_json (UfoRemoteNode *node, guint ufo_remote_node_get_num_inputs (UfoRemoteNode *node) { - return 1; + return node->priv->n_inputs; +} + +void +ufo_remote_node_set_num_inputs (UfoRemoteNode *node, guint n_inputs) +{ + node->priv->n_inputs = n_inputs; } guint @@ -178,7 +184,7 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node, // determine our total message size guint64 size = 0; - + for (guint i = 0; i < priv->n_inputs; i++) { guint64 buffer_size = ufo_buffer_get_size (inputs[i]); size += buffer_size; @@ -315,6 +321,6 @@ ufo_remote_node_init (UfoRemoteNode *self) { UfoRemoteNodePrivate *priv; self->priv = priv = UFO_REMOTE_NODE_GET_PRIVATE (self); - priv->n_inputs = ufo_remote_node_get_num_inputs (self); + priv->n_inputs = 0; priv->terminated = FALSE; } diff --git a/ufo/ufo-remote-node.h b/ufo/ufo-remote-node.h index f661804..0d67693 100644 --- a/ufo/ufo-remote-node.h +++ b/ufo/ufo-remote-node.h @@ -86,6 +86,8 @@ void ufo_remote_node_send_json (UfoRemoteNode *node, UfoRemoteMode mode, const gchar *json); guint ufo_remote_node_get_num_inputs (UfoRemoteNode *node); +void ufo_remote_node_set_num_inputs (UfoRemoteNode *node, + guint n_inputs); guint ufo_remote_node_get_num_dimensions (UfoRemoteNode *node, guint input); UfoTaskMode ufo_remote_node_get_mode (UfoRemoteNode *node); diff --git a/ufo/ufo-scheduler.c b/ufo/ufo-scheduler.c index 687551c..3cfc572 100644 --- a/ufo/ufo-scheduler.c +++ b/ufo/ufo-scheduler.c @@ -169,9 +169,7 @@ run_remote_task (TaskLocalData *tld) guint n_remote_gpus; gboolean *alive; gboolean active = TRUE; - - g_assert (tld->n_inputs == 1); - + remote = UFO_REMOTE_NODE (ufo_task_node_get_proc_node (UFO_TASK_NODE (tld->task))); n_remote_gpus = ufo_remote_node_get_num_gpus (remote); alive = g_new0 (gboolean, n_remote_gpus); @@ -410,6 +408,7 @@ setup_tasks (UfoBaseScheduler *scheduler, tlds = g_new0 (TaskLocalData *, n_nodes); + for (guint i = 0; i < n_nodes; i++) { UfoNode *node; UfoProfiler *profiler; @@ -431,16 +430,18 @@ setup_tasks (UfoBaseScheduler *scheduler, for (guint j = 0; j < tld->n_inputs; j++) tld->dims[j] = ufo_task_get_num_dimensions (tld->task, j); - if (!check_target_connections (task_graph, node, tld->n_inputs, error)) + if (!check_target_connections (task_graph, node, tld->n_inputs, error)) { return NULL; + } profiler = ufo_task_node_get_profiler (UFO_TASK_NODE (node)); ufo_profiler_enable_tracing (profiler, tracing_enabled); tld->finished = g_new0 (gboolean, tld->n_inputs); - if (error && *error != NULL) + if (error && *error != NULL) { return NULL; + } } g_list_free (nodes); @@ -582,7 +583,7 @@ propagate_partition (UfoTaskGraph *graph) static void join_threads (GThread **threads, guint n_threads) { - for (guint i = 0; i < n_threads; i++) + for (guint i = 0; i < n_threads; i++) g_thread_join (threads[i]); } @@ -624,7 +625,7 @@ ufo_scheduler_run (UfoBaseScheduler *scheduler, graph = task_graph; } - if (graph == NULL) + if (graph == NULL) return; arch = ufo_base_scheduler_get_arch (scheduler); @@ -645,8 +646,9 @@ ufo_scheduler_run (UfoBaseScheduler *scheduler, /* Prepare task structures */ tlds = setup_tasks (scheduler, graph, error); - if (tlds == NULL) + if (tlds == NULL) { return; + } groups = setup_groups (scheduler, graph); diff --git a/ufo/ufo-task-graph.c b/ufo/ufo-task-graph.c index 57fae05..64d0ea0 100644 --- a/ufo/ufo-task-graph.c +++ b/ufo/ufo-task-graph.c @@ -336,7 +336,7 @@ build_remote_graph (UfoTaskGraph *remote_graph, static void create_remote_tasks (UfoTaskGraph *task_graph, UfoTaskGraph *remote_graph, - UfoTaskNode *first, + GList *first, //since there can be more than one predecessor UfoTaskNode *last, UfoRemoteNode *remote) { @@ -352,7 +352,18 @@ create_remote_tasks (UfoTaskGraph *task_graph, priv->remote_tasks = g_list_append (priv->remote_tasks, task); ufo_task_node_set_proc_node (task, UFO_NODE (remote)); - ufo_task_graph_connect_nodes (task_graph, first, task); + //Setting the remote's # of inputs to the # of inputs of the corresponding task + GList *roots = ufo_graph_get_roots (UFO_GRAPH (remote_graph)); + guint root_num_inputs = ufo_task_get_num_inputs (UFO_TASK (roots->data)); + ufo_remote_node_set_num_inputs (remote, root_num_inputs); + + //Connect all predecessors + GList *pred; + int i = 0; + g_list_for (first->data, pred) { + ufo_task_graph_connect_nodes_full (task_graph, pred->data, task, i); + i++; + } ufo_task_graph_connect_nodes (task_graph, task, last); g_free (json); @@ -382,7 +393,7 @@ expand_remotes (UfoTaskGraph *task_graph, g_list_for (remotes, it) { create_remote_tasks (task_graph, remote_graph, - first->data, last->data, it->data); + first, last->data, it->data); } g_object_unref (remote_graph); @@ -446,11 +457,10 @@ ufo_task_graph_expand (UfoTaskGraph *task_graph, successors = ufo_graph_get_successors (UFO_GRAPH (task_graph), UFO_NODE (g_list_last (path)->data)); - - path = g_list_prepend (path, g_list_first (predecessors)->data); + + path = g_list_prepend (path, predecessors); path = g_list_append (path, g_list_first (successors)->data); - g_list_free (predecessors); g_list_free (successors); if (expand_remote) { @@ -468,6 +478,8 @@ ufo_task_graph_expand (UfoTaskGraph *task_graph, g_list_free (remotes); } + g_list_free (predecessors); + n_gpus = ufo_arch_graph_get_num_gpus (arch_graph); g_debug ("Expand for %i GPU nodes", n_gpus); for (guint i = 1; i < n_gpus; i++) diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c index e2eab01..ec86e0c 100644 --- a/ufo/ufo-zmq-messenger.c +++ b/ufo/ufo-zmq-messenger.c @@ -223,6 +223,9 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger, UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size); memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size); + //if (frame->type != 5 && frame->type != 7 && frame->type!=8) + //g_message ("Type: %i \tData_size: %i",(int) frame->type, (int) resp_frame->data_size); + zmq_msg_close (&reply); result = reply_msg; goto finalize; @@ -264,7 +267,6 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger, } DataFrame *frame = zmq_msg_data (&reply); - guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size); if ((guint)size != expected_size) { g_set_error (error, ufo_messenger_error_quark(), |