/*
* Copyright (C) 2011-2013 Karlsruhe Institute of Technology
*
* This file is part of runjson.
*
* runjson is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* runjson is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with runjson. If not, see .
*/
#define _POSIX_C_SOURCE 1
#include "config.h"
#include
#include
#include
#include
#ifdef WITH_MPI
#include
#include
#endif
typedef struct {
gchar **addresses;
gchar *scheduler;
gboolean trace;
gboolean timestamps;
gboolean version;
gboolean quiet;
gboolean quieter;
} Options;
static void
handle_error (const gchar *prefix, GError *error, UfoGraph *graph)
{
if (error) {
g_printerr ("%s: %s", prefix, error->message);
g_error_free (error);
exit (EXIT_FAILURE);
}
}
static GValueArray *
string_array_to_value_array (gchar **array)
{
GValueArray *result = NULL;
if (array == NULL)
return NULL;
result = g_value_array_new (0);
for (guint i = 0; array[i] != NULL; i++) {
GValue *tmp = (GValue *) g_malloc0 (sizeof (GValue));
g_value_init (tmp, G_TYPE_STRING);
g_value_set_string (tmp, array[i]);
result = g_value_array_append (result, tmp);
}
return result;
}
static void
progress_update (gpointer user)
{
static int n = 0;
g_print ("\33[2K\r%i items processed ...", ++n);
}
static void
execute_json (const gchar *filename,
const Options *options)
{
UfoTaskGraph *task_graph;
UfoBaseScheduler *scheduler = NULL;
UfoPluginManager *manager;
GList *leaves;
UfoResources *resources = NULL;
GValueArray *address_list = NULL;
GError *error = NULL;
gboolean have_tty;
manager = ufo_plugin_manager_new ();
task_graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
ufo_task_graph_read_from_file (task_graph, manager, filename, &error);
handle_error ("Reading JSON", error, UFO_GRAPH (task_graph));
have_tty = isatty (fileno (stdin));
leaves = ufo_graph_get_leaves (UFO_GRAPH (task_graph));
if (!options->quiet && have_tty) {
UfoTaskNode *leaf;
leaf = UFO_TASK_NODE (leaves->data);
g_signal_connect (leaf, "processed", G_CALLBACK (progress_update), NULL);
}
if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "fixed"))) {
g_debug ("INFO: run-json: using fixed-scheduler");
scheduler = ufo_fixed_scheduler_new ();
}
/*
if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "local"))) {
g_debug ("INFO: run-json: using local-scheduler");
scheduler = ufo_local_scheduler_new ();
}
if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "group"))) {
g_debug ("INFO: run-json: using group-scheduler");
scheduler = ufo_group_scheduler_new ();
}
*/
if ((NULL != options->scheduler) && (0 == g_ascii_strcasecmp (options->scheduler, "dynamic"))) {
g_debug ("INFO: run-json: using dynamic scheduler");
scheduler = ufo_scheduler_new ();
}
if (!scheduler) {
g_debug ("INFO: run-json: using dynamic scheduler by default");
scheduler = ufo_scheduler_new ();
}
g_object_set (scheduler,
"enable-tracing", options->trace,
"timestamps", options->timestamps,
NULL);
address_list = string_array_to_value_array (options->addresses);
if (address_list) {
resources = UFO_RESOURCES (ufo_resources_new (NULL));
g_object_set (G_OBJECT (resources), "remotes", address_list, NULL);
g_value_array_free (address_list);
ufo_base_scheduler_set_resources (scheduler, resources);
}
ufo_base_scheduler_run (scheduler, task_graph, &error);
handle_error ("Executing", error, UFO_GRAPH (task_graph));
if (!options->quieter) {
gdouble run_time;
if (!options->quiet && have_tty)
g_print ("\n");
g_object_get (scheduler, "time", &run_time, NULL);
g_print ("Finished in %3.5fs\n", run_time);
}
g_list_free (leaves);
g_object_unref (task_graph);
g_object_unref (scheduler);
g_object_unref (manager);
if (resources != NULL)
g_object_unref (resources);
}
#ifdef WITH_MPI
static void
mpi_terminate_processes (gint global_size)
{
for (int i = 1; i < global_size; i++) {
gchar *addr = g_strdup_printf ("%d", i);
UfoMessage *poisonpill = ufo_message_new (UFO_MESSAGE_TERMINATE, 0);
UfoMessenger *msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
ufo_mpi_messenger_connect (msger, addr, UFO_MESSENGER_CLIENT);
g_debug ("sending poisonpill to %s", addr);
ufo_messenger_send_blocking (msger, poisonpill, NULL);
ufo_message_free (poisonpill);
ufo_messenger_disconnect (msger);
}
}
static gchar**
mpi_build_addresses (gint global_size)
{
/* build addresses by MPI_COMM_WORLD size, exclude rank 0 but
have room for NULL termination */
gchar **addresses = g_malloc (sizeof (gchar *) * global_size);
for (int i = 1; i < global_size; i++) {
addresses[i - 1] = g_strdup_printf ("%d", i);
}
addresses[global_size - 1] = NULL;
return addresses;
}
static void
mpi_init (int *argc, char *argv[], gint *rank, gint *global_size)
{
gint provided;
MPI_Init_thread (argc, &argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_rank (MPI_COMM_WORLD, rank);
MPI_Comm_size (MPI_COMM_WORLD, global_size);
if (*global_size == 1) {
g_critical ("Warning: running MPI instance but found only single process");
exit (0);
}
#ifdef DEBUG
// get us some time to attach a gdb session to the pids
g_debug ("Process PID %d ranked %d of %d - ready for attach\n",
getpid(), rank, *global_size - 1);
sleep (3);
#endif
}
#endif
int main(int argc, char *argv[])
{
GOptionContext *context;
GError *error = NULL;
static Options options = {
.addresses = NULL,
.scheduler = NULL,
.trace = FALSE,
.timestamps = FALSE,
.version = FALSE,
.quiet = FALSE,
.quieter = FALSE,
};
GOptionEntry entries[] = {
{ "trace", 't', 0, G_OPTION_ARG_NONE, &options.trace, "enable tracing", NULL },
{ "scheduler", 's', 0, G_OPTION_ARG_STRING, &options.scheduler, "selecting a scheduler",
"dynamic|fixed"},
{ "address", 'a', 0, G_OPTION_ARG_STRING_ARRAY, &options.addresses, "Address of remote server running `ufod'", NULL },
{ "timestamps", 0, 0, G_OPTION_ARG_NONE, &options.timestamps, "enable timestamps", NULL },
{ "quiet", 'q', 0, G_OPTION_ARG_NONE, &options.quiet, "be quiet", NULL },
{ "quieter", 0, 0, G_OPTION_ARG_NONE, &options.quieter, "be quieter", NULL },
{ "version", 'v', 0, G_OPTION_ARG_NONE, &options.version, "Show version information", NULL },
{ NULL }
};
#if !(GLIB_CHECK_VERSION (2, 36, 0))
g_type_init();
#endif
context = g_option_context_new ("FILE");
g_option_context_add_main_entries (context, entries, NULL);
if (!g_option_context_parse (context, &argc, &argv, &error)) {
g_printerr ("Option parsing failed: %s\n", error->message);
return 1;
}
if (options.version) {
g_print ("runjson %s\n", UFO_VERSION);
exit (EXIT_SUCCESS);
}
if (argc < 2) {
gchar *help;
help = g_option_context_get_help (context, TRUE, NULL);
g_print ("%s", help);
g_free (help);
return 1;
}
#ifdef WITH_MPI
gint rank, size;
mpi_init (&argc, argv, &rank, &size);
if (rank == 0) {
addresses = mpi_build_addresses (size);
}
else {
gchar *addr = g_strdup_printf("%d", rank);
UfoDaemon *daemon = ufo_daemon_new (addr);
ufo_daemon_start (daemon);
ufo_daemon_wait_finish (daemon);
MPI_Finalize ();
exit(EXIT_SUCCESS);
}
#endif
execute_json (argv[argc-1], &options);
#ifdef WITH_MPI
if (rank == 0) {
mpi_terminate_processes (size);
MPI_Finalize ();
}
#endif
g_strfreev (options.addresses);
g_option_context_free (context);
return 0;
}