/* * Copyright (C) 2011-2013 Karlsruhe Institute of Technology * * This file is part of Ufo. * * This library is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation, either * version 3 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include "config.h" #ifdef WITH_PYTHON #include #endif #ifdef __APPLE__ #include #else #include #endif #include #include #include #include #include #include #include #include #include #include "ufo-priv.h" #include "compat.h" /** * SECTION:ufo-local-scheduler * @Short_description: Schedule each task independently * @Title: UfoLocalScheduler * * This scheduler schedules each task autonomously without taking relations * between tasks into account. It is not recommended to use this scheduler in * production. */ G_DEFINE_TYPE (UfoLocalScheduler, ufo_local_scheduler, UFO_TYPE_BASE_SCHEDULER) #define UFO_LOCAL_SCHEDULER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_LOCAL_SCHEDULER, UfoLocalSchedulerPrivate)) typedef struct { GQueue *queue; GMutex *lock; } ProcessorPool; typedef struct { gpointer context; ProcessorPool *pp; UfoTask *task; UfoTwoWayQueue **inputs; UfoTwoWayQueue *output; guint n_inputs; gboolean is_leaf; } TaskLocal; enum { PROP_0, N_PROPERTIES, }; static UfoBuffer *POISON_PILL = (UfoBuffer *) 0x1; /** * UfoLocalSchedulerError: * @UFO_LOCAL_SCHEDULER_ERROR_SETUP: Could not start scheduler due to error */ GQuark ufo_local_scheduler_error_quark (void) { return g_quark_from_static_string ("ufo-scheduler-error-quark"); } static ProcessorPool * ufo_pp_new (GList *init) { ProcessorPool *pp; GList *jt; pp = g_malloc0 (sizeof (ProcessorPool)); pp->queue = g_queue_new (); pp->lock = g_mutex_new (); g_list_for (init, jt) { g_queue_push_tail (pp->queue, jt->data); } return pp; } static void ufo_pp_destroy (ProcessorPool *pp) { g_queue_free (pp->queue); g_mutex_free (pp->lock); } static gpointer ufo_pp_next (ProcessorPool *pp) { gpointer data; g_mutex_lock (pp->lock); data = g_queue_pop_head (pp->queue); g_queue_push_tail (pp->queue, data); g_mutex_unlock (pp->lock); return data; } /** * ufo_local_scheduler_new: * * Creates a new #UfoLocalScheduler. * * Return value: A new #UfoLocalScheduler */ UfoBaseScheduler * ufo_local_scheduler_new (void) { return UFO_BASE_SCHEDULER (g_object_new (UFO_TYPE_LOCAL_SCHEDULER, NULL)); } static gboolean pop_input_data (TaskLocal *local, UfoBuffer **inputs) { for (guint i = 0; i < local->n_inputs; i++) { inputs[i] = ufo_two_way_queue_consumer_pop (local->inputs[i]); if (inputs[i] == POISON_PILL) return FALSE; } return TRUE; } static void release_input_data (TaskLocal *local, UfoBuffer **inputs) { for (guint i = 0; i < local->n_inputs; i++) ufo_two_way_queue_consumer_push (local->inputs[i], inputs[i]); } static GError * run_local (TaskLocal *local) { UfoBuffer **inputs; UfoBuffer *output; UfoRequisition requisition; UfoTask *task; UfoTaskMode mode; UfoTaskMode pu_mode; /* gboolean shared; */ gboolean active = TRUE; task = local->task; inputs = g_new0 (UfoBuffer *, local->n_inputs); mode = ufo_task_get_mode (task) & UFO_TASK_MODE_TYPE_MASK; pu_mode = ufo_task_get_mode (task) & UFO_TASK_MODE_PROCESSOR_MASK; /* shared = ufo_task_get_mode (task) & UFO_TASK_MODE_SHARE_DATA; */ /* */ output = NULL; requisition.n_dims = 0; while (active) { /* Fetch data from parent locals */ active = pop_input_data (local, inputs); if (!active) break; /* Choose next task of the local */ /* profiler = ufo_task_node_get_profiler (UFO_TASK_NODE (task)); */ /* Ask current task about size requirements */ ufo_task_get_requisition (task, inputs, &requisition); /* Insert output buffers as longs as capacity is not filled */ if (!local->is_leaf) { if (ufo_two_way_queue_get_capacity (local->output) < 2) { UfoBuffer *buffer; buffer = ufo_buffer_new (&requisition, local->context); ufo_two_way_queue_insert (local->output, buffer); } output = ufo_two_way_queue_producer_pop (local->output); } if (pu_mode == UFO_TASK_MODE_GPU) { ufo_task_node_set_proc_node (UFO_TASK_NODE (task), UFO_NODE (ufo_pp_next (local->pp))); } /* Generate/process the data. Because the functions return active state, * we negate it for the finished flag. */ if (mode != UFO_TASK_MODE_REDUCTOR) { if (mode == UFO_TASK_MODE_PROCESSOR) active = ufo_task_process (task, inputs, output, &requisition); else active = ufo_task_generate (task, output, &requisition); if (output != NULL && active) { ufo_two_way_queue_producer_push (local->output, output); } release_input_data (local, inputs); } else { /* This branch handles the reductor mode */ do { active = ufo_task_process (task, inputs, output, &requisition); release_input_data (local, inputs); active = pop_input_data (local, inputs); } while (active); /* Generate and forward as long as reductor produces data */ do { active = ufo_task_generate (task, output, &requisition); if (active) { ufo_two_way_queue_producer_push (local->output, output); output = ufo_two_way_queue_producer_pop (local->output); } } while (active); } } if (!local->is_leaf) ufo_two_way_queue_producer_push (local->output, POISON_PILL); return NULL; } static void join_threads (GList *threads) { GList *it; g_list_for (threads, it) { g_thread_join (it->data); } } static GHashTable * setup_tasks (UfoGraph *graph, UfoResources *resources, ProcessorPool *pp, GError **error) { GHashTable *local; GList *nodes; GList *it; local = g_hash_table_new (g_direct_hash, g_direct_equal); nodes = ufo_graph_get_nodes (graph); g_list_for (nodes, it) { UfoNode *node; UfoTask *task; TaskLocal *data; GList *successors; GList *predecessors; data = g_malloc0 (sizeof (TaskLocal)); node = UFO_NODE (it->data); task = UFO_TASK (node); data->inputs = NULL; data->output = NULL; data->task = task; data->is_leaf = TRUE; data->pp = pp; data->n_inputs = ufo_task_get_num_inputs (task); data->context = ufo_resources_get_context (resources); g_hash_table_insert (local, node, data); successors = ufo_graph_get_successors (graph, UFO_NODE (task)); predecessors = ufo_graph_get_predecessors (graph, UFO_NODE (task)); if (g_list_length (successors) > 0) { UfoNode *succ; TaskLocal *succ_data; gint port; data->is_leaf = FALSE; succ = UFO_NODE (g_list_nth_data (successors, 0)); succ_data = g_hash_table_lookup (local, succ); port = GPOINTER_TO_INT (ufo_graph_get_edge_label (graph, node, succ)); if (succ_data != NULL) { data->output = succ_data->inputs[port]; } else { data->output = ufo_two_way_queue_new (NULL); } } if (g_list_length (predecessors) > 0) { GList *jt; data->inputs = g_new0 (UfoTwoWayQueue *, data->n_inputs); g_list_for (predecessors, jt) { UfoNode *pred; TaskLocal *pred_data; gint port; pred = UFO_NODE (jt->data); pred_data = g_hash_table_lookup (local, pred); port = GPOINTER_TO_INT (ufo_graph_get_edge_label (graph, pred, node)); if (pred_data != NULL) { data->inputs[port] = pred_data->output; } else { data->inputs[port] = ufo_two_way_queue_new (NULL); } } } ufo_task_setup (task, resources, error); g_list_free (successors); g_list_free (predecessors); } g_list_free (nodes); return local; } static void ufo_local_scheduler_run (UfoBaseScheduler *scheduler, UfoTaskGraph *task_graph, GError **error) { UfoResources *resources; ProcessorPool *pp; GHashTable *task_data; GList *local_data; GList *threads; GList *it; GList *gpu_nodes; g_return_if_fail (UFO_IS_LOCAL_SCHEDULER (scheduler)); resources = ufo_base_scheduler_get_resources (scheduler, error); if (resources == NULL) return; gpu_nodes = ufo_resources_get_gpu_nodes (resources); pp = ufo_pp_new (gpu_nodes); g_list_free (gpu_nodes); task_data = setup_tasks (UFO_GRAPH (task_graph), resources, pp, error); local_data = g_hash_table_get_values (task_data); threads = NULL; g_list_for (local_data, it) { GThread *thread; thread = g_thread_create ((GThreadFunc) run_local, it->data, TRUE, error); threads = g_list_append (threads, thread); } #ifdef WITH_PYTHON if (Py_IsInitialized ()) { Py_BEGIN_ALLOW_THREADS join_threads (threads); Py_END_ALLOW_THREADS } else { join_threads (threads); } #else join_threads (threads); #endif ufo_pp_destroy (pp); g_list_free (threads); g_hash_table_destroy (task_data); } static void ufo_local_scheduler_class_init (UfoLocalSchedulerClass *klass) { UfoBaseSchedulerClass *sclass; sclass = UFO_BASE_SCHEDULER_CLASS (klass); sclass->run = ufo_local_scheduler_run; } static void ufo_local_scheduler_init (UfoLocalScheduler *scheduler) { }