diff options
author | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2017-11-28 09:57:56 +0100 |
---|---|---|
committer | Matthias Vogelgesang <matthias.vogelgesang@kit.edu> | 2018-01-12 10:44:36 +0100 |
commit | c8cb828eab3325dc4a8517c1b0f572aa7dc9dead (patch) | |
tree | 8068b6293cd0613cf36d9723e0da5fa21c676ab4 | |
parent | f6eae00e12b964b0638250580916ba4660c2f5a8 (diff) | |
download | ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.gz ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.bz2 ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.xz ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.zip |
Add zmq-pub and zmq-sub tasks
-rw-r--r-- | docs/examples.rst | 29 | ||||
-rw-r--r-- | docs/generators.rst | 15 | ||||
-rw-r--r-- | docs/sinks.rst | 14 | ||||
-rw-r--r-- | src/CMakeLists.txt | 8 | ||||
-rw-r--r-- | src/meson.build | 23 | ||||
-rw-r--r-- | src/ufo-zmq-common.h | 51 | ||||
-rw-r--r-- | src/ufo-zmq-pub-task.c | 461 | ||||
-rw-r--r-- | src/ufo-zmq-pub-task.h | 53 | ||||
-rw-r--r-- | src/ufo-zmq-sub-task.c | 353 | ||||
-rw-r--r-- | src/ufo-zmq-sub-task.h | 53 |
10 files changed, 1060 insertions, 0 deletions
diff --git a/docs/examples.rst b/docs/examples.rst index f530eb9..1653a45 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -128,6 +128,35 @@ space. To reconstruct, you have to feed the sinograms into :gobj:class:`zeropad` null +================= +Data distribution +================= + +To distribute data in a compute network you can use the :gobj:class:`zmq-pub` +sink and :gobj:class:`zmq-sub` generator. For example, to read data on machine A +and store it on machine B, you would run + +.. code-block:: bash + + ufo-launch read path=/data ! zmq-pub + +on machine A and + +.. code-block:: bash + + ufo-launch zmq-sub address=tcp://hostname-of-machine-a ! write + +on machine B. Note that by default :gobj:class:`zmq-pub` publishes data as soon +as it receives it, thus some of the data will get lost if the +:gobj:class:`zmq-sub` is run after :gobj:class:`zmq-pub`. You can prevent this +by telling the :gobj:class:`zmq-pub` task to wait for a certain number of +subscribers to subscribe: + +.. code-block:: bash + + ufo-launch read path=/data ! zmq-pub expected-subscribers=1 + + .. rubric:: References .. [KaSl01] Kak, A. C., & Slaney, M. (2001). Principles of Computerized Tomographic Imaging (Philadelphia, PA: SIAM). diff --git a/docs/generators.rst b/docs/generators.rst index 67e6f42..7e52fb0 100644 --- a/docs/generators.rst +++ b/docs/generators.rst @@ -145,6 +145,21 @@ Memory reader Specifies the number of items to read. +ZeroMQ subscriber +================= + +.. gobj:class:: zmq-sub + + Generates a stream from a compatible ZeroMQ data stream, for example + published by the :gobj:class:`zmq-pub` task. + + .. gobj:prop:: address:string + + Host address of the ZeroMQ publisher. Note, that as of now the publisher + binds to a ``tcp`` endpoint, thus you have to use that as well. By + default, the address is set to the local host address 127.0.0.1. + + UcaCamera reader ================ diff --git a/docs/sinks.rst b/docs/sinks.rst index d4135da..c78d083 100644 --- a/docs/sinks.rst +++ b/docs/sinks.rst @@ -106,6 +106,20 @@ Memory writer that point only. +ZeroMQ publisher +================ + +.. gobj:class:: zmq-pub + + Publishes the stream as a ZeroMQ data stream to compatible ZeroMQ + subscribers such as the :gobj:class:`zmq-sub` source. + + .. gobj:prop:: expected-subscribers:uint + + If set, the publisher will wait until the number of expected subscribers + have connected. + + Auxiliary sink ============== diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 008734e..d10a4e0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -135,6 +135,7 @@ pkg_check_modules(CLFFT clFFT) pkg_check_modules(CLBLAST clblast) pkg_check_modules(PANGOCAIRO pangocairo) pkg_check_modules(OPENCV opencv) +pkg_check_modules(ZMQ libzmq) if (OPENMP_FOUND) @@ -251,6 +252,13 @@ if (OPENCV_FOUND) list(APPEND cv_show_aux_LIBS ${OPENCV_LIBRARIES}) list(APPEND cv_show_aux_SRCS writers/ufo-writer.c) endif () + +if (ZMQ_FOUND) + include_directories(${ZMQ_INCLUDE_DIRS}) + link_directories(${ZMQ_LIBRARY_DIRS}) + list(APPEND ufofilter_SRCS ufo-zmq-pub-task.c ufo-zmq-sub-task.c) + list(APPEND zmq_pub_aux_LIBS ${ZMQ_LIBRARIES}) +endif () #}}} #{{{ Plugin targets include_directories(${CMAKE_CURRENT_BINARY_DIR} diff --git a/src/meson.build b/src/meson.build index 721cb96..3466c12 100644 --- a/src/meson.build +++ b/src/meson.build @@ -78,6 +78,11 @@ fft_plugins = [ 'retrieve-phase', ] +zmq_plugins = [ + 'zmq-pub', + 'zmq-sub', +] + kernels = [ ] @@ -103,6 +108,8 @@ opencv_dep = dependency('opencv', required: false) uca_dep = dependency('libuca', required: false) clblast_dep = dependency('clblast', required: false) clfft_dep = dependency('clFFT', required: false) +zmq_dep = dependency('libzmq', required: false) +json_dep = dependency('json-glib-1.0', required: false) conf = configuration_data() conf.set('HAVE_AMD', clfft_dep.found()) @@ -274,4 +281,20 @@ if clblast_dep.found() ) endif +# zmq-sub/zmq-pub + +if zmq_dep.found() and json_dep.found() + foreach plugin: zmq_plugins + name = ''.join(plugin.split('-')) + + shared_module(name, + 'ufo-@0@-task.c'.format(plugin), + dependencies: deps + [zmq_dep, json_dep], + name_prefix: 'libufofilter', + install: true, + install_dir: plugin_install_dir, + ) + endforeach +endif + subdir('kernels') diff --git a/src/ufo-zmq-common.h b/src/ufo-zmq-common.h new file mode 100644 index 0000000..0d1edbb --- /dev/null +++ b/src/ufo-zmq-common.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2011-2017 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ZMQ_COMMON_H +#define __UFO_ZMQ_COMMON_H + +#include <stdint.h> + +#define ZMQ_MAX_DIMENSIONS 16 +#define ZMQ_MAX_DIMENSION_LENGTH 32768 + +#define ZMQ_REQUEST_REGISTER 0 +#define ZMQ_REQUEST_DATA 1 + +#define ZMQ_REPLY_ACK 0 +#define ZMQ_REPLY_STOP 1 + +#define ZMQ_ERROR_OKAY 0 +#define ZMQ_ERROR_REGISTRATION_EXPECTED 1 +#define ZMQ_ERROR_ALREADY_REGISTERED 2 +#define ZMQ_ERROR_NOT_REGISTERED 3 +#define ZMQ_ERROR_DATA_ALREADY_SENT 4 + +typedef struct { + int32_t id; + guint8 type; +} __attribute__((packed)) ZmqRequest; + +typedef struct { + guint8 error; + guint8 type; +} __attribute__((packed)) ZmqReply; + + +#endif diff --git a/src/ufo-zmq-pub-task.c b/src/ufo-zmq-pub-task.c new file mode 100644 index 0000000..a9beed0 --- /dev/null +++ b/src/ufo-zmq-pub-task.c @@ -0,0 +1,461 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include <string.h> +#include <zmq.h> +#include <json-glib/json-glib.h> +#include "ufo-zmq-pub-task.h" +#include "ufo-zmq-common.h" + + +struct _UfoZmqPubTaskPrivate { + gpointer context; + gpointer socket; + guint expected_subscribers; + guint64 current; + GHashTable *counts; + JsonBuilder *builder; + JsonGenerator *generator; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoZmqPubTask, ufo_zmq_pub_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ZMQ_PUB_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskPrivate)) + +enum { + PROP_0, + PROP_EXPECTED_SUBSCRIBERS, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_zmq_pub_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ZMQ_PUB_TASK, NULL)); +} + +static gboolean +handle_registration (UfoZmqPubTaskPrivate *priv, ZmqRequest *request, gboolean insert) +{ + zmq_msg_t msg; + ZmqReply *reply; + gboolean success = TRUE; + + zmq_msg_init_size (&msg, sizeof (ZmqReply)); + reply = zmq_msg_data (&msg); + reply->error = ZMQ_ERROR_OKAY; + reply->type = ZMQ_REPLY_ACK; + + if (request->type != ZMQ_REQUEST_REGISTER) { + reply->error = ZMQ_ERROR_REGISTRATION_EXPECTED; + success = FALSE; + goto send_registration_reply; + } + + if (g_hash_table_lookup (priv->counts, GINT_TO_POINTER (request->id)) != NULL) { + reply->error = ZMQ_ERROR_ALREADY_REGISTERED; + success = FALSE; + goto send_registration_reply; + } + + if (insert) + g_hash_table_insert (priv->counts, GINT_TO_POINTER (request->id), GINT_TO_POINTER (1)); + +send_registration_reply: + g_assert (zmq_msg_send (&msg, priv->socket, 0) >= 0); + return success; +} + +static void +ufo_zmq_pub_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + UfoZmqPubTaskPrivate *priv; + + priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (task); + + priv->context = zmq_ctx_new (); + priv->current = 0; + + if (priv->context == NULL) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq context creation failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + priv->socket = zmq_socket (priv->context, ZMQ_REP); + + if (priv->socket == NULL) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq pub_socket creation failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + if (zmq_bind (priv->socket, "tcp://*:5555") != 0) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq bind failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + for (guint registered = 0; registered < priv->expected_subscribers; ) { + zmq_msg_t msg; + ZmqRequest *request; + + zmq_msg_init_size (&msg, sizeof (ZmqRequest)); + zmq_msg_recv (&msg, priv->socket, 0); + + request = zmq_msg_data (&msg); + + if (handle_registration (priv, request, TRUE)) + registered++; + + zmq_msg_close (&msg); + } +} + +static void +ufo_zmq_pub_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition) +{ + requisition->n_dims = 0; +} + +static guint +ufo_zmq_pub_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_zmq_pub_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 2; +} + +static UfoTaskMode +ufo_zmq_pub_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_SINK | UFO_TASK_MODE_CPU; +} + +static JsonNode * +requisition_to_json_array (UfoRequisition *requisition) +{ + JsonNode *node; + JsonArray *array; + + array = json_array_sized_new (requisition->n_dims); + + for (guint i = 0; i < requisition->n_dims; i++) { + JsonNode *element; + gint64 length; + + /* start from the last dimension as per htype spec */ + length = (gint64) requisition->dims[requisition->n_dims - 1 - i]; + element = json_node_alloc (); + json_node_init_int (element, length); + json_array_add_element (array, element); + } + + node = json_node_alloc (); + json_node_init_array (node, array); + + return node; +} + +static gboolean +ufo_zmq_pub_task_process (UfoTask *task, + UfoBuffer **inputs, + UfoBuffer *output, + UfoRequisition *requisition) +{ + UfoZmqPubTaskPrivate *priv; + UfoRequisition req; + guint num_to_serve; + gsize size; + gchar *src; + JsonNode *array; + JsonNode *tree; + gchar *header; + gsize header_size; + GList *new_subscribers = NULL; + + priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (task); + ufo_buffer_get_requisition (inputs[0], &req); + size = ufo_buffer_get_size (inputs[0]); + + json_builder_reset (priv->builder); + json_builder_begin_object (priv->builder); + + json_builder_set_member_name (priv->builder, "htype"); + json_builder_add_string_value (priv->builder, "array-1.0"); + + json_builder_set_member_name (priv->builder, "frame"); + json_builder_add_int_value (priv->builder, priv->current); + + json_builder_set_member_name (priv->builder, "type"); + json_builder_add_string_value (priv->builder, "float"); + + array = requisition_to_json_array (&req); + json_builder_set_member_name (priv->builder, "shape"); + json_builder_add_value (priv->builder, array); + + json_builder_end_object (priv->builder); + tree = json_builder_get_root (priv->builder); + + json_generator_set_root (priv->generator, tree); + header = json_generator_to_data (priv->generator, &header_size); + + json_node_unref (tree); + + num_to_serve = g_hash_table_size (priv->counts); + src = (gchar *) ufo_buffer_get_host_array (inputs[0], NULL); + + priv->current++; + + while (num_to_serve > 0) { + zmq_msg_t request_msg; + ZmqRequest *request; + + zmq_msg_init_size (&request_msg, sizeof (ZmqRequest)); + zmq_msg_recv (&request_msg, priv->socket, 0); + + request = zmq_msg_data (&request_msg); + + if (request->type == ZMQ_REQUEST_REGISTER) { + if (handle_registration (priv, request, FALSE)) + new_subscribers = g_list_append (new_subscribers, GINT_TO_POINTER (request->id)); + } + + if (request->type == ZMQ_REQUEST_DATA) { + guint count; + zmq_msg_t reply_msg; + ZmqReply *reply; + + zmq_msg_init_size (&reply_msg, sizeof (ZmqReply)); + reply = zmq_msg_data (&reply_msg); + reply->type = ZMQ_REPLY_ACK; + + count = GPOINTER_TO_INT (g_hash_table_lookup (priv->counts, GINT_TO_POINTER (request->id))); + + if (count == 0) { + reply->error = ZMQ_ERROR_NOT_REGISTERED; + g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0); + zmq_msg_close (&reply_msg); + } + else if (count == priv->current + 1) { + reply->error = ZMQ_ERROR_DATA_ALREADY_SENT; + g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0); + zmq_msg_close (&reply_msg); + } + else { + zmq_msg_t htype_msg; + zmq_msg_t data_msg; + gchar *dst; + + /* send ack */ + reply->error = ZMQ_ERROR_OKAY; + g_assert (zmq_msg_send (&reply_msg, priv->socket, ZMQ_SNDMORE) >= 0); + zmq_msg_close (&reply_msg); + + /* send geometry */ + zmq_msg_init_size (&htype_msg, header_size); + dst = zmq_msg_data (&htype_msg); + memcpy (dst, header, header_size); + g_assert (zmq_msg_send (&htype_msg, priv->socket, ZMQ_SNDMORE) >= 0); + zmq_msg_close (&htype_msg); + + /* send actual payload */ + zmq_msg_init_size (&data_msg, size); + dst = zmq_msg_data (&data_msg); + memcpy (dst, src, size); + g_assert (zmq_msg_send (&data_msg, priv->socket, 0) >= 0); + zmq_msg_close (&data_msg); + + g_hash_table_insert (priv->counts, GINT_TO_POINTER (request->id), GINT_TO_POINTER (priv->current + 1)); + } + } + + num_to_serve--; + zmq_msg_close (&request_msg); + } + + /* now add new subscribers */ + for (GList *it = g_list_first (new_subscribers); it != NULL; it = g_list_next (it)) + g_hash_table_insert (priv->counts, GINT_TO_POINTER (it->data), GINT_TO_POINTER (priv->current)); + + g_list_free (new_subscribers); + g_free (header); + + return TRUE; +} + +static void +ufo_zmq_pub_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoZmqPubTaskPrivate *priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_EXPECTED_SUBSCRIBERS: + priv->expected_subscribers = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_zmq_pub_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoZmqPubTaskPrivate *priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_EXPECTED_SUBSCRIBERS: + g_value_set_uint (value, priv->expected_subscribers); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} +static void +ufo_zmq_pub_task_dispose (GObject *object) +{ + UfoZmqPubTaskPrivate *priv; + + priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object); + + g_object_unref (priv->builder); + g_object_unref (priv->generator); + + G_OBJECT_CLASS (ufo_zmq_pub_task_parent_class)->dispose (object); +} + +static void +ufo_zmq_pub_task_finalize (GObject *object) +{ + UfoZmqPubTaskPrivate *priv; + guint num_to_serve; + + priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object); + num_to_serve = g_hash_table_size (priv->counts); + + while (num_to_serve > 0) { + zmq_msg_t msg; + ZmqRequest *request; + + zmq_msg_init_size (&msg, sizeof (ZmqRequest)); + zmq_msg_recv (&msg, priv->socket, 0); + + request = zmq_msg_data (&msg); + + if (request->type == ZMQ_REQUEST_REGISTER) + g_debug ("zmq-pub: ignoring registration request because of shutdown"); + + if (request->type == ZMQ_REQUEST_DATA) { + zmq_msg_t reply_msg; + ZmqReply *reply; + + zmq_msg_init_size (&reply_msg, sizeof (ZmqReply)); + reply = zmq_msg_data (&reply_msg); + reply->type = ZMQ_REPLY_STOP; + reply->error = ZMQ_ERROR_OKAY; + g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0); + zmq_msg_close (&reply_msg); + num_to_serve--; + } + + zmq_msg_close (&msg); + } + + zmq_close (priv->socket); + zmq_ctx_destroy (priv->context); + + g_hash_table_destroy (priv->counts); + + G_OBJECT_CLASS (ufo_zmq_pub_task_parent_class)->finalize (object); +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_zmq_pub_task_setup; + iface->get_num_inputs = ufo_zmq_pub_task_get_num_inputs; + iface->get_num_dimensions = ufo_zmq_pub_task_get_num_dimensions; + iface->get_mode = ufo_zmq_pub_task_get_mode; + iface->get_requisition = ufo_zmq_pub_task_get_requisition; + iface->process = ufo_zmq_pub_task_process; +} + +static void +ufo_zmq_pub_task_class_init (UfoZmqPubTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_zmq_pub_task_set_property; + oclass->get_property = ufo_zmq_pub_task_get_property; + oclass->dispose = ufo_zmq_pub_task_dispose; + oclass->finalize = ufo_zmq_pub_task_finalize; + + properties[PROP_EXPECTED_SUBSCRIBERS] = + g_param_spec_uint ("expected-subscribers", + "Number of expected subscribers", + "Number of expected subscribers", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoZmqPubTaskPrivate)); +} + +static void +ufo_zmq_pub_task_init(UfoZmqPubTask *self) +{ + self->priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE(self); + self->priv->context = NULL; + self->priv->socket = NULL; + self->priv->expected_subscribers = 0; + self->priv->counts = g_hash_table_new (g_direct_hash, g_direct_equal); + self->priv->builder = json_builder_new_immutable (); + self->priv->generator = json_generator_new (); +} diff --git a/src/ufo-zmq-pub-task.h b/src/ufo-zmq-pub-task.h new file mode 100644 index 0000000..578e2f3 --- /dev/null +++ b/src/ufo-zmq-pub-task.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ZMQ_PUB_TASK_H +#define __UFO_ZMQ_PUB_TASK_H + +#include <ufo/ufo.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_ZMQ_PUB_TASK (ufo_zmq_pub_task_get_type()) +#define UFO_ZMQ_PUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTask)) +#define UFO_IS_ZMQ_PUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_PUB_TASK)) +#define UFO_ZMQ_PUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskClass)) +#define UFO_IS_ZMQ_PUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_PUB_TASK)) +#define UFO_ZMQ_PUB_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskClass)) + +typedef struct _UfoZmqPubTask UfoZmqPubTask; +typedef struct _UfoZmqPubTaskClass UfoZmqPubTaskClass; +typedef struct _UfoZmqPubTaskPrivate UfoZmqPubTaskPrivate; + +struct _UfoZmqPubTask { + UfoTaskNode parent_instance; + + UfoZmqPubTaskPrivate *priv; +}; + +struct _UfoZmqPubTaskClass { + UfoTaskNodeClass parent_class; +}; + +UfoNode *ufo_zmq_pub_task_new (void); +GType ufo_zmq_pub_task_get_type (void); + +G_END_DECLS + +#endif diff --git a/src/ufo-zmq-sub-task.c b/src/ufo-zmq-sub-task.c new file mode 100644 index 0000000..1145272 --- /dev/null +++ b/src/ufo-zmq-sub-task.c @@ -0,0 +1,353 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include <string.h> +#include <zmq.h> +#include <json-glib/json-glib.h> +#include "ufo-zmq-sub-task.h" +#include "ufo-zmq-common.h" + + +struct _UfoZmqSubTaskPrivate { + gint32 id; + gpointer context; + gpointer socket; + gchar *address; + gboolean stop; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoZmqSubTask, ufo_zmq_sub_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ZMQ_SUB_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskPrivate)) + +enum { + PROP_0, + PROP_ADDRESS, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_zmq_sub_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ZMQ_SUB_TASK, NULL)); +} + +static void +ufo_zmq_sub_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + UfoZmqSubTaskPrivate *priv; + gchar *addr; + zmq_msg_t request_msg; + zmq_msg_t reply_msg; + ZmqRequest *request; + ZmqReply *reply; + + priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task); + priv->context = zmq_ctx_new (); + + if (priv->context == NULL) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq context creation failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + priv->socket = zmq_socket (priv->context, ZMQ_REQ); + + if (priv->socket == NULL) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq sub_socket creation failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + addr = g_strdup_printf ("%s:5555", priv->address); + + if (zmq_connect (priv->socket, addr) != 0) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq connect failed: %s\n", zmq_strerror (zmq_errno ())); + g_free (addr); + return; + } + + g_free (addr); + + zmq_msg_init_size (&request_msg, sizeof (ZmqRequest)); + + request = zmq_msg_data (&request_msg); + /* FIXME: use a better scheme than that */ + request->id = priv->id; + request->type = ZMQ_REQUEST_REGISTER; + + if (zmq_msg_send (&request_msg, priv->socket, 0) < 0) { + g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP, + "zmq msg_send failed: %s\n", zmq_strerror (zmq_errno ())); + return; + } + + zmq_msg_close (&request_msg); + + zmq_msg_init_size (&reply_msg, sizeof (ZmqReply)); + zmq_msg_recv (&reply_msg, priv->socket, 0); + reply = zmq_msg_data (&reply_msg); + g_assert (reply->type == ZMQ_REPLY_ACK && reply->error == ZMQ_ERROR_OKAY); + zmq_msg_close (&reply_msg); +} + +static gboolean +request_data (UfoZmqSubTaskPrivate *priv) +{ + zmq_msg_t request_msg; + zmq_msg_t reply_msg; + ZmqRequest *request; + ZmqReply *reply; + + while (1) { + zmq_msg_init_size (&request_msg, sizeof (ZmqRequest)); + request = zmq_msg_data (&request_msg); + request->id = priv->id; + request->type = ZMQ_REQUEST_DATA; + g_assert (zmq_msg_send (&request_msg, priv->socket, 0) > 0); + zmq_msg_close (&request_msg); + + zmq_msg_init_size (&reply_msg, sizeof (ZmqReply)); + zmq_msg_recv (&reply_msg, priv->socket, 0); + reply = zmq_msg_data (&reply_msg); + + if (reply->error == ZMQ_ERROR_REGISTRATION_EXPECTED) { + /* + * We are supposed to wait until all subscribers have connected to + * the publisher. + */ + + zmq_msg_close (&reply_msg); + g_usleep (1000); + } + else if (reply->error != ZMQ_ERROR_OKAY) { + g_warning ("Could not receive data: %i\n", reply->error); + zmq_msg_close (&reply_msg); + return FALSE; + } + else + break; + } + + if (reply->type == ZMQ_REPLY_STOP) + priv->stop = TRUE; + + zmq_msg_close (&reply_msg); + return TRUE; +} + +static void +ufo_zmq_sub_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition) +{ + UfoZmqSubTaskPrivate *priv; + zmq_msg_t htype_msg; + gchar *header; + JsonParser *parser; + JsonObject *object; + JsonArray *array; + GError *error = NULL; + + priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task); + + if (!request_data (priv) || priv->stop) + return; + + zmq_msg_init (&htype_msg); + zmq_msg_recv (&htype_msg, priv->socket, 0); + header = zmq_msg_data (&htype_msg); + parser = json_parser_new_immutable (); + json_parser_load_from_data (parser, header, zmq_msg_size (&htype_msg), &error); + + if (error != NULL) { + g_error ("Error parsing JSON: %s", error->message); + g_error_free (error); + g_object_unref (parser); + return; + } + + object = json_node_get_object (json_parser_get_root (parser)); + array = json_object_get_array_member (object, "shape"); + requisition->n_dims = json_array_get_length (array); + + /* FIXME: we should get this from a public ufo-core header */ + g_assert (requisition->n_dims <= ZMQ_MAX_DIMENSIONS); + + for (guint i = 0; i < requisition->n_dims; i++) { + requisition->dims[requisition->n_dims - 1 - i] = json_array_get_int_element (array, i); + g_assert (requisition->dims[requisition->n_dims - 1 - i] <= ZMQ_MAX_DIMENSION_LENGTH); + } + + zmq_msg_close (&htype_msg); + g_object_unref (parser); +} + +static guint +ufo_zmq_sub_task_get_num_inputs (UfoTask *task) +{ + return 0; +} + +static guint +ufo_zmq_sub_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 2; +} + +static UfoTaskMode +ufo_zmq_sub_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_zmq_sub_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + UfoZmqSubTaskPrivate *priv; + zmq_msg_t msg; + gsize size; + + priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task); + + if (priv->stop) + return FALSE; + + size = ufo_buffer_get_size (output); + zmq_msg_init_size (&msg, size); + zmq_msg_recv (&msg, priv->socket, 0); + g_assert (zmq_msg_size (&msg) == size); + memcpy (ufo_buffer_get_host_array (output, NULL), zmq_msg_data (&msg), size); + zmq_msg_close (&msg); + + return TRUE; +} + +static void +ufo_zmq_sub_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoZmqSubTaskPrivate *priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_ADDRESS: + g_free (priv->address); + priv->address = g_value_dup_string (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_zmq_sub_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoZmqSubTaskPrivate *priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_ADDRESS: + g_value_set_string (value, priv->address); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_zmq_sub_task_finalize (GObject *object) +{ + UfoZmqSubTaskPrivate *priv; + + priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object); + zmq_close (priv->socket); + zmq_ctx_destroy (priv->context); + g_free (priv->address); + + G_OBJECT_CLASS (ufo_zmq_sub_task_parent_class)->finalize (object); +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_zmq_sub_task_setup; + iface->get_num_inputs = ufo_zmq_sub_task_get_num_inputs; + iface->get_num_dimensions = ufo_zmq_sub_task_get_num_dimensions; + iface->get_mode = ufo_zmq_sub_task_get_mode; + iface->get_requisition = ufo_zmq_sub_task_get_requisition; + iface->generate = ufo_zmq_sub_task_generate; +} + +static void +ufo_zmq_sub_task_class_init (UfoZmqSubTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_zmq_sub_task_set_property; + oclass->get_property = ufo_zmq_sub_task_get_property; + oclass->finalize = ufo_zmq_sub_task_finalize; + + properties[PROP_ADDRESS] = + g_param_spec_string ("address", + "ZMQ address to subscribe to", + "ZMQ address to subscribe to", + "tcp://127.0.0.1", + G_PARAM_READWRITE); + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoZmqSubTaskPrivate)); +} + +static void +ufo_zmq_sub_task_init(UfoZmqSubTask *self) +{ + self->priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE(self); + self->priv->context = NULL; + self->priv->socket = NULL; + self->priv->address = g_strdup ("tcp://127.0.0.1"); + self->priv->id = (gint32) g_random_int (); + self->priv->stop = FALSE; +} diff --git a/src/ufo-zmq-sub-task.h b/src/ufo-zmq-sub-task.h new file mode 100644 index 0000000..925e07b --- /dev/null +++ b/src/ufo-zmq-sub-task.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2011-2017 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ZMQ_SUB_TASK_H +#define __UFO_ZMQ_SUB_TASK_H + +#include <ufo/ufo.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_ZMQ_SUB_TASK (ufo_zmq_sub_task_get_type()) +#define UFO_ZMQ_SUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTask)) +#define UFO_IS_ZMQ_SUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_SUB_TASK)) +#define UFO_ZMQ_SUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskClass)) +#define UFO_IS_ZMQ_SUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_SUB_TASK)) +#define UFO_ZMQ_SUB_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskClass)) + +typedef struct _UfoZmqSubTask UfoZmqSubTask; +typedef struct _UfoZmqSubTaskClass UfoZmqSubTaskClass; +typedef struct _UfoZmqSubTaskPrivate UfoZmqSubTaskPrivate; + +struct _UfoZmqSubTask { + UfoTaskNode parent_instance; + + UfoZmqSubTaskPrivate *priv; +}; + +struct _UfoZmqSubTaskClass { + UfoTaskNodeClass parent_class; +}; + +UfoNode *ufo_zmq_sub_task_new (void); +GType ufo_zmq_sub_task_get_type (void); + +G_END_DECLS + +#endif |