/* * Copyright (C) 2011-2013 Karlsruhe Institute of Technology * * This file is part of runjson. * * runjson is free software: you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation, either * version 3 of the License, or (at your option) any later version. * * runjson is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public * License along with runjson. If not, see . */ #define _POSIX_C_SOURCE 1 #include "config.h" #include #include #include #include #ifdef WITH_MPI #include #include #endif typedef struct { gchar **addresses; gchar *scheduler; gboolean trace; gboolean timestamps; gboolean version; gboolean quiet; gboolean quieter; } Options; static void handle_error (const gchar *prefix, GError *error, UfoGraph *graph) { if (error) { g_printerr ("%s: %s", prefix, error->message); g_error_free (error); exit (EXIT_FAILURE); } } static GValueArray * string_array_to_value_array (gchar **array) { GValueArray *result = NULL; if (array == NULL) return NULL; result = g_value_array_new (0); for (guint i = 0; array[i] != NULL; i++) { GValue *tmp = (GValue *) g_malloc0 (sizeof (GValue)); g_value_init (tmp, G_TYPE_STRING); g_value_set_string (tmp, array[i]); result = g_value_array_append (result, tmp); } return result; } static void progress_update (gpointer user) { static int n = 0; g_print ("\33[2K\r%i items processed ...", ++n); } static void execute_json (const gchar *filename, const Options *options) { UfoTaskGraph *task_graph; UfoBaseScheduler *scheduler = NULL; UfoPluginManager *manager; GList *leaves; UfoResources *resources = NULL; GValueArray *address_list = NULL; GError *error = NULL; gboolean have_tty; manager = ufo_plugin_manager_new (); task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ()); ufo_task_graph_read_from_file (task_graph, manager, filename, &error); handle_error ("Reading JSON", error, UFO_GRAPH (task_graph)); have_tty = isatty (fileno (stdin)); leaves = ufo_graph_get_leaves (UFO_GRAPH (task_graph)); if (!options->quiet && have_tty) { UfoTaskNode *leaf; leaf = UFO_TASK_NODE (leaves->data); g_signal_connect (leaf, "processed", G_CALLBACK (progress_update), NULL); } if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "fixed"))) { g_debug ("INFO: run-json: using fixed-scheduler"); scheduler = ufo_fixed_scheduler_new (); } /* if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "local"))) { g_debug ("INFO: run-json: using local-scheduler"); scheduler = ufo_local_scheduler_new (); } if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "group"))) { g_debug ("INFO: run-json: using group-scheduler"); scheduler = ufo_group_scheduler_new (); } */ if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "dynamic"))) { g_debug ("INFO: run-json: using dynamic scheduler"); scheduler = ufo_scheduler_new (); } if (!scheduler) { g_debug ("INFO: run-json: using dynamic scheduler by default"); scheduler = ufo_scheduler_new (); } g_object_set (scheduler, "enable-tracing", options->trace, "timestamps", options->timestamps, NULL); address_list = string_array_to_value_array (options->addresses); if (address_list) { resources = UFO_RESOURCES (ufo_resources_new (NULL)); g_object_set (G_OBJECT (resources), "remotes", address_list, NULL); g_value_array_free (address_list); ufo_base_scheduler_set_resources (scheduler, resources); } ufo_base_scheduler_run (scheduler, task_graph, &error); handle_error ("Executing", error, UFO_GRAPH (task_graph)); if (!options->quieter) { gdouble run_time; if (!options->quiet && have_tty) g_print ("\n"); g_object_get (scheduler, "time", &run_time, NULL); g_print ("Finished in %3.5fs\n", run_time); } g_list_free (leaves); g_object_unref (task_graph); g_object_unref (scheduler); g_object_unref (manager); if (resources != NULL) g_object_unref (resources); } #ifdef WITH_MPI static void mpi_terminate_processes (gint global_size) { for (int i = 1; i < global_size; i++) { gchar *addr = g_strdup_printf ("%d", i); UfoMessage *poisonpill = ufo_message_new (UFO_MESSAGE_TERMINATE, 0); UfoMessenger *msger = UFO_MESSENGER (ufo_mpi_messenger_new ()); ufo_mpi_messenger_connect (msger, addr, UFO_MESSENGER_CLIENT); g_debug ("sending poisonpill to %s", addr); ufo_messenger_send_blocking (msger, poisonpill, NULL); ufo_message_free (poisonpill); ufo_messenger_disconnect (msger); } } static gchar** mpi_build_addresses (gint global_size) { /* build addresses by MPI_COMM_WORLD size, exclude rank 0 but have room for NULL termination */ gchar **addresses = g_malloc (sizeof (gchar *) * global_size); for (int i = 1; i < global_size; i++) { addresses[i - 1] = g_strdup_printf ("%d", i); } addresses[global_size - 1] = NULL; return addresses; } static void mpi_init (int *argc, char *argv[], gint *rank, gint *global_size) { gint provided; MPI_Init_thread (argc, &argv, MPI_THREAD_MULTIPLE, &provided); MPI_Comm_rank (MPI_COMM_WORLD, rank); MPI_Comm_size (MPI_COMM_WORLD, global_size); if (*global_size == 1) { g_critical ("Warning: running MPI instance but found only single process"); exit (0); } #ifdef DEBUG // get us some time to attach a gdb session to the pids g_debug ("Process PID %d ranked %d of %d - ready for attach\n", getpid(), rank, *global_size - 1); sleep (3); #endif } #endif int main(int argc, char *argv[]) { GOptionContext *context; GError *error = NULL; static Options options = { .addresses = NULL, .scheduler = NULL, .trace = FALSE, .timestamps = FALSE, .version = FALSE, .quiet = FALSE, .quieter = FALSE, }; GOptionEntry entries[] = { { "trace", 't', 0, G_OPTION_ARG_NONE, &options.trace, "enable tracing", NULL }, { "scheduler", 's', 0, G_OPTION_ARG_STRING, &options.scheduler, "selecting a scheduler", "dynamic|fixed"}, { "address", 'a', 0, G_OPTION_ARG_STRING_ARRAY, &options.addresses, "Address of remote server running `ufod'", NULL }, { "timestamps", 0, 0, G_OPTION_ARG_NONE, &options.timestamps, "enable timestamps", NULL }, { "quiet", 'q', 0, G_OPTION_ARG_NONE, &options.quiet, "be quiet", NULL }, { "quieter", 0, 0, G_OPTION_ARG_NONE, &options.quieter, "be quieter", NULL }, { "version", 'v', 0, G_OPTION_ARG_NONE, &options.version, "Show version information", NULL }, { NULL } }; #if !(GLIB_CHECK_VERSION (2, 36, 0)) g_type_init(); #endif context = g_option_context_new ("FILE"); g_option_context_add_main_entries (context, entries, NULL); if (!g_option_context_parse (context, &argc, &argv, &error)) { g_printerr ("Option parsing failed: %s\n", error->message); return 1; } if (options.version) { g_print ("runjson %s\n", UFO_VERSION); exit (EXIT_SUCCESS); } if (argc < 2) { gchar *help; help = g_option_context_get_help (context, TRUE, NULL); g_print ("%s", help); g_free (help); return 1; } #ifdef WITH_MPI gint rank, size; mpi_init (&argc, argv, &rank, &size); if (rank == 0) { addresses = mpi_build_addresses (size); } else { gchar *addr = g_strdup_printf("%d", rank); UfoDaemon *daemon = ufo_daemon_new (addr); ufo_daemon_start (daemon); ufo_daemon_wait_finish (daemon); MPI_Finalize (); exit(EXIT_SUCCESS); } #endif execute_json (argv[argc-1], &options); #ifdef WITH_MPI if (rank == 0) { mpi_terminate_processes (size); MPI_Finalize (); } #endif g_strfreev (options.addresses); g_option_context_free (context); return 0; }