summaryrefslogtreecommitdiff
path: root/ufo/ufo-daemon.c
diff options
context:
space:
mode:
Diffstat (limited to 'ufo/ufo-daemon.c')
-rw-r--r--ufo/ufo-daemon.c171
1 files changed, 84 insertions, 87 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index d982131..f2d0602 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -51,27 +51,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT)
#define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate))
struct _UfoDaemonPrivate {
- UfoPluginManager *manager;
- UfoTaskGraph *task_graph;
- UfoBaseScheduler *scheduler;
- GThread *scheduler_thread;
- UfoMessenger *msger;
-
- UfoNode **input_tasks;
- UfoNode *output_task;
- UfoBuffer **inputs;
-
- gchar *listen_address;
- gpointer socket;
- GThread *thread;
- GMutex *startstop_lock;
- GMutex *started_lock;
- GMutex *stopped_lock;
- GCond *started_cond;
- GCond *stopped_cond;
- gboolean has_started;
- gboolean has_stopped;
- guint num_inputs;
+ UfoPluginManager *manager;
+ UfoResources *resources;
+ UfoArchGraph *arch;
+ UfoTaskGraph *task_graph;
+ UfoBaseScheduler *scheduler;
+ GThread *scheduler_thread;
+ gpointer socket;
+ UfoNode *input_task;
+ UfoNode *output_task;
+ UfoBuffer *input;
+ cl_context context;
+ gchar *listen_address;
+ GThread *thread;
+ GMutex *startstop_lock;
+ GMutex *started_lock;
+ GMutex *stopped_lock;
+ gboolean has_started;
+ gboolean has_stopped;
+ GCond *started_cond;
+ GCond *stopped_cond;
+ UfoMessenger *msger;
};
static gpointer run_scheduler (UfoDaemon *daemon);
@@ -79,24 +79,23 @@ static gpointer run_scheduler (UfoDaemon *daemon);
UfoDaemon *
ufo_daemon_new (const gchar *listen_address)
{
+ UfoDaemonPrivate *priv;
UfoDaemon *daemon;
g_return_val_if_fail (listen_address != NULL, NULL);
daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL));
+ priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
priv->listen_address = g_strdup (listen_address);
priv->manager = ufo_plugin_manager_new ();
- priv->scheduler = ufo_scheduler_new ();
+
#ifdef WITH_MPI
priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
-#else
- /* TODO: we should return a GError in the constructor */
- g_warning ("No messenger backend available!");
#endif
+
return daemon;
}
@@ -105,14 +104,12 @@ handle_get_num_devices (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv;
UfoMessage *msg;
- UfoResources *resources;
cl_uint num_devices;
cl_context context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
- resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler));
- context = ufo_resources_get_context (resources);
+ context = ufo_resources_get_context (priv->resources);
UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL));
@@ -201,7 +198,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
priv = UFO_DAEMON_GET_PRIVATE (daemon);
json = read_json (daemon, msg);
- /* Send ack */
+ /* send ack */
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
@@ -227,22 +224,11 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first);
- priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first));
- priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode));
- priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer));
+ priv->input_task = ufo_input_task_new ();
priv->output_task = ufo_output_task_new (2);
- for (guint i = 0; i < priv->num_inputs; i++) {
- priv->input_tasks[i] = ufo_input_task_new();
- ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
- priv->input_tasks[i], first,
- GINT_TO_POINTER (i));
- }
-
- ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
- last, priv->output_task,
- GINT_TO_POINTER (0));
-
+ ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), priv->input_task, first, GINT_TO_POINTER (0));
+ ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), last, priv->output_task, GINT_TO_POINTER (0));
priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL);
g_free (json);
@@ -275,43 +261,36 @@ static void
handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv;
- UfoArchGraph *arch;
- UfoResources *resources;
UfoRequisition requisition;
gpointer context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ context = ufo_resources_get_context (priv->resources);
- /* TODO: We should store the context */
- arch = ufo_base_scheduler_get_arch (priv->scheduler);
- resources = ufo_arch_graph_get_resources (arch);
- context = ufo_resources_get_context (resources);
-
- struct _Header {
+ struct Header {
UfoRequisition requisition;
guint64 buffer_size;
};
char *base = request->data;
- struct _Header *header = (struct _Header *) base;
+ struct Header *header = (struct Header *) base;
+ /* Receive buffer size */
requisition = header->requisition;
- for (guint i = 0; i < priv->num_inputs; i++) {
- if (priv->inputs[i] == NULL) {
- priv->inputs[i] = ufo_buffer_new (&requisition, context);
- }
- else {
- if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition))
- ufo_buffer_resize (priv->inputs[i], &requisition);
- }
+ if (priv->input == NULL) {
+ priv->input = ufo_buffer_new (&requisition, context);
+ }
+ else {
+ if (ufo_buffer_cmp_dimensions (priv->input, &requisition))
+ ufo_buffer_resize (priv->input, &requisition);
+ }
- memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL),
- base + sizeof (struct _Header),
- ufo_buffer_get_size (priv->inputs[i]));
+ memcpy (ufo_buffer_get_host_array (priv->input, NULL),
+ base + sizeof (struct Header),
+ ufo_buffer_get_size (priv->input));
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]);
- }
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
@@ -372,21 +351,21 @@ void handle_cleanup (UfoDaemon *daemon)
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
- /* TODO check that we don't need to execture this branch wen priv->input is null */
- for (guint i = 0; i < priv->num_inputs; i++) {
- if (priv->input_tasks[i] && priv->inputs[i]) {
- ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i]));
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]),
- priv->inputs[i]);
+ /* TODO: check that we don't need to execute this branch wen priv->input is null */
+ if (priv->input_task && priv->input) {
+ ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task));
- g_usleep (1.5 * G_USEC_PER_SEC);
- unref_and_free ((GObject **) &priv->input_tasks[i]);
- unref_and_free ((GObject **) &priv->inputs[i]);
- }
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task),
+ priv->input);
+
+ g_usleep (1.5 * G_USEC_PER_SEC);
+ unref_and_free ((GObject **) &priv->input_task);
+ unref_and_free ((GObject **) &priv->input);
}
unref_and_free ((GObject **) &priv->output_task);
unref_and_free ((GObject **) &priv->task_graph);
+
}
static void
@@ -397,7 +376,7 @@ handle_terminate (UfoDaemon *daemon)
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
- if(priv->scheduler_thread != NULL) {
+ if (priv->scheduler_thread != NULL) {
g_message ("waiting for scheduler to finish");
g_thread_join (priv->scheduler_thread);
g_message ("scheduler finished!");
@@ -410,19 +389,35 @@ static gpointer
run_scheduler (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+
+ g_message ("Start scheduler");
+ priv->scheduler = ufo_scheduler_new ();
+ ufo_base_scheduler_set_arch (priv->scheduler, priv->arch);
ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL);
- g_object_unref (priv->scheduler);
+ g_message ("Done");
+ g_object_unref (priv->scheduler);
priv->scheduler = ufo_scheduler_new ();
+
return NULL;
}
static void
ufo_daemon_start_impl (UfoDaemon *daemon)
{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- g_debug ("UfoDaemon started on address %s", priv->listen_address);
+ UfoDaemonPrivate *priv;
+ GError *error = NULL;
+
+ priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ priv->resources = ufo_resources_new (&error);
+
+ if (error != NULL) {
+ g_warning ("%s\n", error->message);
+ return;
+ }
+
+ priv->arch = UFO_ARCH_GRAPH (ufo_arch_graph_new (priv->resources, NULL));
/* tell the calling thread that we have started */
g_mutex_lock (priv->started_lock);
@@ -431,16 +426,17 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
g_mutex_unlock (priv->started_lock);
gboolean wait_for_messages = TRUE;
- while (wait_for_messages) {
+ while (wait_for_messages) {
GError *err = NULL;
UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err);
+
if (err != NULL) {
- /* if daemon is stopped, socket will be closed and msg_recv
- * will yield an error - we stop
- */
+ /* If daemon is stopped, socket will be closed and msg_recv will
+ * yield an error - we stop. */
wait_for_messages = FALSE;
- } else {
+ }
+ else {
switch (msg->type) {
case UFO_MESSAGE_GET_NUM_DEVICES:
handle_get_num_devices (daemon);
@@ -531,8 +527,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error)
* - we thus send a TERMINATE message to that thread
*/
- UfoMessenger *tmp_msger = NULL;
-
+ UfoMessenger *tmp_msger;
#ifdef WITH_MPI
tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
@@ -621,6 +616,8 @@ ufo_daemon_init (UfoDaemon *self)
{
UfoDaemonPrivate *priv;
self->priv = priv = UFO_DAEMON_GET_PRIVATE (self);
+
+ priv->scheduler = NULL;
priv->startstop_lock = g_mutex_new ();
priv->started_lock = g_mutex_new ();
priv->stopped_lock = g_mutex_new ();