summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSven Werchner <Sven.Werchner@t-online.de>2014-08-20 12:02:19 +0200
committerMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-02-11 09:11:46 +0100
commitcc965010374473cce62ab99a1fcce3189bac522a (patch)
treecc3338a6dbbabae647f9212ce8e8c766f150edbc
parent586d981a453d405758c7d52006ad3f9aeecac455 (diff)
Allow multi input tasks for remote processing
-rw-r--r--ufo/ufo-daemon.c116
-rw-r--r--ufo/ufo-graph.c2
-rw-r--r--ufo/ufo-remote-node.c12
-rw-r--r--ufo/ufo-remote-node.h2
-rw-r--r--ufo/ufo-scheduler.c18
-rw-r--r--ufo/ufo-task-graph.c24
-rw-r--r--ufo/ufo-zmq-messenger.c4
7 files changed, 103 insertions, 75 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index d278137..f624d58 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -46,25 +46,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT)
#define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate))
struct _UfoDaemonPrivate {
- UfoPluginManager *manager;
- UfoTaskGraph *task_graph;
- UfoBaseScheduler *scheduler;
- GThread *scheduler_thread;
- gpointer socket;
- UfoNode *input_task;
- UfoNode *output_task;
- UfoBuffer *input;
- cl_context context;
- gchar *listen_address;
- GThread *thread;
- GMutex *startstop_lock;
- GMutex *started_lock;
- GMutex *stopped_lock;
- gboolean has_started;
- gboolean has_stopped;
- GCond *started_cond;
- GCond *stopped_cond;
- UfoMessenger *msger;
+ UfoPluginManager *manager;
+ UfoTaskGraph *task_graph;
+ UfoBaseScheduler *scheduler;
+ GThread *scheduler_thread;
+ UfoMessenger *msger;
+
+ UfoNode **input_tasks;
+ UfoNode *output_task;
+ UfoBuffer **inputs;
+
+ gchar *listen_address;
+ gpointer socket;
+ GThread *thread;
+ GMutex *startstop_lock;
+ GMutex *started_lock;
+ GMutex *stopped_lock;
+ GCond *started_cond;
+ GCond *stopped_cond;
+ gboolean has_started;
+ gboolean has_stopped;
+ guint num_inputs;
};
static gpointer run_scheduler (UfoDaemon *daemon);
@@ -96,22 +98,17 @@ handle_get_num_devices (UfoDaemon *daemon)
UfoDaemonPrivate *priv;
UfoMessage *msg;
UfoResources *resources;
- cl_uint *num_devices;
+ cl_uint num_devices;
cl_context context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
- num_devices = g_malloc (sizeof (cl_uint));
resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler));
context = ufo_resources_get_context (resources);
- UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context,
- CL_CONTEXT_NUM_DEVICES,
- sizeof (cl_uint),
- num_devices,
- NULL));
+ UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL));
- *(guint16 *) msg->data = (guint16) *num_devices;
+ *(guint16 *) msg->data = (guint16) num_devices;
ufo_messenger_send_blocking (priv->msger, msg, 0);
ufo_message_free (msg);
@@ -196,7 +193,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
priv = UFO_DAEMON_GET_PRIVATE (daemon);
json = read_json (daemon, msg);
- // send ack
+ /* Send ack */
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
@@ -222,17 +219,23 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first);
- priv->input_task = ufo_input_task_new ();
+ priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first));
+ priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode));
+ priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer));
priv->output_task = ufo_output_task_new (2);
- ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
- priv->input_task, first,
- GINT_TO_POINTER (0));
+ for (guint i = 0; i < priv->num_inputs; i++) {
+ priv->input_tasks[i] = ufo_input_task_new();
+ ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
+ priv->input_tasks[i], first,
+ GINT_TO_POINTER (i));
+ }
ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
last, priv->output_task,
GINT_TO_POINTER (0));
+
priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL);
g_free (json);
}
@@ -284,22 +287,23 @@ handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
char *base = request->data;
struct _Header *header = (struct _Header *) base;
- /* Receive buffer size */
requisition = header->requisition;
- if (priv->input == NULL) {
- priv->input = ufo_buffer_new (&requisition, context);
- }
- else {
- if (ufo_buffer_cmp_dimensions (priv->input, &requisition))
- ufo_buffer_resize (priv->input, &requisition);
- }
+ for (guint i = 0; i < priv->num_inputs; i++) {
+ if (priv->inputs[i] == NULL) {
+ priv->inputs[i] = ufo_buffer_new (&requisition, context);
+ }
+ else {
+ if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition))
+ ufo_buffer_resize (priv->inputs[i], &requisition);
+ }
- memcpy (ufo_buffer_get_host_array (priv->input, NULL),
- base + sizeof (struct _Header),
- ufo_buffer_get_size (priv->input));
+ memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL),
+ base + sizeof (struct _Header),
+ ufo_buffer_get_size (priv->inputs[i]));
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]);
+ }
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
@@ -360,21 +364,21 @@ void handle_cleanup (UfoDaemon *daemon)
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
- // TODO check that we don't need to execture this branch wen priv->input is null
- if (priv->input_task && priv->input) {
- ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task));
+ /* TODO check that we don't need to execture this branch wen priv->input is null */
+ for (guint i = 0; i < priv->num_inputs; i++) {
+ if (priv->input_tasks[i] && priv->inputs[i]) {
+ ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i]));
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]),
+ priv->inputs[i]);
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task),
- priv->input);
-
- g_usleep (1.5 * G_USEC_PER_SEC);
- unref_and_free ((GObject **) &priv->input_task);
- unref_and_free ((GObject **) &priv->input);
+ g_usleep (1.5 * G_USEC_PER_SEC);
+ unref_and_free ((GObject **) &priv->input_tasks[i]);
+ unref_and_free ((GObject **) &priv->inputs[i]);
+ }
}
unref_and_free ((GObject **) &priv->output_task);
unref_and_free ((GObject **) &priv->task_graph);
-
}
static void
@@ -412,7 +416,7 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
g_debug ("UfoDaemon started on address %s", priv->listen_address);
- // tell the calling thread that we have started
+ /* tell the calling thread that we have started */
g_mutex_lock (priv->started_lock);
priv->has_started = TRUE;
g_cond_signal (priv->started_cond);
@@ -465,7 +469,7 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
ufo_message_free (msg);
}
- // tell calling thread we have stopped
+ /* tell calling thread we have stopped */
g_mutex_lock (priv->stopped_lock);
priv->has_stopped = TRUE;
g_cond_signal (priv->stopped_cond);
diff --git a/ufo/ufo-graph.c b/ufo/ufo-graph.c
index 5a7d53d..ff5b4e7 100644
--- a/ufo/ufo-graph.c
+++ b/ufo/ufo-graph.c
@@ -220,7 +220,7 @@ ufo_graph_get_nodes_filtered (UfoGraph *graph,
g_list_for (priv->nodes, it) {
UfoNode *node = UFO_NODE (it->data);
- if (func (node, user_data))
+ if (func (node, user_data))
result = g_list_append (result, node);
}
diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c
index 8ad525a..1eaf508 100644
--- a/ufo/ufo-remote-node.c
+++ b/ufo/ufo-remote-node.c
@@ -139,7 +139,13 @@ ufo_remote_node_send_json (UfoRemoteNode *node,
guint
ufo_remote_node_get_num_inputs (UfoRemoteNode *node)
{
- return 1;
+ return node->priv->n_inputs;
+}
+
+void
+ufo_remote_node_set_num_inputs (UfoRemoteNode *node, guint n_inputs)
+{
+ node->priv->n_inputs = n_inputs;
}
guint
@@ -178,7 +184,7 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node,
// determine our total message size
guint64 size = 0;
-
+
for (guint i = 0; i < priv->n_inputs; i++) {
guint64 buffer_size = ufo_buffer_get_size (inputs[i]);
size += buffer_size;
@@ -315,6 +321,6 @@ ufo_remote_node_init (UfoRemoteNode *self)
{
UfoRemoteNodePrivate *priv;
self->priv = priv = UFO_REMOTE_NODE_GET_PRIVATE (self);
- priv->n_inputs = ufo_remote_node_get_num_inputs (self);
+ priv->n_inputs = 0;
priv->terminated = FALSE;
}
diff --git a/ufo/ufo-remote-node.h b/ufo/ufo-remote-node.h
index f661804..0d67693 100644
--- a/ufo/ufo-remote-node.h
+++ b/ufo/ufo-remote-node.h
@@ -86,6 +86,8 @@ void ufo_remote_node_send_json (UfoRemoteNode *node,
UfoRemoteMode mode,
const gchar *json);
guint ufo_remote_node_get_num_inputs (UfoRemoteNode *node);
+void ufo_remote_node_set_num_inputs (UfoRemoteNode *node,
+ guint n_inputs);
guint ufo_remote_node_get_num_dimensions (UfoRemoteNode *node,
guint input);
UfoTaskMode ufo_remote_node_get_mode (UfoRemoteNode *node);
diff --git a/ufo/ufo-scheduler.c b/ufo/ufo-scheduler.c
index 687551c..3cfc572 100644
--- a/ufo/ufo-scheduler.c
+++ b/ufo/ufo-scheduler.c
@@ -169,9 +169,7 @@ run_remote_task (TaskLocalData *tld)
guint n_remote_gpus;
gboolean *alive;
gboolean active = TRUE;
-
- g_assert (tld->n_inputs == 1);
-
+
remote = UFO_REMOTE_NODE (ufo_task_node_get_proc_node (UFO_TASK_NODE (tld->task)));
n_remote_gpus = ufo_remote_node_get_num_gpus (remote);
alive = g_new0 (gboolean, n_remote_gpus);
@@ -410,6 +408,7 @@ setup_tasks (UfoBaseScheduler *scheduler,
tlds = g_new0 (TaskLocalData *, n_nodes);
+
for (guint i = 0; i < n_nodes; i++) {
UfoNode *node;
UfoProfiler *profiler;
@@ -431,16 +430,18 @@ setup_tasks (UfoBaseScheduler *scheduler,
for (guint j = 0; j < tld->n_inputs; j++)
tld->dims[j] = ufo_task_get_num_dimensions (tld->task, j);
- if (!check_target_connections (task_graph, node, tld->n_inputs, error))
+ if (!check_target_connections (task_graph, node, tld->n_inputs, error)) {
return NULL;
+ }
profiler = ufo_task_node_get_profiler (UFO_TASK_NODE (node));
ufo_profiler_enable_tracing (profiler, tracing_enabled);
tld->finished = g_new0 (gboolean, tld->n_inputs);
- if (error && *error != NULL)
+ if (error && *error != NULL) {
return NULL;
+ }
}
g_list_free (nodes);
@@ -582,7 +583,7 @@ propagate_partition (UfoTaskGraph *graph)
static void
join_threads (GThread **threads, guint n_threads)
{
- for (guint i = 0; i < n_threads; i++)
+ for (guint i = 0; i < n_threads; i++)
g_thread_join (threads[i]);
}
@@ -624,7 +625,7 @@ ufo_scheduler_run (UfoBaseScheduler *scheduler,
graph = task_graph;
}
- if (graph == NULL)
+ if (graph == NULL)
return;
arch = ufo_base_scheduler_get_arch (scheduler);
@@ -645,8 +646,9 @@ ufo_scheduler_run (UfoBaseScheduler *scheduler,
/* Prepare task structures */
tlds = setup_tasks (scheduler, graph, error);
- if (tlds == NULL)
+ if (tlds == NULL) {
return;
+ }
groups = setup_groups (scheduler, graph);
diff --git a/ufo/ufo-task-graph.c b/ufo/ufo-task-graph.c
index 57fae05..64d0ea0 100644
--- a/ufo/ufo-task-graph.c
+++ b/ufo/ufo-task-graph.c
@@ -336,7 +336,7 @@ build_remote_graph (UfoTaskGraph *remote_graph,
static void
create_remote_tasks (UfoTaskGraph *task_graph,
UfoTaskGraph *remote_graph,
- UfoTaskNode *first,
+ GList *first, //since there can be more than one predecessor
UfoTaskNode *last,
UfoRemoteNode *remote)
{
@@ -352,7 +352,18 @@ create_remote_tasks (UfoTaskGraph *task_graph,
priv->remote_tasks = g_list_append (priv->remote_tasks, task);
ufo_task_node_set_proc_node (task, UFO_NODE (remote));
- ufo_task_graph_connect_nodes (task_graph, first, task);
+ //Setting the remote's # of inputs to the # of inputs of the corresponding task
+ GList *roots = ufo_graph_get_roots (UFO_GRAPH (remote_graph));
+ guint root_num_inputs = ufo_task_get_num_inputs (UFO_TASK (roots->data));
+ ufo_remote_node_set_num_inputs (remote, root_num_inputs);
+
+ //Connect all predecessors
+ GList *pred;
+ int i = 0;
+ g_list_for (first->data, pred) {
+ ufo_task_graph_connect_nodes_full (task_graph, pred->data, task, i);
+ i++;
+ }
ufo_task_graph_connect_nodes (task_graph, task, last);
g_free (json);
@@ -382,7 +393,7 @@ expand_remotes (UfoTaskGraph *task_graph,
g_list_for (remotes, it) {
create_remote_tasks (task_graph, remote_graph,
- first->data, last->data, it->data);
+ first, last->data, it->data);
}
g_object_unref (remote_graph);
@@ -446,11 +457,10 @@ ufo_task_graph_expand (UfoTaskGraph *task_graph,
successors = ufo_graph_get_successors (UFO_GRAPH (task_graph),
UFO_NODE (g_list_last (path)->data));
-
- path = g_list_prepend (path, g_list_first (predecessors)->data);
+
+ path = g_list_prepend (path, predecessors);
path = g_list_append (path, g_list_first (successors)->data);
- g_list_free (predecessors);
g_list_free (successors);
if (expand_remote) {
@@ -468,6 +478,8 @@ ufo_task_graph_expand (UfoTaskGraph *task_graph,
g_list_free (remotes);
}
+ g_list_free (predecessors);
+ n_gpus = ufo_arch_graph_get_num_gpus (arch_graph);
g_debug ("Expand for %i GPU nodes", n_gpus);
for (guint i = 1; i < n_gpus; i++)
diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c
index e2eab01..ec86e0c 100644
--- a/ufo/ufo-zmq-messenger.c
+++ b/ufo/ufo-zmq-messenger.c
@@ -223,6 +223,9 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size);
memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size);
+ //if (frame->type != 5 && frame->type != 7 && frame->type!=8)
+ //g_message ("Type: %i \tData_size: %i",(int) frame->type, (int) resp_frame->data_size);
+
zmq_msg_close (&reply);
result = reply_msg;
goto finalize;
@@ -264,7 +267,6 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
}
DataFrame *frame = zmq_msg_data (&reply);
-
guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size);
if ((guint)size != expected_size) {
g_set_error (error, ufo_messenger_error_quark(),