/*
* Copyright (C) 2011-2013 Karlsruhe Institute of Technology
*
* This file is part of Ufo.
*
* This library is free software: you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library. If not, see .
*/
#include
#ifdef WITH_ZMQ
#include
#endif
#ifdef WITH_MPI
#include
#endif
#include
typedef UfoMessengerIface UfoMessengerInterface;
/**
* UfoMessageType:
* @UFO_MESSAGE_STREAM_JSON: use data distribution mode
* @UFO_MESSAGE_REPLICATE_JSON: use replication mode
* @UFO_MESSAGE_GET_NUM_DEVICES: get remote number of devices
* @UFO_MESSAGE_GET_STRUCTURE: get remote structure
* @UFO_MESSAGE_STRUCTURE: returned structure
* @UFO_MESSAGE_GET_REQUISITION: get remote requisition
* @UFO_MESSAGE_REQUISITION: returned requisition
* @UFO_MESSAGE_SEND_INPUTS: insert
* @UFO_MESSAGE_GET_RESULT: get result
* @UFO_MESSAGE_RESULT: returned result
* @UFO_MESSAGE_CLEANUP: demand cleanup
* @UFO_MESSAGE_ACK: acknowledge
* @UFO_MESSAGE_TERMINATE: terminate connection
* @UFO_MESSAGE_INVALID_REQUEST: invalid request reply
*
* The type of a message.
*/
/**
* UfoMessage:
* @type: #UfoMessageType
* @data_size: The size of the data field.
* @data: A #gpointer to the transferred data
*
* A message transfered via a communication channel.
*/
/**
* UfoMessengerRole:
* @UFO_MESSENGER_CLIENT: Messenger is a client
* @UFO_MESSENGER_SERVER: Messenger is a server
*
* The role of a connection endpoint.
*/
/**
* ufo_messenger_create:
* @address: (transfer none) (type utf8): listen address for the messenger
* @error: A #GError used to report errors during messenger creation
*
* Create a new #UfoMessenger basend on the PROTOCOL:// of the given @address
*
* Returns: (transfer full) (allow-none): A new #UfoMessenger or %NULL in case of
* error.
*/
UfoMessenger *
ufo_messenger_create (const gchar *address, GError **error)
{
UfoMessenger *msgr_out = NULL;
GError *error_internal = NULL;
GRegex *regex = g_regex_new ("^[a-z A-Z]+://[a-z A-Z 0-9 \\.]+:[0-9]{1,5}", \
0, G_REGEX_MATCH_NOTEMPTY, &error_internal);
if (error_internal) {
g_propagate_error (error, error_internal);
return NULL;
}
if (g_regex_match_all (regex, address, 0, NULL)) {
gchar **protocol = g_strsplit (address, ":", 2);
g_debug ("Creating messenger for `%s'", protocol[0]);
#ifdef WITH_ZMQ
if (!g_strcmp0 (protocol[0], "tcp")) {
msgr_out = UFO_MESSENGER (ufo_zmq_messenger_new ());
goto done;
}
#endif
#ifdef WITH_MPI
if (!g_strcmp0 (protocol[0], "mpi")) {
msgr_out = UFO_MESSENGER (ufo_mpi_messenger_new ());
goto done;
}
#endif
g_set_error (error, UFO_MESSENGER_ERROR, UFO_MESSENGER_UNKNOWN_PROTOCOL,
"Don't know how to handle protocol '%s://'", protocol[0]);
#if defined(WITH_ZMQ) || (WITH_MPI)
done:
#endif
g_strfreev (protocol);
}
else {
g_set_error (error, UFO_MESSENGER_ERROR, UFO_MESSENGER_INVALID_ADDRESS,
"Given address has invalid format, expecting `://:'.");
}
g_regex_unref (regex);
return msgr_out;
}
void
ufo_message_free (UfoMessage *msg)
{
if (msg == NULL)
return;
g_free (msg->data);
g_free (msg);
}
/**
* ufo_message_new: (skip)
* @type: message type
* @data_size: total size of the message
*
* Create a new message.
*/
UfoMessage *
ufo_message_new (UfoMessageType type, guint64 data_size)
{
UfoMessage *msg = g_malloc (sizeof (UfoMessage));
msg->type = type;
msg->data_size = data_size;
if (data_size == 0)
msg->data = NULL;
else
msg->data = g_malloc (data_size);
return msg;
}
G_DEFINE_INTERFACE (UfoMessenger, ufo_messenger, G_TYPE_OBJECT)
/**
* UfoMessengerError:
* @UFO_MESSENGER_CONNECTION_PROBLEM: Could not establish a connection
* @UFO_MESSENGER_BUFFER_FULL: Buffer is filled up completely
* @UFO_MESSENGER_SIZE_MISSMATCH: Size mismatch
* @UFO_MESSENGER_INVALID_ADDRESS: Given listen address is invalid
* @UFO_MESSENGER_UNKNOWN_PROTOCOL: The given address is of unknown PROTOCOL://
*/
GQuark
ufo_messenger_error_quark ()
{
return g_quark_from_static_string ("ufo-messenger-error-quark");
}
/**
* ufo_messenger_connect:
* @messenger: The messenger object
* @addr: (transfer none) : The address to connect. This is implementation specific.
* @role: The role of the local endpoint (client or server).
* @error: (allow-none): Location for a #GError or %NULL.
*
* Connects a messenger to and endpoint.
*/
void
ufo_messenger_connect (UfoMessenger *messenger,
const gchar *addr,
UfoMessengerRole role,
GError **error)
{
UFO_MESSENGER_GET_IFACE (messenger)->connect (messenger, addr, role, error);
}
void
ufo_messenger_disconnect (UfoMessenger *messenger)
{
UFO_MESSENGER_GET_IFACE (messenger)->disconnect (messenger);
}
/**
* ufo_messenger_send_blocking: (skip)
* @messenger: The messenger object
* @request: (transfer none): The request #UfoMessage.
* @error: A #GError
*
* Returns: (allow-none) : A #UfoMessage response to the sent request.
*
* Sends a #UfoMessage request to the connected
* endpoint and blocks until the message want fully sent.
*/
UfoMessage *
ufo_messenger_send_blocking (UfoMessenger *messenger,
UfoMessage *request,
GError **error)
{
return UFO_MESSENGER_GET_IFACE (messenger)->send_blocking (messenger, request, error);
}
/**
* ufo_messenger_recv_blocking: (skip)
* @messenger: The messenger object.
* @error: The #GError object
*
* Returns: The received #UfoMessage.
*
* Receives a #UfoMessage from the connected endpoint and blocks until the
* message was fully received.
*
*/
UfoMessage *
ufo_messenger_recv_blocking (UfoMessenger *messenger,
GError **error)
{
return UFO_MESSENGER_GET_IFACE (messenger)->recv_blocking (messenger, error);
}
static void
ufo_messenger_default_init (UfoMessengerInterface *iface)
{
}