/*
 * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
 *
 * This file is part of Ufo.
 *
 * This library is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library. If not, see <http://www.gnu.org/licenses/>.
 */

#include "config.h"

#ifdef WITH_PYTHON
#include <Python.h>
#endif

#ifdef __APPLE__
#include <OpenCL/cl.h>
#else
#include <CL/cl.h>
#endif

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "ufo-priv.h"
#include "compat.h"

/**
 * SECTION:ufo-scheduler
 * @Short_description: Expansion-based scheduler
 * @Title: UfoScheduler
 *
 * A scheduler that automatically distributes data according to an expansion
 * policy among different hardware resources. For that, paths carrying large
 * amounts of work are duplicated inside the #UfoTaskGraph and assigned to
 * distinct GPUs.
 */

G_DEFINE_TYPE (UfoScheduler, ufo_scheduler, UFO_TYPE_BASE_SCHEDULER)

#define UFO_SCHEDULER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_SCHEDULER, UfoSchedulerPrivate))

typedef struct {
    UfoTask     *task;
    UfoTaskMode  mode;
    guint        n_inputs;
    guint       *dims;
    gboolean    *finished;
    gboolean     strict;
    gboolean     timestamps;
} TaskLocalData;

struct _UfoSchedulerPrivate {
    UfoRemoteMode mode;
    gboolean ran;
};

/**
 * UfoSchedulerError:
 * @UFO_SCHEDULER_ERROR_SETUP: Could not start scheduler due to error
 */
GQuark
ufo_scheduler_error_quark (void)
{
    return g_quark_from_static_string ("ufo-scheduler-error-quark");
}

/**
 * ufo_scheduler_new:
 *
 * Creates a new #UfoBaseScheduler.
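 *
 * A minimal usage sketch, not taken from this file: it assumes @task_graph is
 * a #UfoTaskGraph that has already been built and connected elsewhere, and
 * that execution goes through the generic ufo_base_scheduler_run() entry
 * point.
 *
 * |[<!-- language="C" -->
 * GError *error = NULL;
 * UfoBaseScheduler *scheduler = ufo_scheduler_new ();
 *
 * ufo_base_scheduler_run (scheduler, task_graph, &error);
 *
 * if (error != NULL)
 *     g_printerr ("Scheduling failed: %s\n", error->message);
 *
 * g_object_unref (scheduler);
 * ]|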
 *
 * Return value: A new #UfoBaseScheduler
 */
UfoBaseScheduler *
ufo_scheduler_new (void)
{
    return UFO_BASE_SCHEDULER (g_object_new (UFO_TYPE_SCHEDULER, NULL));
}

static gboolean
get_inputs (TaskLocalData *tld,
            UfoBuffer **inputs)
{
    UfoRequisition req;
    UfoTaskNode *node = UFO_TASK_NODE (tld->task);
    guint n_finished = 0;

    for (guint i = 0; i < tld->n_inputs; i++) {
        UfoGroup *group;

        if (!tld->finished[i]) {
            UfoBuffer *input;

            group = ufo_task_node_get_current_in_group (node, i);
            input = ufo_group_pop_input_buffer (group, tld->task);

            if (tld->strict && input != UFO_END_OF_STREAM) {
                ufo_buffer_get_requisition (input, &req);

                if (req.n_dims != tld->dims[i]) {
                    g_warning ("%s: buffer from input %i provides %i dimensions but %i dimensions were expected",
                               G_OBJECT_TYPE_NAME (tld->task), i, req.n_dims, tld->dims[i]);
                    return FALSE;
                }
            }

            if (input == UFO_END_OF_STREAM) {
                tld->finished[i] = TRUE;
                n_finished++;
            }
            else
                inputs[i] = input;
        }
        else
            n_finished++;
    }

    return (tld->n_inputs == 0) || (n_finished < tld->n_inputs);
}

static void
release_inputs (TaskLocalData *tld,
                UfoBuffer **inputs)
{
    UfoTaskNode *node = UFO_TASK_NODE (tld->task);

    for (guint i = 0; i < tld->n_inputs; i++) {
        UfoGroup *group;

        group = ufo_task_node_get_current_in_group (node, i);
        ufo_group_push_input_buffer (group, tld->task, inputs[i]);
        ufo_task_node_switch_in_group (node, i);
    }
}

static gboolean
any (gboolean *values,
     guint n_values)
{
    gboolean result = FALSE;

    for (guint i = 0; i < n_values; i++)
        result = result || values[i];

    return result;
}

static void
run_remote_task (TaskLocalData *tld)
{
    UfoRemoteNode *remote;
    guint n_remote_gpus;
    gboolean *alive;
    gboolean active = TRUE;

    remote = UFO_REMOTE_NODE (ufo_task_node_get_proc_node (UFO_TASK_NODE (tld->task)));
    n_remote_gpus = ufo_remote_node_get_num_gpus (remote);
    alive = g_new0 (gboolean, n_remote_gpus);

    /*
     * Send one work item per available remote GPU before collecting any
     * results, so the remote node can keep all of its GPUs busy instead of us
     * waiting for each item to finish processing.
     */
    while (active) {
        for (guint i = 0; i < n_remote_gpus; i++) {
            UfoBuffer *input;

            if (get_inputs (tld, &input)) {
                ufo_remote_node_send_inputs (remote, &input);
                release_inputs (tld, &input);
                alive[i] = TRUE;
            }
            else {
                alive[i] = FALSE;
            }
        }

        for (guint i = 0; i < n_remote_gpus; i++) {
            UfoGroup *group;
            UfoBuffer *output;
            UfoRequisition requisition;

            if (!alive[i])
                continue;

            ufo_remote_node_get_requisition (remote, &requisition);
            group = ufo_task_node_get_out_group (UFO_TASK_NODE (tld->task));
            output = ufo_group_pop_output_buffer (group, &requisition);
            ufo_remote_node_get_result (remote, output);
            ufo_group_push_output_buffer (group, output);
        }

        active = any (alive, n_remote_gpus);
    }

    g_free (alive);
    ufo_group_finish (ufo_task_node_get_out_group (UFO_TASK_NODE (tld->task)));
    ufo_remote_node_terminate (remote);
}

static gpointer
run_task (TaskLocalData *tld)
{
    UfoBuffer *inputs[tld->n_inputs];
    UfoBuffer *output;
    UfoTaskNode *node;
    UfoTaskMode mode;
    UfoRequisition requisition;
    gboolean produces;
    gboolean active;

    node = UFO_TASK_NODE (tld->task);
    active = TRUE;
    output = NULL;

    if (UFO_IS_REMOTE_TASK (tld->task)) {
        run_remote_task (tld);
        return NULL;
    }

    /* mode without CPU/GPU flag */
    mode = tld->mode & UFO_TASK_MODE_TYPE_MASK;
    produces = mode != UFO_TASK_MODE_SINK;

    while (active) {
        UfoGroup *group;

        group = ufo_task_node_get_out_group (node);

        /* Get input buffers */
        active = get_inputs (tld, inputs);

        if (!active) {
            ufo_group_finish (group);
            break;
        }

        /* Get output buffers */
        ufo_task_get_requisition (tld->task, inputs, &requisition);

        if (produces) {
            output = ufo_group_pop_output_buffer (group, &requisition);
            g_assert (output != NULL);
        }

        if (output != NULL) {
            ufo_buffer_discard_location (output);

            for (guint i = 0; i < tld->n_inputs; i++)
                ufo_buffer_copy_metadata (inputs[i], output);
        }

        switch (mode) {
            case UFO_TASK_MODE_PROCESSOR:
            case UFO_TASK_MODE_SINK:
                active = ufo_task_process (tld->task, inputs, output, &requisition);
                break;

            case UFO_TASK_MODE_REDUCTOR:
                do {
                    gboolean go_on = TRUE;

                    do {
                        go_on = ufo_task_process (tld->task, inputs, output, &requisition);
                        release_inputs (tld, inputs);
                        active = get_inputs (tld, inputs);
                        go_on = go_on && active;
                    } while (go_on);

                    do {
                        go_on = ufo_task_generate (tld->task, output, &requisition);

                        if (go_on) {
                            ufo_group_push_output_buffer (group, output);
                            output = ufo_group_pop_output_buffer (group, &requisition);
                        }
                    } while (go_on);
                } while (active);
                break;

            case UFO_TASK_MODE_GENERATOR:
                {
                    if (tld->timestamps) {
                        GValue v = { 0, };

                        g_value_init (&v, G_TYPE_INT64);
                        g_value_set_int64 (&v, g_get_real_time ());
                        ufo_buffer_set_metadata (output, "ts", &v);
                    }

                    active = ufo_task_generate (tld->task, output, &requisition);
                }
                break;

            default:
                g_warning ("Invalid task mode: %i\n", mode);
        }

        if (active && produces && (mode != UFO_TASK_MODE_REDUCTOR))
            ufo_group_push_output_buffer (group, output);

        /* Release buffers for further consumption */
        if (active)
            release_inputs (tld, inputs);

        if (!active)
            ufo_group_finish (group);
    }

    return NULL;
}

static void
cleanup_task_local_data (TaskLocalData **tlds,
                         guint n)
{
    for (guint i = 0; i < n; i++) {
        TaskLocalData *tld = tlds[i];

        ufo_task_node_reset (UFO_TASK_NODE (tld->task));
        g_free (tld->dims);
        g_free (tld->finished);
        g_free (tld);
    }

    g_free (tlds);
}

static gboolean
check_target_connections (UfoTaskGraph *graph,
                          UfoNode *target,
                          guint n_inputs,
                          GError **error)
{
    GList *predecessors;
    GList *it;
    guint16 connection_bitmap;
    guint16 mask;
    gboolean result = TRUE;

    if (n_inputs == 0)
        return TRUE;

    predecessors = ufo_graph_get_predecessors (UFO_GRAPH (graph),
                                               target);
    connection_bitmap = 0;

    /* Check all edges and set the bit corresponding to each edge label */
    g_list_for (predecessors, it) {
        gpointer label;
        gint input;

        label = ufo_graph_get_edge_label (UFO_GRAPH (graph), UFO_NODE (it->data), target);
        input = GPOINTER_TO_INT (label);
        g_assert (input >= 0 && input < 16);

        connection_bitmap |= 1 << input;
    }

    mask = (1 << n_inputs) - 1;

    /* Check if mask matches what we have */
    if ((mask & connection_bitmap) != mask) {
        g_set_error (error, UFO_SCHEDULER_ERROR, UFO_SCHEDULER_ERROR_SETUP,
                     "Not all inputs of `%s' are connected",
                     ufo_task_node_get_plugin_name (UFO_TASK_NODE (target)));
        result = FALSE;
    }

    g_list_free (predecessors);
    return result;
}

static TaskLocalData **
setup_tasks (UfoBaseScheduler *scheduler,
             UfoTaskGraph *task_graph,
             GError **error)
{
    UfoResources *resources;
    TaskLocalData **tlds;
    GList *nodes;
    guint n_nodes;
    gboolean timestamps;
    gboolean tracing_enabled;

    resources = ufo_base_scheduler_get_resources (scheduler, error);

    if (resources == NULL)
        return NULL;

    g_object_get (scheduler,
                  "enable-tracing", &tracing_enabled,
                  "timestamps", &timestamps,
                  NULL);

    nodes = ufo_graph_get_nodes (UFO_GRAPH (task_graph));
    n_nodes = g_list_length (nodes);
    tlds = g_new0 (TaskLocalData *, n_nodes);

    for (guint i = 0; i < n_nodes; i++) {
        UfoNode *node;
        TaskLocalData *tld;

        node = g_list_nth_data (nodes, i);
        tld = g_new0 (TaskLocalData, 1);
        tld->task = UFO_TASK (node);
        tlds[i] = tld;

        ufo_task_setup (tld->task, resources, error);
        tld->mode = ufo_task_get_mode (tld->task);
        tld->n_inputs = ufo_task_get_num_inputs (tld->task);
        tld->dims = g_new0 (guint, tld->n_inputs);
        tld->timestamps = timestamps;

        /* TODO: make this configurable from outside */
        tld->strict = FALSE;

        for (guint j = 0; j < tld->n_inputs; j++)
            tld->dims[j] = ufo_task_get_num_dimensions (tld->task, j);

        if (!check_target_connections (task_graph, node, tld->n_inputs, error)) {
            return NULL;
        }

        tld->finished = g_new0 (gboolean, tld->n_inputs);

        if (error && *error != NULL) {
            return NULL;
        }
    }

    g_list_free (nodes);
    return tlds;
}

static GList *
setup_groups (UfoBaseScheduler *scheduler,
              UfoTaskGraph *task_graph,
              GError **error)
{
    UfoResources *resources;
    GList *groups;
    GList *nodes;
    GList *it;
    cl_context context;

    groups = NULL;
    nodes = ufo_graph_get_nodes (UFO_GRAPH (task_graph));

    resources = ufo_base_scheduler_get_resources (scheduler, error);

    if (resources == NULL)
        return NULL;

    context = ufo_resources_get_context (resources);

    g_list_for (nodes, it) {
        GList *successors;
        GList *jt;
        UfoNode *node;
        UfoGroup *group;
        UfoSendPattern pattern;

        node = UFO_NODE (it->data);
        successors = ufo_graph_get_successors (UFO_GRAPH (task_graph), node);
        pattern = ufo_task_node_get_send_pattern (UFO_TASK_NODE (node));

        group = ufo_group_new (successors, context, pattern);
        groups = g_list_append (groups, group);
        ufo_task_node_set_out_group (UFO_TASK_NODE (node), group);

        g_list_for (successors, jt) {
            UfoNode *target;
            gpointer label;
            guint input;

            target = UFO_NODE (jt->data);
            label = ufo_graph_get_edge_label (UFO_GRAPH (task_graph), node, target);
            input = (guint) GPOINTER_TO_INT (label);
            ufo_task_node_add_in_group (UFO_TASK_NODE (target), input, group);
            ufo_group_set_num_expected (group, UFO_TASK (target),
                                        ufo_task_node_get_num_expected (UFO_TASK_NODE (target), input));
        }

        g_list_free (successors);
    }

    g_list_free (nodes);
    return groups;
}

static gboolean
correct_connections (UfoTaskGraph *graph,
                     GError **error)
{
    GList *nodes;
    GList *it;
    gboolean result = TRUE;

    nodes = ufo_graph_get_nodes (UFO_GRAPH (graph));

    g_list_for (nodes, it) {
        UfoTaskNode *node;
        UfoTaskMode mode;
        UfoGroup *group;

        node = UFO_TASK_NODE (it->data);
        mode = ufo_task_get_mode (UFO_TASK (node)) & UFO_TASK_MODE_TYPE_MASK;
        group = ufo_task_node_get_out_group (node);

        if (((mode == UFO_TASK_MODE_GENERATOR) || (mode == UFO_TASK_MODE_REDUCTOR)) &&
            ufo_group_get_num_targets (group) < 1) {
            g_set_error (error, UFO_SCHEDULER_ERROR, UFO_SCHEDULER_ERROR_SETUP,
                         "No outgoing node for `%s'",
                         ufo_task_node_get_identifier (node));
            result = FALSE;
            break;
        }
    }

    g_list_free (nodes);
    return result;
}

static void
replicate_task_graph (UfoTaskGraph *graph,
                      UfoResources *resources)
{
    GList *remotes;
    GList *it;
    guint n_graphs;
    guint idx = 1;

    remotes = ufo_resources_get_remote_nodes (resources);
    n_graphs = g_list_length (remotes) + 1;

    g_list_for (remotes, it) {
        UfoRemoteNode *node;
        gchar *json;

        /* Set partition index for the remote task graph */
        ufo_task_graph_set_partition (graph, idx++, n_graphs);
        json = ufo_task_graph_get_json_data (graph, NULL);
        node = UFO_REMOTE_NODE (it->data);
        ufo_remote_node_send_json (node, UFO_REMOTE_MODE_REPLICATE, json);
        g_free (json);
    }

    /* Set partition index for the local task graph */
    ufo_task_graph_set_partition (graph, 0, n_graphs);
    g_list_free (remotes);
}

static void
propagate_partition (UfoTaskGraph *graph)
{
    GList *nodes;
    GList *it;
    guint idx;
    guint total;

    ufo_task_graph_get_partition (graph, &idx, &total);
    nodes = ufo_graph_get_nodes (UFO_GRAPH (graph));

    g_list_for (nodes, it) {
        ufo_task_node_set_partition (UFO_TASK_NODE (it->data), idx, total);
    }

    g_list_free (nodes);
}

static void
join_threads (GThread **threads,
              guint n_threads)
{
    for (guint i = 0; i < n_threads; i++)
        g_thread_join (threads[i]);
}

static void
ufo_scheduler_run (UfoBaseScheduler *scheduler,
                   UfoTaskGraph *task_graph,
                   GError **error)
{
    UfoSchedulerPrivate *priv;
    UfoResources *resources;
    UfoTaskGraph *graph;
    GList *gpu_nodes;
    GList *groups;
    guint n_nodes;
    GThread **threads;
    TaskLocalData **tlds;
    gboolean expand;

    priv = UFO_SCHEDULER_GET_PRIVATE (scheduler);
    g_object_get (scheduler, "expand", &expand, NULL);

    graph = task_graph;
    resources = ufo_base_scheduler_get_resources (scheduler, error);

    if (resources == NULL)
        return;

    gpu_nodes = ufo_resources_get_gpu_nodes (resources);

    if (priv->mode == UFO_REMOTE_MODE_REPLICATE)
        replicate_task_graph (graph, resources);

    if (expand) {
        gboolean expand_remote = priv->mode == UFO_REMOTE_MODE_STREAM;

        if (!priv->ran)
            ufo_task_graph_expand (graph, resources, g_list_length (gpu_nodes), expand_remote);
        else
            g_debug ("Task graph already expanded, skipping.");
    }

    propagate_partition (graph);
    ufo_task_graph_map (graph, gpu_nodes);

    /* Prepare task structures */
    tlds = setup_tasks (scheduler, graph, error);

    if (tlds == NULL)
        return;

    groups = setup_groups (scheduler, graph, error);

    if (groups == NULL)
        return;

    if (!correct_connections (graph, error))
        return;

    n_nodes = ufo_graph_get_num_nodes (UFO_GRAPH (graph));
    threads = g_new0 (GThread *, n_nodes);

    /* Spawn threads */
    for (guint i = 0; i < n_nodes; i++) {
        threads[i] = g_thread_create ((GThreadFunc) run_task, tlds[i], TRUE, error);

        if (error && (*error != NULL))
            return;
    }

#ifdef WITH_PYTHON
    if (Py_IsInitialized ()) {
        PyGILState_STATE state = PyGILState_Ensure ();

        Py_BEGIN_ALLOW_THREADS

        join_threads (threads, n_nodes);

        Py_END_ALLOW_THREADS

        PyGILState_Release (state);
    }
    else {
        join_threads (threads, n_nodes);
    }
#else
    join_threads (threads, n_nodes);
#endif

    /* Cleanup */
    cleanup_task_local_data (tlds, n_nodes);

    g_list_foreach (groups, (GFunc) g_object_unref, NULL);
    g_list_free (groups);
    g_list_free (gpu_nodes);
    g_free (threads);

    priv->ran = TRUE;
}

static void
ufo_scheduler_class_init (UfoSchedulerClass *klass)
{
    UfoBaseSchedulerClass *sclass;

    sclass = UFO_BASE_SCHEDULER_CLASS (klass);
    sclass->run = ufo_scheduler_run;

    g_type_class_add_private (klass, sizeof (UfoSchedulerPrivate));
}

static void
ufo_scheduler_init (UfoScheduler *scheduler)
{
    UfoSchedulerPrivate *priv;

    scheduler->priv = priv = UFO_SCHEDULER_GET_PRIVATE (scheduler);
    priv->mode = UFO_REMOTE_MODE_STREAM;
    priv->ran = FALSE;
}