summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-04-09 15:54:11 +0200
committerMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2015-04-21 09:32:50 +0200
commitb0174d6770a03e77af7682eca91613b197eb6f73 (patch)
tree60d70329d2a69a3c58d3af7c50a30fd4b82f4b89
parentcc0fd83d80acb52df30fa7b90ee475f232c6c01d (diff)
Fix #88: communication broken
No better summary line, too many problems ...
-rw-r--r--ufo/ufo-daemon.c171
-rw-r--r--ufo/ufo-messenger-iface.c1
-rw-r--r--ufo/ufo-remote-node.c7
-rw-r--r--ufo/ufo-task-graph.c22
-rw-r--r--ufo/ufo-zmq-messenger.c75
5 files changed, 124 insertions, 152 deletions
diff --git a/ufo/ufo-daemon.c b/ufo/ufo-daemon.c
index d982131..f2d0602 100644
--- a/ufo/ufo-daemon.c
+++ b/ufo/ufo-daemon.c
@@ -51,27 +51,27 @@ G_DEFINE_TYPE (UfoDaemon, ufo_daemon, G_TYPE_OBJECT)
#define UFO_DAEMON_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_DAEMON, UfoDaemonPrivate))
struct _UfoDaemonPrivate {
- UfoPluginManager *manager;
- UfoTaskGraph *task_graph;
- UfoBaseScheduler *scheduler;
- GThread *scheduler_thread;
- UfoMessenger *msger;
-
- UfoNode **input_tasks;
- UfoNode *output_task;
- UfoBuffer **inputs;
-
- gchar *listen_address;
- gpointer socket;
- GThread *thread;
- GMutex *startstop_lock;
- GMutex *started_lock;
- GMutex *stopped_lock;
- GCond *started_cond;
- GCond *stopped_cond;
- gboolean has_started;
- gboolean has_stopped;
- guint num_inputs;
+ UfoPluginManager *manager;
+ UfoResources *resources;
+ UfoArchGraph *arch;
+ UfoTaskGraph *task_graph;
+ UfoBaseScheduler *scheduler;
+ GThread *scheduler_thread;
+ gpointer socket;
+ UfoNode *input_task;
+ UfoNode *output_task;
+ UfoBuffer *input;
+ cl_context context;
+ gchar *listen_address;
+ GThread *thread;
+ GMutex *startstop_lock;
+ GMutex *started_lock;
+ GMutex *stopped_lock;
+ gboolean has_started;
+ gboolean has_stopped;
+ GCond *started_cond;
+ GCond *stopped_cond;
+ UfoMessenger *msger;
};
static gpointer run_scheduler (UfoDaemon *daemon);
@@ -79,24 +79,23 @@ static gpointer run_scheduler (UfoDaemon *daemon);
UfoDaemon *
ufo_daemon_new (const gchar *listen_address)
{
+ UfoDaemonPrivate *priv;
UfoDaemon *daemon;
g_return_val_if_fail (listen_address != NULL, NULL);
daemon = UFO_DAEMON (g_object_new (UFO_TYPE_DAEMON, NULL));
+ priv = UFO_DAEMON_GET_PRIVATE (daemon);
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
priv->listen_address = g_strdup (listen_address);
priv->manager = ufo_plugin_manager_new ();
- priv->scheduler = ufo_scheduler_new ();
+
#ifdef WITH_MPI
priv->msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
priv->msger = UFO_MESSENGER (ufo_zmq_messenger_new ());
-#else
- /* TODO: we should return a GError in the constructor */
- g_warning ("No messenger backend available!");
#endif
+
return daemon;
}
@@ -105,14 +104,12 @@ handle_get_num_devices (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv;
UfoMessage *msg;
- UfoResources *resources;
cl_uint num_devices;
cl_context context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
msg = ufo_message_new (UFO_MESSAGE_ACK, sizeof (guint16));
- resources = ufo_arch_graph_get_resources (ufo_base_scheduler_get_arch (priv->scheduler));
- context = ufo_resources_get_context (resources);
+ context = ufo_resources_get_context (priv->resources);
UFO_RESOURCES_CHECK_CLERR (clGetContextInfo (context, CL_CONTEXT_NUM_DEVICES, sizeof (cl_uint), &num_devices, NULL));
@@ -201,7 +198,7 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
priv = UFO_DAEMON_GET_PRIVATE (daemon);
json = read_json (daemon, msg);
- /* Send ack */
+ /* send ack */
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
@@ -227,22 +224,11 @@ handle_stream_json (UfoDaemon *daemon, UfoMessage *msg)
first = remove_dummy_if_present (UFO_GRAPH (priv->task_graph), first);
- priv->num_inputs = ufo_task_get_num_inputs (UFO_TASK (first));
- priv->input_tasks = g_malloc0 (priv->num_inputs*sizeof (UfoNode));
- priv->inputs = g_malloc0 (priv->num_inputs*sizeof (UfoBuffer));
+ priv->input_task = ufo_input_task_new ();
priv->output_task = ufo_output_task_new (2);
- for (guint i = 0; i < priv->num_inputs; i++) {
- priv->input_tasks[i] = ufo_input_task_new();
- ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
- priv->input_tasks[i], first,
- GINT_TO_POINTER (i));
- }
-
- ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph),
- last, priv->output_task,
- GINT_TO_POINTER (0));
-
+ ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), priv->input_task, first, GINT_TO_POINTER (0));
+ ufo_graph_connect_nodes (UFO_GRAPH (priv->task_graph), last, priv->output_task, GINT_TO_POINTER (0));
priv->scheduler_thread = g_thread_create ((GThreadFunc) run_scheduler, daemon, TRUE, NULL);
g_free (json);
@@ -275,43 +261,36 @@ static void
handle_send_inputs (UfoDaemon *daemon, UfoMessage *request)
{
UfoDaemonPrivate *priv;
- UfoArchGraph *arch;
- UfoResources *resources;
UfoRequisition requisition;
gpointer context;
priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ context = ufo_resources_get_context (priv->resources);
- /* TODO: We should store the context */
- arch = ufo_base_scheduler_get_arch (priv->scheduler);
- resources = ufo_arch_graph_get_resources (arch);
- context = ufo_resources_get_context (resources);
-
- struct _Header {
+ struct Header {
UfoRequisition requisition;
guint64 buffer_size;
};
char *base = request->data;
- struct _Header *header = (struct _Header *) base;
+ struct Header *header = (struct Header *) base;
+ /* Receive buffer size */
requisition = header->requisition;
- for (guint i = 0; i < priv->num_inputs; i++) {
- if (priv->inputs[i] == NULL) {
- priv->inputs[i] = ufo_buffer_new (&requisition, context);
- }
- else {
- if (ufo_buffer_cmp_dimensions (priv->inputs[i], &requisition))
- ufo_buffer_resize (priv->inputs[i], &requisition);
- }
+ if (priv->input == NULL) {
+ priv->input = ufo_buffer_new (&requisition, context);
+ }
+ else {
+ if (ufo_buffer_cmp_dimensions (priv->input, &requisition))
+ ufo_buffer_resize (priv->input, &requisition);
+ }
- memcpy (ufo_buffer_get_host_array (priv->inputs[i], NULL),
- base + sizeof (struct _Header),
- ufo_buffer_get_size (priv->inputs[i]));
+ memcpy (ufo_buffer_get_host_array (priv->input, NULL),
+ base + sizeof (struct Header),
+ ufo_buffer_get_size (priv->input));
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]), priv->inputs[i]);
- }
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task), priv->input);
UfoMessage *response = ufo_message_new (UFO_MESSAGE_ACK, 0);
ufo_messenger_send_blocking (priv->msger, response, NULL);
@@ -372,21 +351,21 @@ void handle_cleanup (UfoDaemon *daemon)
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
- /* TODO check that we don't need to execture this branch wen priv->input is null */
- for (guint i = 0; i < priv->num_inputs; i++) {
- if (priv->input_tasks[i] && priv->inputs[i]) {
- ufo_input_task_stop (UFO_INPUT_TASK (priv->input_tasks[i]));
- ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_tasks[i]),
- priv->inputs[i]);
+ /* TODO: check that we don't need to execute this branch wen priv->input is null */
+ if (priv->input_task && priv->input) {
+ ufo_input_task_stop (UFO_INPUT_TASK (priv->input_task));
- g_usleep (1.5 * G_USEC_PER_SEC);
- unref_and_free ((GObject **) &priv->input_tasks[i]);
- unref_and_free ((GObject **) &priv->inputs[i]);
- }
+ ufo_input_task_release_input_buffer (UFO_INPUT_TASK (priv->input_task),
+ priv->input);
+
+ g_usleep (1.5 * G_USEC_PER_SEC);
+ unref_and_free ((GObject **) &priv->input_task);
+ unref_and_free ((GObject **) &priv->input);
}
unref_and_free ((GObject **) &priv->output_task);
unref_and_free ((GObject **) &priv->task_graph);
+
}
static void
@@ -397,7 +376,7 @@ handle_terminate (UfoDaemon *daemon)
ufo_messenger_send_blocking (priv->msger, response, NULL);
ufo_message_free (response);
- if(priv->scheduler_thread != NULL) {
+ if (priv->scheduler_thread != NULL) {
g_message ("waiting for scheduler to finish");
g_thread_join (priv->scheduler_thread);
g_message ("scheduler finished!");
@@ -410,19 +389,35 @@ static gpointer
run_scheduler (UfoDaemon *daemon)
{
UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
+
+ g_message ("Start scheduler");
+ priv->scheduler = ufo_scheduler_new ();
+ ufo_base_scheduler_set_arch (priv->scheduler, priv->arch);
ufo_base_scheduler_run (priv->scheduler, priv->task_graph, NULL);
- g_object_unref (priv->scheduler);
+ g_message ("Done");
+ g_object_unref (priv->scheduler);
priv->scheduler = ufo_scheduler_new ();
+
return NULL;
}
static void
ufo_daemon_start_impl (UfoDaemon *daemon)
{
- UfoDaemonPrivate *priv = UFO_DAEMON_GET_PRIVATE (daemon);
- g_debug ("UfoDaemon started on address %s", priv->listen_address);
+ UfoDaemonPrivate *priv;
+ GError *error = NULL;
+
+ priv = UFO_DAEMON_GET_PRIVATE (daemon);
+ priv->resources = ufo_resources_new (&error);
+
+ if (error != NULL) {
+ g_warning ("%s\n", error->message);
+ return;
+ }
+
+ priv->arch = UFO_ARCH_GRAPH (ufo_arch_graph_new (priv->resources, NULL));
/* tell the calling thread that we have started */
g_mutex_lock (priv->started_lock);
@@ -431,16 +426,17 @@ ufo_daemon_start_impl (UfoDaemon *daemon)
g_mutex_unlock (priv->started_lock);
gboolean wait_for_messages = TRUE;
- while (wait_for_messages) {
+ while (wait_for_messages) {
GError *err = NULL;
UfoMessage *msg = ufo_messenger_recv_blocking (priv->msger, &err);
+
if (err != NULL) {
- /* if daemon is stopped, socket will be closed and msg_recv
- * will yield an error - we stop
- */
+ /* If daemon is stopped, socket will be closed and msg_recv will
+ * yield an error - we stop. */
wait_for_messages = FALSE;
- } else {
+ }
+ else {
switch (msg->type) {
case UFO_MESSAGE_GET_NUM_DEVICES:
handle_get_num_devices (daemon);
@@ -531,8 +527,7 @@ ufo_daemon_stop (UfoDaemon *daemon, GError **error)
* - we thus send a TERMINATE message to that thread
*/
- UfoMessenger *tmp_msger = NULL;
-
+ UfoMessenger *tmp_msger;
#ifdef WITH_MPI
tmp_msger = UFO_MESSENGER (ufo_mpi_messenger_new ());
#elif WITH_ZMQ
@@ -621,6 +616,8 @@ ufo_daemon_init (UfoDaemon *self)
{
UfoDaemonPrivate *priv;
self->priv = priv = UFO_DAEMON_GET_PRIVATE (self);
+
+ priv->scheduler = NULL;
priv->startstop_lock = g_mutex_new ();
priv->started_lock = g_mutex_new ();
priv->stopped_lock = g_mutex_new ();
diff --git a/ufo/ufo-messenger-iface.c b/ufo/ufo-messenger-iface.c
index b02f584..e34d701 100644
--- a/ufo/ufo-messenger-iface.c
+++ b/ufo/ufo-messenger-iface.c
@@ -43,6 +43,7 @@ ufo_message_new (UfoMessageType type, guint64 data_size)
UfoMessage *msg = g_malloc (sizeof (UfoMessage));
msg->type = type;
msg->data_size = data_size;
+
if (data_size == 0)
msg->data = NULL;
else
diff --git a/ufo/ufo-remote-node.c b/ufo/ufo-remote-node.c
index 9f24ae2..13b998e 100644
--- a/ufo/ufo-remote-node.c
+++ b/ufo/ufo-remote-node.c
@@ -193,7 +193,7 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node,
// determine our total message size
guint64 size = 0;
-
+
for (guint i = 0; i < priv->n_inputs; i++) {
guint64 buffer_size = ufo_buffer_get_size (inputs[i]);
size += buffer_size;
@@ -219,7 +219,6 @@ ufo_remote_node_send_inputs (UfoRemoteNode *node,
request->data = buffer;
// send as a single message
ufo_messenger_send_blocking (priv->msger, request, NULL);
-
}
void
@@ -291,7 +290,7 @@ ufo_remote_node_terminate (UfoRemoteNode *node)
ufo_messenger_send_blocking (priv->msger, request, NULL);
ufo_messenger_disconnect (priv->msger);
- return;
+ return;
}
static void
@@ -330,6 +329,6 @@ ufo_remote_node_init (UfoRemoteNode *self)
{
UfoRemoteNodePrivate *priv;
self->priv = priv = UFO_REMOTE_NODE_GET_PRIVATE (self);
- priv->n_inputs = 0;
+ priv->n_inputs = 1;
priv->terminated = FALSE;
}
diff --git a/ufo/ufo-task-graph.c b/ufo/ufo-task-graph.c
index 4aad7a1..b2f7fbc 100644
--- a/ufo/ufo-task-graph.c
+++ b/ufo/ufo-task-graph.c
@@ -18,6 +18,7 @@
*/
#include <string.h>
+#include <stdio.h>
#include <json-glib/json-glib.h>
#include <ufo/ufo-task-graph.h>
#include <ufo/ufo-task-node.h>
@@ -336,35 +337,25 @@ build_remote_graph (UfoTaskGraph *remote_graph,
static void
create_remote_tasks (UfoTaskGraph *task_graph,
UfoTaskGraph *remote_graph,
- GList *first,
+ UfoTaskNode *first,
UfoTaskNode *last,
UfoRemoteNode *remote)
{
UfoTaskGraphPrivate *priv;
UfoTaskNode *task;
gchar *json;
- GList *pred;
- guint port = 0;
- priv = task_graph->priv;
json = ufo_task_graph_get_json_data (remote_graph, NULL);
+ priv = task_graph->priv;
ufo_remote_node_send_json (remote, UFO_REMOTE_MODE_STREAM, json);
task = UFO_TASK_NODE (ufo_remote_task_new ());
priv->remote_tasks = g_list_append (priv->remote_tasks, task);
ufo_task_node_set_proc_node (task, UFO_NODE (remote));
- /* Setting the remote's # of inputs to the # of inputs of the corresponding
- * task */
- GList *roots = ufo_graph_get_roots (UFO_GRAPH (remote_graph));
- guint root_num_inputs = ufo_task_get_num_inputs (UFO_TASK (roots->data));
- ufo_remote_node_set_num_inputs (remote, root_num_inputs);
-
- g_list_for (first->data, pred) {
- ufo_task_graph_connect_nodes_full (task_graph, pred->data, task, port++);
- }
-
+ ufo_task_graph_connect_nodes (task_graph, first, task);
ufo_task_graph_connect_nodes (task_graph, task, last);
+
g_free (json);
}
@@ -381,6 +372,7 @@ expand_remotes (UfoTaskGraph *task_graph,
first = g_list_first (path);
last = g_list_last (path);
+
remote_graph = UFO_TASK_GRAPH (ufo_task_graph_new ());
node = build_remote_graph (remote_graph, first, last);
@@ -392,7 +384,7 @@ expand_remotes (UfoTaskGraph *task_graph,
g_list_for (remotes, it) {
create_remote_tasks (task_graph, remote_graph,
- first, last->data, it->data);
+ first->data, last->data, it->data);
}
g_object_unref (remote_graph);
diff --git a/ufo/ufo-zmq-messenger.c b/ufo/ufo-zmq-messenger.c
index ec86e0c..da87193 100644
--- a/ufo/ufo-zmq-messenger.c
+++ b/ufo/ufo-zmq-messenger.c
@@ -40,11 +40,12 @@ struct _UfoZmqMessengerPrivate {
UfoMessengerRole role;
};
-/* C99 allows flexible length structs that we use to map
-* arbitrary frame lengths that are transferred via zmq.
-* Note: Sizes of datatypes should be fixed and equal on all plattforms
-* (i.e. don't use a gsize as it has different size on x86 & x86_64)
-*/
+/*
+ * C99 allows flexible length structs that we use to map arbitrary frame lengths
+ * that are transferred via zmq. Note: Sizes of datatypes should be fixed and
+ * equal on all plattforms (i.e. don't use a gsize as it has different size on
+ * x86 & x86_64).
+ */
typedef struct _DataFrame {
UfoMessageType type;
guint64 data_size;
@@ -137,7 +138,7 @@ ufo_zmq_messenger_disconnect (UfoMessenger *msger)
zmq_close (priv->zmq_socket);
priv->zmq_socket = NULL;
- // waits for outstanding messages to be flushed
+ /* waits for outstanding messages to be flushed */
zmq_term (priv->zmq_ctx);
g_free (priv->remote_addr);
}
@@ -146,14 +147,6 @@ ufo_zmq_messenger_disconnect (UfoMessenger *msger)
return;
}
-/**
- * ufo_zmq_messenger_send_blocking: (skip)
- * @msger: #UfoMessenger
- * @request_msg: Request message
- * @error: Location for an error or %NULL
- *
- * Send message in blocking way.
- */
static UfoMessage *
ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
UfoMessage *request_msg,
@@ -188,21 +181,23 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
goto finalize;
}
- /* if this is an ACK message, don't expect a response
- * (send_blocking is then most likely being called by the server)
+ /*
+ * If this is an ACK message, don't expect a response (send_blocking is then
+ * most likely being called by the server).
*/
- if (request_msg->type == UFO_MESSAGE_ACK) {
+ if (request_msg->type == UFO_MESSAGE_ACK)
goto finalize;
- }
- /* we always need to receive as response as ZMQ
- * requires REQ/REP/REQ/REP/... scheme
+ /*
+ * We always need to receive as response as ZMQ requires REQ/REP/REQ/REP/...
+ * scheme.
*/
zmq_msg_t reply;
zmq_msg_init (&reply);
err = zmq_msg_recv (&reply, priv->zmq_socket, 0);
gint size = zmq_msg_size (&reply);
+
if (err < 0) {
g_set_error (error, ufo_messenger_error_quark (), zmq_errno(),
"Could not receive from %s: %s ", priv->remote_addr,
@@ -223,36 +218,26 @@ ufo_zmq_messenger_send_blocking (UfoMessenger *msger,
UfoMessage *reply_msg = ufo_message_new (resp_frame->type, resp_frame->data_size);
memcpy (reply_msg->data, resp_frame->data, resp_frame->data_size);
- //if (frame->type != 5 && frame->type != 7 && frame->type!=8)
- //g_message ("Type: %i \tData_size: %i",(int) frame->type, (int) resp_frame->data_size);
-
zmq_msg_close (&reply);
result = reply_msg;
- goto finalize;
-
- finalize:
- g_mutex_unlock (priv->mutex);
- return result;
+finalize:
+ g_mutex_unlock (priv->mutex);
+ return result;
}
-/**
- * ufo_zmq_messenger_recv_blocking: (skip)
- * @msger: A #UfoMessenger
- * @error: Location for an error or %NULL
- *
- * Receive message in blocking way.
- */
static UfoMessage *
ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
GError **error)
{
- UfoZmqMessengerPrivate *priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
+ UfoZmqMessengerPrivate *priv;
+ UfoMessage *result = NULL;
+
+ priv = UFO_ZMQ_MESSENGER_GET_PRIVATE (msger);
g_assert (priv->role == UFO_MESSENGER_SERVER);
g_mutex_lock (priv->mutex);
- UfoMessage *result = NULL;
zmq_msg_t reply;
zmq_msg_init (&reply);
gint err = zmq_msg_recv (&reply, priv->zmq_socket, 0);
@@ -268,7 +253,8 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
DataFrame *frame = zmq_msg_data (&reply);
guint expected_size = (guint) (sizeof (DataFrame) + frame->data_size);
- if ((guint)size != expected_size) {
+
+ if ((guint) size != expected_size) {
g_set_error (error, ufo_messenger_error_quark(),
UFO_MESSENGER_SIZE_MISSMATCH,
"Received unexpected frame size: %d, should be: %d",
@@ -276,16 +262,13 @@ ufo_zmq_messenger_recv_blocking (UfoMessenger *msger,
goto finalize;
}
- UfoMessage *msg = ufo_message_new (frame->type, frame->data_size);
- memcpy (msg->data, frame->data, frame->data_size);
-
+ result = ufo_message_new (frame->type, frame->data_size);
+ memcpy (result->data, frame->data, frame->data_size);
zmq_msg_close (&reply);
- result = msg;
- goto finalize;
- finalize:
- g_mutex_unlock (priv->mutex);
- return result;
+finalize:
+ g_mutex_unlock (priv->mutex);
+ return result;
}
static void