/* * Copyright (C) 2011-2013 Karlsruhe Institute of Technology * * This file is part of Ufo. * * This library is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation, either * version 3 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "config.h" #ifdef WITH_PYTHON #include #endif #ifdef __APPLE__ #include #else #include #endif #include #include #include #include #include #include #include #include #include #include "ufo-priv.h" #include "compat.h" /** * SECTION:ufo-fixed-scheduler * @Short_description: Simple fixed scheduler * @Title: UfoFixedScheduler * * This scheduler has only minimal automatisms. It does not attempt to * distribute work among multiple GPUs, which is left to do by the user. */ G_DEFINE_TYPE (UfoFixedScheduler, ufo_fixed_scheduler, UFO_TYPE_BASE_SCHEDULER) #define UFO_FIXED_SCHEDULER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_FIXED_SCHEDULER, UfoFixedSchedulerPrivate)) struct _UfoFixedSchedulerPrivate { UfoArchGraph *arch; }; typedef struct { UfoTask *from; UfoTask *to; guint port; UfoTwoWayQueue *queue; } Connection; typedef struct { GList *connections; GList *tasks; } ProcessData; typedef struct { UfoGraph *graph; UfoTask *task; GList *connections; cl_context context; } TaskData; enum { PROP_0, N_PROPERTIES, }; static UfoBuffer *POISON_PILL = (UfoBuffer *) 0x1; /** * UfoFixedSchedulerError: * @UFO_FIXED_SCHEDULER_ERROR_SETUP: Could not start scheduler due to error */ GQuark ufo_fixed_scheduler_error_quark (void) { return g_quark_from_static_string ("ufo-scheduler-error-quark"); } /** * ufo_fixed_scheduler_new: * * Creates a new #UfoFixedScheduler. * * Return value: A new #UfoFixedScheduler */ UfoBaseScheduler * ufo_fixed_scheduler_new (void) { return UFO_BASE_SCHEDULER (g_object_new (UFO_TYPE_FIXED_SCHEDULER, NULL)); } static gboolean pop_input_data (UfoTwoWayQueue **in_queues, gboolean *finished, UfoBuffer **inputs, guint n_inputs) { guint n_finished; n_finished = 0; for (guint i = 0; i < n_inputs; i++) { if (!finished[i]) { UfoBuffer *input; input = ufo_two_way_queue_consumer_pop (in_queues[i]); if (input == POISON_PILL) { finished[i] = TRUE; n_finished++; } else { inputs[i] = input; } } else { n_finished++; } } return n_finished < n_inputs; } static void release_input_data (UfoTwoWayQueue **in_queues, UfoBuffer **inputs, guint n_inputs) { for (guint i = 0; i < n_inputs; i++) ufo_two_way_queue_consumer_push (in_queues[i], inputs[i]); } static UfoBuffer * pop_output_data (UfoTwoWayQueue *queue, UfoRequisition *requisition, cl_context context) { UfoBuffer *buffer; if (ufo_two_way_queue_get_capacity (queue) < 2) { buffer = ufo_buffer_new (requisition, context); ufo_two_way_queue_insert (queue, buffer); } buffer = ufo_two_way_queue_producer_pop (queue); if (ufo_buffer_cmp_dimensions (buffer, requisition)) ufo_buffer_resize (buffer, requisition); return buffer; } static GList * get_output_queue_list (TaskData *data) { GList *result = NULL; GList *it; g_list_for (data->connections, it) { Connection *connection = (Connection *) it->data; if (connection->from == data->task) result = g_list_append (result, connection->queue); } return result; } static UfoTwoWayQueue ** get_input_queues (TaskData *data, guint *n_inputs) { UfoTwoWayQueue **result; GList *it; *n_inputs = ufo_graph_get_num_predecessors (data->graph, UFO_NODE (data->task)); result = g_new0 (UfoTwoWayQueue *, *n_inputs); g_list_for (data->connections, it) { Connection *connection = (Connection *) it->data; if (connection->to == data->task) result[connection->port] = connection->queue; } return result; } static void finish_successors (GList *out_queues) { GList *it; g_list_for (out_queues, it) { UfoTwoWayQueue *out_queue = (UfoTwoWayQueue *) it->data; ufo_two_way_queue_producer_push (out_queue, POISON_PILL); } } static void generate_loop (TaskData *data) { UfoRequisition requisition; UfoBuffer *output; GList *out_queues; GList *it; gboolean active = TRUE; out_queues = get_output_queue_list (data); while (active) { g_list_for (out_queues, it) { UfoTwoWayQueue *out_queue = (UfoTwoWayQueue *) it->data; ufo_task_get_requisition (data->task, NULL, &requisition); output = pop_output_data (out_queue, &requisition, data->context); active = ufo_task_generate (data->task, output, &requisition); if (!active) break; ufo_two_way_queue_producer_push (out_queue, output); } } finish_successors (out_queues); g_list_free (out_queues); } static void process_loop (TaskData *data) { UfoRequisition requisition; UfoBuffer **inputs; UfoBuffer *output; UfoTwoWayQueue **in_queues; gboolean *finished; GList *out_queues; GList *it; guint n_inputs; gboolean active = TRUE; gboolean is_sink; in_queues = get_input_queues (data, &n_inputs); out_queues = get_output_queue_list (data); inputs = g_new0 (UfoBuffer *, n_inputs); finished = g_new0 (gboolean, n_inputs); is_sink = g_list_length (out_queues) == 0; while (active) { active = pop_input_data (in_queues, finished, inputs, n_inputs); if (!active) break; ufo_task_get_requisition (data->task, inputs, &requisition); if (is_sink) { active = ufo_task_process (data->task, inputs, NULL, &requisition); } else { g_list_for (out_queues, it) { UfoTwoWayQueue *out_queue = (UfoTwoWayQueue *) it->data; output = pop_output_data (out_queue, &requisition, data->context); for (guint i = 0; i < n_inputs; i++) ufo_buffer_copy_metadata (inputs[i], output); active = ufo_task_process (data->task, inputs, output, &requisition); if (!active) break; ufo_two_way_queue_producer_push (out_queue, output); } } release_input_data (in_queues, inputs, n_inputs); } finish_successors (out_queues); g_free (in_queues); g_free (inputs); g_free (finished); g_list_free (out_queues); } static void reduce_loop (TaskData *data) { UfoRequisition requisition; UfoTwoWayQueue **in_queues; UfoTwoWayQueue **output_queues; UfoBuffer **inputs; UfoBuffer **outputs; gboolean *finished; GList *it; GList *out_queues; guint n_inputs; guint n_outputs; gboolean active = TRUE; in_queues = get_input_queues (data, &n_inputs); out_queues = get_output_queue_list (data); inputs = g_new0 (UfoBuffer *, n_inputs); finished = g_new0 (gboolean, n_inputs); n_outputs = g_list_length (out_queues); outputs = g_new0 (UfoBuffer *, n_outputs); output_queues = g_new0 (UfoTwoWayQueue *, n_outputs); it = g_list_first (out_queues); for (guint i = 0; it != NULL; it = g_list_next (it)) { output_queues[i] = (UfoTwoWayQueue *) it->data; } /* Read first input item */ if (!pop_input_data (in_queues, finished, inputs, n_inputs)) return; ufo_task_get_requisition (data->task, inputs, &requisition); /* Get the scratchpad output buffers from all successors */ for (guint i = 0; i < n_outputs; i++) { outputs[i] = pop_output_data (output_queues[i], &requisition, data->context); } /* Process all inputs. Note that we already fetched the first input. */ do { for (guint i = 0; i < n_outputs; i++) { for (guint j = 0; j < n_inputs; j++) ufo_buffer_copy_metadata (inputs[j], outputs[i]); active = ufo_task_process (data->task, inputs, outputs[i], &requisition); release_input_data (in_queues, inputs, n_inputs); active = pop_input_data (in_queues, finished, inputs, n_inputs); } } while (active); /* Generate all outputs */ do { for (guint i = 0; i < n_outputs; i++) { active = ufo_task_generate (data->task, outputs[i], &requisition); if (active) { ufo_two_way_queue_producer_push (output_queues[i], outputs[i]); outputs[i] = ufo_two_way_queue_producer_pop (output_queues[i]); } } } while (active); finish_successors (out_queues); g_free (inputs); g_free (in_queues); g_free (outputs); g_free (output_queues); g_free (finished); g_list_free (out_queues); } static gpointer run_local (TaskData *data) { UfoTaskMode mode; mode = ufo_task_get_mode (data->task) & UFO_TASK_MODE_TYPE_MASK; switch (mode) { case UFO_TASK_MODE_GENERATOR: generate_loop (data); break; case UFO_TASK_MODE_PROCESSOR: process_loop (data); break; case UFO_TASK_MODE_REDUCTOR: reduce_loop (data); break; default: g_warning ("Unknown task mode"); } /* We can release "data" here, because we do not store it when creating it */ g_free (data); return NULL; } static void join_threads (GList *threads) { GList *it; g_list_for (threads, it) { g_thread_join (it->data); } } static GList * append_if_not_existing (GList *list, UfoTask *task) { if (g_list_find (list, task) == NULL) return g_list_append (list, task); return list; } static ProcessData * setup_tasks (UfoGraph *graph, UfoBaseScheduler *scheduler, UfoResources *resources, GError **error) { ProcessData *data; GList *gpu_nodes; GList *nodes; GList *it; data = g_new0 (ProcessData, 1); data->connections = NULL; data->tasks = NULL; nodes = ufo_graph_get_nodes (graph); gpu_nodes = ufo_base_scheduler_get_gpu_nodes (scheduler); g_list_for (nodes, it) { UfoNode *source_node; UfoTask *source_task; GList *successors; GList *jt; source_node = UFO_NODE (it->data); source_task = UFO_TASK (source_node); data->tasks = append_if_not_existing (data->tasks, source_task); successors = ufo_graph_get_successors (graph, source_node); g_list_for (successors, jt) { UfoNode *dest_node; UfoTask *dest_task; Connection *connection; dest_node = UFO_NODE (jt->data); dest_task = UFO_TASK (dest_node); connection = g_new0 (Connection, 1); connection->from = source_task; connection->to = dest_task; connection->port = (guint) GPOINTER_TO_INT (ufo_graph_get_edge_label (graph, source_node, dest_node)); connection->queue = ufo_two_way_queue_new (NULL); data->connections = g_list_append (data->connections, connection); data->tasks = append_if_not_existing (data->tasks, dest_task); } } g_list_for (data->tasks, it) { UfoTask *task; task = UFO_TASK (it->data); /* Set a default GPU if not assigned by user */ if (ufo_task_get_mode (task) & UFO_TASK_MODE_GPU) { if (ufo_task_node_get_proc_node (UFO_TASK_NODE (task)) == NULL) { if (g_list_length (gpu_nodes) == 0) { g_set_error_literal (error, UFO_BASE_SCHEDULER_ERROR, UFO_BASE_SCHEDULER_ERROR_SETUP, "Using GPU tasks but no GPU available"); break; } g_debug ("Setting default GPU %p for %s-%p", g_list_nth_data (gpu_nodes, 0), ufo_task_node_get_plugin_name (UFO_TASK_NODE (task)), (gpointer) task); ufo_task_node_set_proc_node (UFO_TASK_NODE (task), g_list_nth_data (gpu_nodes, 0)); } } ufo_task_setup (task, resources, error); if (*error != NULL) break; } g_list_free (nodes); return data; } static void ufo_fixed_scheduler_run (UfoBaseScheduler *scheduler, UfoTaskGraph *task_graph, GError **error) { UfoArchGraph *arch; UfoResources *resources; ProcessData *pdata; GList *threads; GList *it; GError *tmp_error = NULL; g_return_if_fail (UFO_IS_FIXED_SCHEDULER (scheduler)); arch = ufo_base_scheduler_get_arch (scheduler); resources = ufo_arch_graph_get_resources (arch); pdata = setup_tasks (UFO_GRAPH (task_graph), scheduler, resources, &tmp_error); if (tmp_error != NULL) { g_propagate_error (error, tmp_error); return; } threads = NULL; g_list_for (pdata->tasks, it) { GThread *thread; TaskData *tdata; tdata = g_new0 (TaskData, 1); tdata->graph = UFO_GRAPH (task_graph); tdata->task = UFO_TASK (it->data); tdata->connections = pdata->connections; tdata->context = ufo_resources_get_context (resources); thread = g_thread_create ((GThreadFunc) run_local, tdata, TRUE, error); threads = g_list_append (threads, thread); } #ifdef WITH_PYTHON if (Py_IsInitialized ()) { PyGILState_STATE state = PyGILState_Ensure (); Py_BEGIN_ALLOW_THREADS join_threads (threads); Py_END_ALLOW_THREADS PyGILState_Release (state); } else { join_threads (threads); } #else join_threads (threads); #endif g_list_free (threads); } static void ufo_fixed_scheduler_dispose (GObject *object) { UfoFixedSchedulerPrivate *priv; priv = UFO_FIXED_SCHEDULER_GET_PRIVATE (object); if (priv->arch != NULL) { g_object_unref (priv->arch); priv->arch = NULL; } G_OBJECT_CLASS (ufo_fixed_scheduler_parent_class)->dispose (object); } static void ufo_fixed_scheduler_class_init (UfoFixedSchedulerClass *klass) { GObjectClass *oclass; UfoBaseSchedulerClass *sclass; sclass = UFO_BASE_SCHEDULER_CLASS (klass); sclass->run = ufo_fixed_scheduler_run; oclass = G_OBJECT_CLASS (klass); oclass->dispose = ufo_fixed_scheduler_dispose; g_type_class_add_private (klass, sizeof (UfoFixedSchedulerPrivate)); } static void ufo_fixed_scheduler_init (UfoFixedScheduler *scheduler) { scheduler->priv = UFO_FIXED_SCHEDULER_GET_PRIVATE (scheduler); scheduler->priv->arch = NULL; }