/* * Copyright (C) 2011-2013 Karlsruhe Institute of Technology * * This file is part of Ufo. * * This library is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation, either * version 3 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library. If not, see . */ #include #include #include #include #include #include #include #include #include #include "compat.h" /** * SECTION:ufo-task-graph * @Short_description: Hold task nodes * @Title: UfoTaskGraph * * The task graph is the central data structure that connects #UfoTaskNode * objects to form computational pipelines and graphs. To execute a task graph, * it has to be passed to a #UfoBaseScheduler. */ G_DEFINE_TYPE (UfoTaskGraph, ufo_task_graph, UFO_TYPE_GRAPH) #define UFO_TASK_GRAPH_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_TASK_GRAPH, UfoTaskGraphPrivate)) struct _UfoTaskGraphPrivate { UfoPluginManager *manager; GHashTable *prop_sets; GHashTable *json_nodes; GList *remote_tasks; guint index; guint total; }; typedef enum { JSON_FILE, JSON_DATA } JsonLocation; static void add_nodes_from_json (UfoTaskGraph *self, JsonNode *, GError **); static void handle_json_prop_set (JsonObject *, const gchar *, JsonNode *, gpointer user); static void handle_json_task_edge (JsonArray *, guint, JsonNode *, gpointer); static void add_task_node_to_json_array (UfoTaskNode *, JsonArray *); static JsonObject *json_object_from_ufo_node (UfoNode *node); static JsonNode *get_json_representation (UfoTaskGraph *, GError **); static UfoTaskNode *create_node_from_json (JsonNode *json_node,UfoPluginManager *manager, GHashTable *prop_sets, GError **error); static void add_node_to_table (JsonObject *object, const gchar *name, JsonNode *node, gpointer table); /* * ChangeLog: * - 1.1: Add "index" and "total" keys to the root object */ static const gchar *JSON_API_VERSION = "1.1"; /** * UfoTaskGraphError: * @UFO_TASK_GRAPH_ERROR_JSON_KEY: Key is not found in JSON * @UFO_TASK_GRAPH_ERROR_BAD_INPUTS: Inputs of a task do not play well with each * other. * * Task graph errors */ GQuark ufo_task_graph_error_quark (void) { return g_quark_from_static_string ("ufo-task-graph-error-quark"); } /** * ufo_task_graph_new: * * Create a new task graph without any nodes. * * Returns: A #UfoGraph that can be upcast to a #UfoTaskGraph. */ UfoGraph * ufo_task_graph_new (void) { UfoTaskGraph *graph; graph = UFO_TASK_GRAPH (g_object_new (UFO_TYPE_TASK_GRAPH, NULL)); return UFO_GRAPH (graph); } static void read_json (UfoTaskGraph *graph, UfoPluginManager *manager, JsonLocation location, const gchar *data, GError **error) { JsonParser *json_parser; JsonNode *json_root; JsonObject *object; GError *tmp_error = NULL; json_parser = json_parser_new (); switch (location) { case JSON_FILE: json_parser_load_from_file (json_parser, data, &tmp_error); break; case JSON_DATA: json_parser_load_from_data (json_parser, data, (gssize) strlen (data), &tmp_error); break; } if (tmp_error != NULL) { g_propagate_prefixed_error (error, tmp_error, "Parsing JSON: "); g_object_unref (json_parser); return; } graph->priv->manager = manager; g_object_ref (manager); json_root = json_parser_get_root (json_parser); object = json_node_get_object (json_root); if (json_object_has_member (object, "index") && json_object_has_member (object, "total")) { guint index = (guint) json_object_get_int_member (object, "index"); guint total = (guint) json_object_get_int_member (object, "total"); ufo_task_graph_set_partition (graph, index, total); } add_nodes_from_json (graph, json_root, error); g_object_unref (json_parser); } /** * ufo_task_graph_read_from_file: * @graph: A #UfoTaskGraph. * @manager: A #UfoPluginManager used to load the filters * @filename: Path and filename to the JSON file * @error: Indicates error in case of failed file loading or parsing * * Read a JSON configuration file to fill the structure of @graph. */ void ufo_task_graph_read_from_file (UfoTaskGraph *graph, UfoPluginManager *manager, const gchar *filename, GError **error) { g_return_if_fail (UFO_IS_TASK_GRAPH (graph) && UFO_IS_PLUGIN_MANAGER (manager) && (filename != NULL)); read_json (graph, manager, JSON_FILE, filename, error); } /** * ufo_task_graph_read_from_data: * @graph: A #UfoTaskGraph. * @manager: A #UfoPluginManager used to load the filters * @json: %NULL-terminated string with JSON data * @error: Indicates error in case of failed file loading or parsing * * Read a JSON configuration file to fill the structure of @graph. */ void ufo_task_graph_read_from_data (UfoTaskGraph *graph, UfoPluginManager *manager, const gchar *json, GError **error) { g_return_if_fail (UFO_IS_TASK_GRAPH (graph) && UFO_IS_PLUGIN_MANAGER (manager) && (json != NULL)); read_json (graph, manager, JSON_DATA, json, error); } static JsonNode * get_json_representation (UfoTaskGraph *graph, GError **error) { GList *task_nodes; GList *it; JsonNode *root_node = json_node_new (JSON_NODE_OBJECT); JsonObject *root_object = json_object_new (); JsonArray *nodes = json_array_new (); JsonArray *edges = json_array_new (); task_nodes = ufo_graph_get_nodes (UFO_GRAPH (graph)); g_list_foreach (task_nodes, (GFunc) add_task_node_to_json_array, nodes); g_list_for (task_nodes, it) { UfoNode *from; GList *successors; GList *jt; from = UFO_NODE (it->data); successors = ufo_graph_get_successors (UFO_GRAPH (graph), from); g_list_for (successors, jt) { UfoNode *to; gint port; JsonObject *to_object; JsonObject *from_object; JsonObject *edge_object; to = UFO_NODE (jt->data); port = GPOINTER_TO_INT (ufo_graph_get_edge_label (UFO_GRAPH (graph), from, to)); to_object = json_object_from_ufo_node (to); from_object = json_object_from_ufo_node (from); edge_object = json_object_new (); json_object_set_int_member (to_object, "input", port); json_object_set_object_member (edge_object, "to", to_object); json_object_set_object_member (edge_object, "from", from_object); json_array_add_object_element (edges, edge_object); } g_list_free (successors); } json_object_set_string_member (root_object, "version", JSON_API_VERSION); json_object_set_array_member (root_object, "nodes", nodes); json_object_set_array_member (root_object, "edges", edges); json_object_set_int_member (root_object, "index", graph->priv->index); json_object_set_int_member (root_object, "total", graph->priv->total); json_node_set_object (root_node, root_object); g_list_free (task_nodes); return root_node; } static JsonGenerator * task_graph_to_generator (UfoTaskGraph *graph, GError **error) { JsonNode *root_node; JsonGenerator *generator; root_node = get_json_representation (graph, error); if (error != NULL && *error != NULL) return NULL; generator = json_generator_new (); json_generator_set_root (generator, root_node); json_node_free (root_node); return generator; } /** * ufo_task_graph_save_to_json: * @graph: A #UfoTaskGraph * @filename: Path and filename to the JSON file * @error: Indicates error in case of failed file saving * * Save a JSON configuration file with the filter structure of @graph. */ void ufo_task_graph_save_to_json (UfoTaskGraph *graph, const gchar *filename, GError **error) { JsonGenerator *generator; generator = task_graph_to_generator (graph, error); if (generator != NULL) { json_generator_to_file (generator, filename, error); g_object_unref (generator); } } /** * ufo_task_graph_get_json_data: * @graph: A #UfoTaskGraph * @error: Indicates error in case of failed serialization * * Serialize @graph to a JSON string. * * Returns: (transfer full): A JSON string describing @graph */ gchar * ufo_task_graph_get_json_data (UfoTaskGraph *graph, GError **error) { JsonGenerator *generator; gchar *json = NULL; generator = task_graph_to_generator (graph, error); if (generator != NULL) { json = json_generator_to_data (generator, NULL); g_object_unref (generator); } return json; } static gboolean is_gpu_task (UfoTask *task, gpointer user_data) { return ufo_task_uses_gpu (task); } static UfoTaskNode * build_remote_graph (UfoTaskGraph *remote_graph, GList *first, GList *last) { UfoTaskNode *node = NULL; UfoTaskNode *predecessor = NULL; for (GList *it = g_list_next (first); it != last && it != NULL; it = g_list_next (it)) { node = UFO_TASK_NODE (it->data); if (predecessor != NULL) ufo_task_graph_connect_nodes (remote_graph, predecessor, node); predecessor = node; } g_assert (node != NULL); return node; } static void create_remote_tasks (UfoTaskGraph *task_graph, UfoTaskGraph *remote_graph, UfoTaskNode *first, UfoTaskNode *last, UfoRemoteNode *remote) { UfoTaskGraphPrivate *priv; UfoTaskNode *task; gchar *json; json = ufo_task_graph_get_json_data (remote_graph, NULL); priv = task_graph->priv; ufo_remote_node_send_json (remote, UFO_REMOTE_MODE_STREAM, json); task = UFO_TASK_NODE (ufo_remote_task_new ()); priv->remote_tasks = g_list_append (priv->remote_tasks, task); ufo_task_node_set_proc_node (task, UFO_NODE (remote)); ufo_task_graph_connect_nodes (task_graph, first, task); ufo_task_graph_connect_nodes (task_graph, task, last); g_debug ("remote: connected %s -> [remote] -> %s", ufo_task_node_get_identifier (first), ufo_task_node_get_identifier (last)); g_free (json); } static void expand_remotes (UfoTaskGraph *task_graph, GList *remotes, GList *path) { UfoTaskGraph *remote_graph; UfoTaskNode *node; GList *first; GList *last; GList *it; first = g_list_first (path); last = g_list_last (path); remote_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); node = build_remote_graph (remote_graph, first, last); if (ufo_graph_get_num_nodes (UFO_GRAPH (remote_graph)) == 0) { ufo_task_graph_connect_nodes (remote_graph, UFO_TASK_NODE (ufo_dummy_task_new ()), node); } g_list_for (remotes, it) { create_remote_tasks (task_graph, remote_graph, first->data, last->data, it->data); } g_object_unref (remote_graph); } static GList * nodes_with_common_ancestries (UfoTaskGraph *graph, GList *path) { GList *result = NULL; GList *it; g_list_for (path, it) { if (ufo_graph_get_num_predecessors (UFO_GRAPH (graph), UFO_NODE (it->data)) > 1) result = g_list_append (result, it->data); } return result; } /** * ufo_task_graph_expand: * @graph: A #UfoTaskGraph * @resources: A #UfoResources objects * @n_gpus: Number of GPUs to expand the graph for * @expand_remote: %TRUE if remote nodes should be inserted * * Expands @graph in a way that most of the resources in @graph can be occupied. * In the simple pipeline case, the longest possible GPU paths are duplicated as * much as there are GPUs in @arch_graph. */ void ufo_task_graph_expand (UfoTaskGraph *graph, UfoResources *resources, guint n_gpus, gboolean expand_remote) { GList *path; GList *common; g_return_if_fail (UFO_IS_TASK_GRAPH (graph)); path = ufo_graph_find_longest_path (UFO_GRAPH (graph), (UfoFilterPredicate) is_gpu_task, NULL); common = nodes_with_common_ancestries (graph, path); if (g_list_length (common) > 1) { g_debug ("WARN More than one nodes have multiple inputs, not going to expand"); g_list_free (common); g_list_free (path); return; } if (g_list_length (common) == 1) { GList *it; g_debug ("INFO Found node with multiple inputs, going to prune it"); g_list_for (path, it) { if (ufo_graph_get_num_predecessors (UFO_GRAPH (graph), UFO_NODE (it->data)) > 1) { GList *predecessor; predecessor = g_list_previous (it); if (predecessor != NULL) path = g_list_remove_link (path, predecessor); } } g_list_free (common); } if (path != NULL && g_list_length (path) > 0) { GList *predecessors; GList *successors; g_object_unref (UFO_NODE (g_list_first (path)->data)); if (g_list_length (path) > 1) g_object_unref (UFO_NODE (g_list_last (path)->data)); /* Add predecessor and successor nodes to path */ predecessors = ufo_graph_get_predecessors (UFO_GRAPH (graph), UFO_NODE (g_list_first (path)->data)); successors = ufo_graph_get_successors (UFO_GRAPH (graph), UFO_NODE (g_list_last (path)->data)); if (predecessors != NULL) path = g_list_prepend (path, g_list_first (predecessors)->data); if (successors != NULL) path = g_list_append (path, g_list_first (successors)->data); g_list_free (predecessors); g_list_free (successors); if (expand_remote) { GList *remotes; guint n_remotes; remotes = ufo_resources_get_remote_nodes (resources); n_remotes = g_list_length (remotes); if (n_remotes > 0) { g_debug ("INFO Expand for %i remote nodes", n_remotes); expand_remotes (graph, remotes, path); } g_list_free (remotes); } g_debug ("INFO Expand for %i GPU nodes", n_gpus); for (guint i = 1; i < n_gpus; i++) ufo_graph_expand (UFO_GRAPH (graph), path); } g_list_free (path); } /** * ufo_task_graph_fuse: * @graph: A #UfoTaskGraph * * Fuses task nodes to increase data locality. * * Note: This is not implemented and a no-op right now. */ void ufo_task_graph_fuse (UfoTaskGraph *graph) { } static void map_proc_node (UfoGraph *graph, UfoNode *node, guint proc_index, GList *gpu_nodes) { UfoNode *proc_node; GList *successors; GList *it; guint n_gpus; proc_node = UFO_NODE (g_list_nth_data (gpu_nodes, proc_index)); if ((ufo_task_uses_gpu (UFO_TASK (node)) || UFO_IS_INPUT_TASK (node)) && (!ufo_task_node_get_proc_node (UFO_TASK_NODE (node)))) { g_debug ("MAP UfoGpuNode-%p -> %s", (gpointer) proc_node, ufo_task_node_get_identifier (UFO_TASK_NODE (node))); ufo_task_node_set_proc_node (UFO_TASK_NODE (node), proc_node); } n_gpus = g_list_length (gpu_nodes); successors = ufo_graph_get_successors (graph, node); g_list_for (successors, it) { map_proc_node (graph, UFO_NODE (it->data), proc_index, gpu_nodes); if (!UFO_IS_REMOTE_TASK (UFO_NODE (it->data))) proc_index = n_gpus > 0 ? (proc_index + 1) % n_gpus : 0; } g_list_free (successors); } /** * ufo_task_graph_is_alright: * @graph: A #UfoTaskGraph * @error: Location for a GError or %NULL * * Check if nodes int the task graph are properly connected. * * Returns: %TRUE if everything is alright, %FALSE else. */ gboolean ufo_task_graph_is_alright (UfoTaskGraph *graph, GError **error) { GList *nodes; GList *it; gboolean alright = TRUE; nodes = ufo_graph_get_nodes (UFO_GRAPH (graph)); /* Check that nodes don't receive input from reductors and processors */ g_list_for (nodes, it) { GList *predecessors; predecessors = ufo_graph_get_predecessors (UFO_GRAPH (graph), UFO_NODE (it->data)); if (g_list_length (predecessors) > 1) { GList *jt; UfoTaskMode combined_modes = UFO_TASK_MODE_INVALID; g_list_for (predecessors, jt) combined_modes |= ufo_task_get_mode (UFO_TASK (jt->data)); if ((combined_modes & UFO_TASK_MODE_PROCESSOR) && (combined_modes & UFO_TASK_MODE_REDUCTOR)) { #if 0 g_set_error (error, UFO_TASK_GRAPH_ERROR, UFO_TASK_GRAPH_ERROR_BAD_INPUTS, "`%s' receives both processor and reductor inputs which may deadlock.", ufo_task_node_get_plugin_name (UFO_TASK_NODE (it->data))); g_list_free (predecessors); g_list_free (nodes); return FALSE; #endif g_warning ("`%s' receives both processor and reductor inputs which may deadlock.", ufo_task_node_get_plugin_name (UFO_TASK_NODE (it->data))); } } g_list_free (predecessors); } g_list_free (nodes); /* Check leaves are sinks */ nodes = ufo_graph_get_leaves (UFO_GRAPH (graph)); g_list_for (nodes, it) { if ((ufo_task_get_mode (UFO_TASK (it->data)) & UFO_TASK_MODE_TYPE_MASK) != UFO_TASK_MODE_SINK) { alright = FALSE; g_set_error (error, UFO_TASK_GRAPH_ERROR, UFO_TASK_GRAPH_ERROR_BAD_INPUTS, "`%s' is a leaf node but not a sink task", ufo_task_node_get_plugin_name (UFO_TASK_NODE (it->data))); break; } } g_list_free (nodes); return alright; } /** * ufo_task_graph_map: * @graph: A #UfoTaskGraph * @gpu_nodes: (transfer none) (element-type Ufo.GpuNode): List of #UfoGpuNode objects * * Map task nodes of @graph to the list of @gpu_nodes. */ void ufo_task_graph_map (UfoTaskGraph *graph, GList *gpu_nodes) { GList *roots; GList *it; roots = ufo_graph_get_roots (UFO_GRAPH (graph)); g_list_for (roots, it) { map_proc_node (UFO_GRAPH (graph), UFO_NODE (it->data), 0, gpu_nodes); } g_list_free (roots); } /** * ufo_task_graph_connect_nodes: * @graph: A #UfoTaskGraph * @n1: A source node * @n2: A destination node * * Connect @n1 with @n2 using @n2's default input port. To specify any other * port, use ufo_task_graph_connect_nodes_full(). */ void ufo_task_graph_connect_nodes (UfoTaskGraph *graph, UfoTaskNode *n1, UfoTaskNode *n2) { ufo_task_graph_connect_nodes_full (graph, n1, n2, 0); } /** * ufo_task_graph_connect_nodes_full: * @graph: A #UfoTaskGraph * @n1: A source node * @n2: A destination node * @input: Input port of @n2 * * Connect @n1 with @n2 using @n2's @input port. */ void ufo_task_graph_connect_nodes_full (UfoTaskGraph *graph, UfoTaskNode *n1, UfoTaskNode *n2, guint input) { g_debug ("CONN %s -> %s [input=%i]", ufo_task_node_get_identifier (n1), ufo_task_node_get_identifier (n2), input); ufo_graph_connect_nodes (UFO_GRAPH (graph), UFO_NODE (n1), UFO_NODE (n2), GINT_TO_POINTER (input)); } /** * ufo_task_graph_set_partition: * @graph: A #UfoTaskGraph * @index: index of @total partitions - 1 * @total: total number of partitions * * Set the partition of this task graph. */ void ufo_task_graph_set_partition (UfoTaskGraph *graph, guint index, guint total) { g_return_if_fail (UFO_IS_TASK_GRAPH (graph)); g_assert (index < total); graph->priv->index = index; graph->priv->total = total; } /** * ufo_task_graph_get_partition: * @graph: A #UfoTaskGraph * @index: Location to store index * @total: Location to store the total number of partitions * * Get the partition structure of @graph. */ void ufo_task_graph_get_partition (UfoTaskGraph *graph, guint *index, guint *total) { g_return_if_fail (UFO_IS_TASK_GRAPH (graph)); *index = graph->priv->index; *total = graph->priv->total; } static void add_nodes_from_json (UfoTaskGraph *self, JsonNode *root, GError **error) { UfoTaskGraphPrivate *priv = UFO_TASK_GRAPH_GET_PRIVATE(self); JsonObject *root_object = json_node_get_object (root); if (json_object_has_member (root_object, "prop-sets")) { JsonObject *sets = json_object_get_object_member (root_object, "prop-sets"); json_object_foreach_member (sets, handle_json_prop_set, priv); } if (json_object_has_member (root_object, "nodes")) { JsonArray *nodes = json_object_get_array_member (root_object, "nodes"); GList *elements = json_array_get_elements (nodes); GList *it; g_list_for (elements, it) { UfoTaskNode *new_node = create_node_from_json (it->data, priv->manager, priv->prop_sets, error); if (new_node != NULL) { const char *name = ufo_task_node_get_identifier (new_node); if (g_hash_table_lookup (priv->json_nodes, name) != NULL) { g_object_unref (new_node); g_set_error (error, UFO_TASK_GRAPH_ERROR, UFO_TASK_GRAPH_ERROR_JSON_KEY, "Duplicate name `%s' found", name); return; } g_hash_table_insert (priv->json_nodes, g_strdup (name), new_node); } else { g_list_free (elements); return; } } g_list_free (elements); /* * We only check edges if we have nodes, anything else doesn't make much * sense. */ if (json_object_has_member (root_object, "edges")) { JsonArray *edges = json_object_get_array_member (root_object, "edges"); json_array_foreach_element (edges, handle_json_task_edge, self); } } } static UfoTaskNode * create_node_from_json (JsonNode *json_node, UfoPluginManager *manager, GHashTable *prop_sets, GError **error) { UfoTaskNode *ret_node = NULL; JsonObject *json_object; GError *tmp_error = NULL; GHashTableIter iter; gpointer key, value; const gchar *plugin_name; const gchar *task_name; json_object = json_node_get_object (json_node); if (!json_object_has_member (json_object, "plugin") || !json_object_has_member (json_object, "name")) { g_set_error (error, UFO_TASK_GRAPH_ERROR, UFO_TASK_GRAPH_ERROR_JSON_KEY, "Node does not have `plugin' or `name' key"); return NULL; } plugin_name = json_object_get_string_member (json_object, "plugin"); if (json_object_has_member (json_object, "package")) { const gchar *package_name = json_object_get_string_member (json_object, "package"); ret_node = ufo_plugin_manager_get_task_from_package (manager, package_name, plugin_name, &tmp_error); } else { ret_node = ufo_plugin_manager_get_task (manager, plugin_name, &tmp_error); } ufo_task_node_set_plugin_name (ret_node, plugin_name); if (tmp_error != NULL) { g_propagate_error (error, tmp_error); return NULL; } task_name = json_object_get_string_member (json_object, "name"); ufo_task_node_set_identifier (ret_node, task_name); GHashTable *nodes_to_process = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); if (json_object_has_member (json_object, "properties")) { JsonObject *prop_object = json_object_get_object_member (json_object, "properties"); json_object_foreach_member (prop_object, add_node_to_table, nodes_to_process); } if (json_object_has_member (json_object, "prop-refs")) { JsonArray *prop_refs = json_object_get_array_member (json_object, "prop-refs"); for (guint i = 0; i < json_array_get_length (prop_refs); i++) { const gchar *ref_name = json_array_get_string_element (prop_refs, i); JsonObject *prop_set = g_hash_table_lookup (prop_sets, ref_name); if (prop_set == NULL) { g_warning ("No property set `%s' found in `prop-sets'", ref_name); } else { json_object_foreach_member (prop_set, add_node_to_table, nodes_to_process); } } } g_hash_table_iter_init (&iter, nodes_to_process); while (g_hash_table_iter_next (&iter, &key, &value)) { JsonNode *node = (JsonNode *) value; if (JSON_NODE_HOLDS_VALUE (node)) { GValue val = {0,}; json_node_get_value (node, &val); g_object_set_property (G_OBJECT(ret_node), key, &val); g_value_unset (&val); } else if (JSON_NODE_HOLDS_OBJECT (node)) { JsonObject *node_object = json_node_get_object (node); if (json_object_has_member (node_object, "plugin")) { UfoTaskNode *inner_node = create_node_from_json (node, manager, prop_sets, error); g_object_force_floating (G_OBJECT (inner_node)); g_object_set (G_OBJECT (ret_node), key, inner_node, NULL); } else { ufo_task_set_json_object_property (UFO_TASK (node), key, node_object); } } else { g_warning ("`%s' is neither a primitive value nor an object!", (char*)key); } } g_hash_table_unref (nodes_to_process); return ret_node; } static void handle_json_task_edge (JsonArray *array, guint index, JsonNode *element, gpointer user) { UfoTaskGraph *graph = user; UfoTaskGraphPrivate *priv = graph->priv; JsonObject *edge; UfoTaskNode *from_node, *to_node; JsonObject *from_object, *to_object; guint to_port; const gchar *from_name; const gchar *to_name; GError *error = NULL; edge = json_node_get_object (element); if (!json_object_has_member (edge, "from") || !json_object_has_member (edge, "to")) { g_error ("Edge does not have `from' or `to' key"); return; } /* Get from details */ from_object = json_object_get_object_member (edge, "from"); if (!json_object_has_member (from_object, "name")) { g_error ("From node does not have `name' key"); return; } from_name = json_object_get_string_member (from_object, "name"); /* Get to details */ to_object = json_object_get_object_member (edge, "to"); if (!json_object_has_member (to_object, "name")) { g_error ("To node does not have `name' key"); return; } to_name = json_object_get_string_member (to_object, "name"); to_port = 0; if (json_object_has_member (to_object, "input")) to_port = (guint) json_object_get_int_member (to_object, "input"); /* Get actual filters and connect them */ from_node = g_hash_table_lookup (priv->json_nodes, from_name); to_node = g_hash_table_lookup (priv->json_nodes, to_name); if (from_node == NULL) g_error ("No filter `%s' defined", from_name); if (to_node == NULL) g_error ("No filter `%s' defined", to_name); ufo_task_graph_connect_nodes_full (graph, from_node, to_node, to_port); if (error != NULL) g_warning ("%s", error->message); } static void handle_json_prop_set (JsonObject *object, const gchar *name, JsonNode *node, gpointer user) { UfoTaskGraphPrivate *priv; JsonObject *properties; priv = (UfoTaskGraphPrivate *) user; properties = json_object_get_object_member (object, name); json_object_ref (properties); g_hash_table_insert (priv->prop_sets, g_strdup (name), properties); } static void add_node_to_table (JsonObject *object, const gchar *name, JsonNode *node, gpointer table) { GHashTable *hash_table = (GHashTable*)table; g_hash_table_insert(hash_table, g_strdup(name), node); } static JsonObject * create_full_json_from_task_node (UfoTaskNode *task_node) { JsonObject *node_object; JsonNode *prop_node; JsonObject *prop_object; GParamSpec **pspecs; guint n_pspecs; const gchar *plugin_name; const gchar *package_name; const gchar *name; node_object = json_object_new (); plugin_name = ufo_task_node_get_plugin_name (task_node); /* plugin_name can be NULL for task graphs expanded with remote nodes */ if (plugin_name == NULL) return NULL; json_object_set_string_member (node_object, "plugin", plugin_name); package_name = ufo_task_node_get_package_name (task_node); if (package_name != NULL) json_object_set_string_member (node_object, "package", package_name); name = ufo_task_node_get_identifier (task_node); g_assert (name != NULL); json_object_set_string_member (node_object, "name", name); prop_node = json_gobject_serialize (G_OBJECT (task_node)); /* Remove num-processed which is a read-only property */ json_object_remove_member (json_node_get_object (prop_node), "num-processed"); prop_object = json_node_get_object (prop_node); pspecs = g_object_class_list_properties (G_OBJECT_GET_CLASS (task_node), &n_pspecs); for (guint i = 0; i < n_pspecs; i++) { GParamSpec *pspec = pspecs[i]; GValue value = { 0, }; /* read only what we can */ if (!(pspec->flags & G_PARAM_READABLE)) continue; /* Process only task node type properties */ if (!UFO_IS_TASK_NODE_CLASS (&(pspec->value_type))) continue; g_value_init (&value, G_PARAM_SPEC_VALUE_TYPE (pspec)); g_object_get_property (G_OBJECT (task_node), pspec->name, &value); /* skip if the value is the default for the property */ if (!g_param_value_defaults (pspec, &value)) { JsonObject *subtask_json_object = create_full_json_from_task_node (g_value_get_object (&value)); json_object_set_object_member (prop_object, pspec->name, subtask_json_object); } g_value_unset (&value); } g_free (pspecs); json_object_set_member (node_object, "properties", prop_node); return node_object; } static void add_task_node_to_json_array (UfoTaskNode *node, JsonArray *array) { json_array_add_object_element (array, create_full_json_from_task_node (node)); } static JsonObject * json_object_from_ufo_node (UfoNode *node) { JsonObject *object; object = json_object_new (); const gchar *unique_name = ufo_task_node_get_identifier (UFO_TASK_NODE (node)); json_object_set_string_member (object, "name", unique_name); return object; } static void ufo_task_graph_dispose (GObject *object) { UfoTaskGraphPrivate *priv; GList *nodes; priv = UFO_TASK_GRAPH_GET_PRIVATE (object); if (priv->manager != NULL) { g_object_unref (priv->manager); priv->manager = NULL; } g_list_foreach (priv->remote_tasks, (GFunc) g_object_unref, NULL); g_list_free (priv->remote_tasks); priv->remote_tasks = NULL; nodes = g_hash_table_get_values (priv->json_nodes); g_list_foreach (nodes, (GFunc) g_object_unref, NULL); g_list_free (nodes); G_OBJECT_CLASS (ufo_task_graph_parent_class)->dispose (object); } static void ufo_task_graph_finalize (GObject *object) { UfoTaskGraphPrivate *priv; priv = UFO_TASK_GRAPH_GET_PRIVATE (object); g_hash_table_destroy (priv->json_nodes); g_hash_table_destroy (priv->prop_sets); G_OBJECT_CLASS (ufo_task_graph_parent_class)->finalize (object); } static void ufo_task_graph_class_init (UfoTaskGraphClass *klass) { GObjectClass *oclass; oclass = G_OBJECT_CLASS (klass); oclass->dispose = ufo_task_graph_dispose; oclass->finalize = ufo_task_graph_finalize; g_type_class_add_private(klass, sizeof(UfoTaskGraphPrivate)); } static void ufo_task_graph_init (UfoTaskGraph *self) { UfoTaskGraphPrivate *priv; self->priv = priv = UFO_TASK_GRAPH_GET_PRIVATE (self); priv->manager = NULL; priv->remote_tasks = NULL; priv->json_nodes = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); priv->prop_sets = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, (GDestroyNotify) json_object_unref); priv->index = 0; priv->total = 1; }