summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2017-11-28 09:57:56 +0100
committerMatthias Vogelgesang <matthias.vogelgesang@kit.edu>2018-01-12 10:44:36 +0100
commitc8cb828eab3325dc4a8517c1b0f572aa7dc9dead (patch)
tree8068b6293cd0613cf36d9723e0da5fa21c676ab4
parentf6eae00e12b964b0638250580916ba4660c2f5a8 (diff)
downloadufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.gz
ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.bz2
ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.tar.xz
ufo-filters-c8cb828eab3325dc4a8517c1b0f572aa7dc9dead.zip
Add zmq-pub and zmq-sub tasks
-rw-r--r--docs/examples.rst29
-rw-r--r--docs/generators.rst15
-rw-r--r--docs/sinks.rst14
-rw-r--r--src/CMakeLists.txt8
-rw-r--r--src/meson.build23
-rw-r--r--src/ufo-zmq-common.h51
-rw-r--r--src/ufo-zmq-pub-task.c461
-rw-r--r--src/ufo-zmq-pub-task.h53
-rw-r--r--src/ufo-zmq-sub-task.c353
-rw-r--r--src/ufo-zmq-sub-task.h53
10 files changed, 1060 insertions, 0 deletions
diff --git a/docs/examples.rst b/docs/examples.rst
index f530eb9..1653a45 100644
--- a/docs/examples.rst
+++ b/docs/examples.rst
@@ -128,6 +128,35 @@ space. To reconstruct, you have to feed the sinograms into :gobj:class:`zeropad`
null
+=================
+Data distribution
+=================
+
+To distribute data in a compute network you can use the :gobj:class:`zmq-pub`
+sink and :gobj:class:`zmq-sub` generator. For example, to read data on machine A
+and store it on machine B, you would run
+
+.. code-block:: bash
+
+ ufo-launch read path=/data ! zmq-pub
+
+on machine A and
+
+.. code-block:: bash
+
+ ufo-launch zmq-sub address=tcp://hostname-of-machine-a ! write
+
+on machine B. Note that by default :gobj:class:`zmq-pub` publishes data as soon
+as it receives it, thus some of the data will get lost if the
+:gobj:class:`zmq-sub` is run after :gobj:class:`zmq-pub`. You can prevent this
+by telling the :gobj:class:`zmq-pub` task to wait for a certain number of
+subscribers to subscribe:
+
+.. code-block:: bash
+
+ ufo-launch read path=/data ! zmq-pub expected-subscribers=1
+
+
.. rubric:: References
.. [KaSl01] Kak, A. C., & Slaney, M. (2001). Principles of Computerized Tomographic Imaging (Philadelphia, PA: SIAM).
diff --git a/docs/generators.rst b/docs/generators.rst
index 67e6f42..7e52fb0 100644
--- a/docs/generators.rst
+++ b/docs/generators.rst
@@ -145,6 +145,21 @@ Memory reader
Specifies the number of items to read.
+ZeroMQ subscriber
+=================
+
+.. gobj:class:: zmq-sub
+
+ Generates a stream from a compatible ZeroMQ data stream, for example
+ published by the :gobj:class:`zmq-pub` task.
+
+ .. gobj:prop:: address:string
+
+ Host address of the ZeroMQ publisher. Note, that as of now the publisher
+ binds to a ``tcp`` endpoint, thus you have to use that as well. By
+ default, the address is set to the local host address 127.0.0.1.
+
+
UcaCamera reader
================
diff --git a/docs/sinks.rst b/docs/sinks.rst
index d4135da..c78d083 100644
--- a/docs/sinks.rst
+++ b/docs/sinks.rst
@@ -106,6 +106,20 @@ Memory writer
that point only.
+ZeroMQ publisher
+================
+
+.. gobj:class:: zmq-pub
+
+ Publishes the stream as a ZeroMQ data stream to compatible ZeroMQ
+ subscribers such as the :gobj:class:`zmq-sub` source.
+
+ .. gobj:prop:: expected-subscribers:uint
+
+ If set, the publisher will wait until the number of expected subscribers
+ have connected.
+
+
Auxiliary sink
==============
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 008734e..d10a4e0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -135,6 +135,7 @@ pkg_check_modules(CLFFT clFFT)
pkg_check_modules(CLBLAST clblast)
pkg_check_modules(PANGOCAIRO pangocairo)
pkg_check_modules(OPENCV opencv)
+pkg_check_modules(ZMQ libzmq)
if (OPENMP_FOUND)
@@ -251,6 +252,13 @@ if (OPENCV_FOUND)
list(APPEND cv_show_aux_LIBS ${OPENCV_LIBRARIES})
list(APPEND cv_show_aux_SRCS writers/ufo-writer.c)
endif ()
+
+if (ZMQ_FOUND)
+ include_directories(${ZMQ_INCLUDE_DIRS})
+ link_directories(${ZMQ_LIBRARY_DIRS})
+ list(APPEND ufofilter_SRCS ufo-zmq-pub-task.c ufo-zmq-sub-task.c)
+ list(APPEND zmq_pub_aux_LIBS ${ZMQ_LIBRARIES})
+endif ()
#}}}
#{{{ Plugin targets
include_directories(${CMAKE_CURRENT_BINARY_DIR}
diff --git a/src/meson.build b/src/meson.build
index 721cb96..3466c12 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -78,6 +78,11 @@ fft_plugins = [
'retrieve-phase',
]
+zmq_plugins = [
+ 'zmq-pub',
+ 'zmq-sub',
+]
+
kernels = [
]
@@ -103,6 +108,8 @@ opencv_dep = dependency('opencv', required: false)
uca_dep = dependency('libuca', required: false)
clblast_dep = dependency('clblast', required: false)
clfft_dep = dependency('clFFT', required: false)
+zmq_dep = dependency('libzmq', required: false)
+json_dep = dependency('json-glib-1.0', required: false)
conf = configuration_data()
conf.set('HAVE_AMD', clfft_dep.found())
@@ -274,4 +281,20 @@ if clblast_dep.found()
)
endif
+# zmq-sub/zmq-pub
+
+if zmq_dep.found() and json_dep.found()
+ foreach plugin: zmq_plugins
+ name = ''.join(plugin.split('-'))
+
+ shared_module(name,
+ 'ufo-@0@-task.c'.format(plugin),
+ dependencies: deps + [zmq_dep, json_dep],
+ name_prefix: 'libufofilter',
+ install: true,
+ install_dir: plugin_install_dir,
+ )
+ endforeach
+endif
+
subdir('kernels')
diff --git a/src/ufo-zmq-common.h b/src/ufo-zmq-common.h
new file mode 100644
index 0000000..0d1edbb
--- /dev/null
+++ b/src/ufo-zmq-common.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2011-2017 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ZMQ_COMMON_H
+#define __UFO_ZMQ_COMMON_H
+
+#include <stdint.h>
+
+#define ZMQ_MAX_DIMENSIONS 16
+#define ZMQ_MAX_DIMENSION_LENGTH 32768
+
+#define ZMQ_REQUEST_REGISTER 0
+#define ZMQ_REQUEST_DATA 1
+
+#define ZMQ_REPLY_ACK 0
+#define ZMQ_REPLY_STOP 1
+
+#define ZMQ_ERROR_OKAY 0
+#define ZMQ_ERROR_REGISTRATION_EXPECTED 1
+#define ZMQ_ERROR_ALREADY_REGISTERED 2
+#define ZMQ_ERROR_NOT_REGISTERED 3
+#define ZMQ_ERROR_DATA_ALREADY_SENT 4
+
+typedef struct {
+ int32_t id;
+ guint8 type;
+} __attribute__((packed)) ZmqRequest;
+
+typedef struct {
+ guint8 error;
+ guint8 type;
+} __attribute__((packed)) ZmqReply;
+
+
+#endif
diff --git a/src/ufo-zmq-pub-task.c b/src/ufo-zmq-pub-task.c
new file mode 100644
index 0000000..a9beed0
--- /dev/null
+++ b/src/ufo-zmq-pub-task.c
@@ -0,0 +1,461 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include <string.h>
+#include <zmq.h>
+#include <json-glib/json-glib.h>
+#include "ufo-zmq-pub-task.h"
+#include "ufo-zmq-common.h"
+
+
+struct _UfoZmqPubTaskPrivate {
+ gpointer context;
+ gpointer socket;
+ guint expected_subscribers;
+ guint64 current;
+ GHashTable *counts;
+ JsonBuilder *builder;
+ JsonGenerator *generator;
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoZmqPubTask, ufo_zmq_pub_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ZMQ_PUB_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_EXPECTED_SUBSCRIBERS,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_zmq_pub_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ZMQ_PUB_TASK, NULL));
+}
+
+static gboolean
+handle_registration (UfoZmqPubTaskPrivate *priv, ZmqRequest *request, gboolean insert)
+{
+ zmq_msg_t msg;
+ ZmqReply *reply;
+ gboolean success = TRUE;
+
+ zmq_msg_init_size (&msg, sizeof (ZmqReply));
+ reply = zmq_msg_data (&msg);
+ reply->error = ZMQ_ERROR_OKAY;
+ reply->type = ZMQ_REPLY_ACK;
+
+ if (request->type != ZMQ_REQUEST_REGISTER) {
+ reply->error = ZMQ_ERROR_REGISTRATION_EXPECTED;
+ success = FALSE;
+ goto send_registration_reply;
+ }
+
+ if (g_hash_table_lookup (priv->counts, GINT_TO_POINTER (request->id)) != NULL) {
+ reply->error = ZMQ_ERROR_ALREADY_REGISTERED;
+ success = FALSE;
+ goto send_registration_reply;
+ }
+
+ if (insert)
+ g_hash_table_insert (priv->counts, GINT_TO_POINTER (request->id), GINT_TO_POINTER (1));
+
+send_registration_reply:
+ g_assert (zmq_msg_send (&msg, priv->socket, 0) >= 0);
+ return success;
+}
+
+static void
+ufo_zmq_pub_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ UfoZmqPubTaskPrivate *priv;
+
+ priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (task);
+
+ priv->context = zmq_ctx_new ();
+ priv->current = 0;
+
+ if (priv->context == NULL) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq context creation failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ priv->socket = zmq_socket (priv->context, ZMQ_REP);
+
+ if (priv->socket == NULL) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq pub_socket creation failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ if (zmq_bind (priv->socket, "tcp://*:5555") != 0) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq bind failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ for (guint registered = 0; registered < priv->expected_subscribers; ) {
+ zmq_msg_t msg;
+ ZmqRequest *request;
+
+ zmq_msg_init_size (&msg, sizeof (ZmqRequest));
+ zmq_msg_recv (&msg, priv->socket, 0);
+
+ request = zmq_msg_data (&msg);
+
+ if (handle_registration (priv, request, TRUE))
+ registered++;
+
+ zmq_msg_close (&msg);
+ }
+}
+
+static void
+ufo_zmq_pub_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition)
+{
+ requisition->n_dims = 0;
+}
+
+static guint
+ufo_zmq_pub_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_zmq_pub_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 2;
+}
+
+static UfoTaskMode
+ufo_zmq_pub_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_SINK | UFO_TASK_MODE_CPU;
+}
+
+static JsonNode *
+requisition_to_json_array (UfoRequisition *requisition)
+{
+ JsonNode *node;
+ JsonArray *array;
+
+ array = json_array_sized_new (requisition->n_dims);
+
+ for (guint i = 0; i < requisition->n_dims; i++) {
+ JsonNode *element;
+ gint64 length;
+
+ /* start from the last dimension as per htype spec */
+ length = (gint64) requisition->dims[requisition->n_dims - 1 - i];
+ element = json_node_alloc ();
+ json_node_init_int (element, length);
+ json_array_add_element (array, element);
+ }
+
+ node = json_node_alloc ();
+ json_node_init_array (node, array);
+
+ return node;
+}
+
+static gboolean
+ufo_zmq_pub_task_process (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ UfoZmqPubTaskPrivate *priv;
+ UfoRequisition req;
+ guint num_to_serve;
+ gsize size;
+ gchar *src;
+ JsonNode *array;
+ JsonNode *tree;
+ gchar *header;
+ gsize header_size;
+ GList *new_subscribers = NULL;
+
+ priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (task);
+ ufo_buffer_get_requisition (inputs[0], &req);
+ size = ufo_buffer_get_size (inputs[0]);
+
+ json_builder_reset (priv->builder);
+ json_builder_begin_object (priv->builder);
+
+ json_builder_set_member_name (priv->builder, "htype");
+ json_builder_add_string_value (priv->builder, "array-1.0");
+
+ json_builder_set_member_name (priv->builder, "frame");
+ json_builder_add_int_value (priv->builder, priv->current);
+
+ json_builder_set_member_name (priv->builder, "type");
+ json_builder_add_string_value (priv->builder, "float");
+
+ array = requisition_to_json_array (&req);
+ json_builder_set_member_name (priv->builder, "shape");
+ json_builder_add_value (priv->builder, array);
+
+ json_builder_end_object (priv->builder);
+ tree = json_builder_get_root (priv->builder);
+
+ json_generator_set_root (priv->generator, tree);
+ header = json_generator_to_data (priv->generator, &header_size);
+
+ json_node_unref (tree);
+
+ num_to_serve = g_hash_table_size (priv->counts);
+ src = (gchar *) ufo_buffer_get_host_array (inputs[0], NULL);
+
+ priv->current++;
+
+ while (num_to_serve > 0) {
+ zmq_msg_t request_msg;
+ ZmqRequest *request;
+
+ zmq_msg_init_size (&request_msg, sizeof (ZmqRequest));
+ zmq_msg_recv (&request_msg, priv->socket, 0);
+
+ request = zmq_msg_data (&request_msg);
+
+ if (request->type == ZMQ_REQUEST_REGISTER) {
+ if (handle_registration (priv, request, FALSE))
+ new_subscribers = g_list_append (new_subscribers, GINT_TO_POINTER (request->id));
+ }
+
+ if (request->type == ZMQ_REQUEST_DATA) {
+ guint count;
+ zmq_msg_t reply_msg;
+ ZmqReply *reply;
+
+ zmq_msg_init_size (&reply_msg, sizeof (ZmqReply));
+ reply = zmq_msg_data (&reply_msg);
+ reply->type = ZMQ_REPLY_ACK;
+
+ count = GPOINTER_TO_INT (g_hash_table_lookup (priv->counts, GINT_TO_POINTER (request->id)));
+
+ if (count == 0) {
+ reply->error = ZMQ_ERROR_NOT_REGISTERED;
+ g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0);
+ zmq_msg_close (&reply_msg);
+ }
+ else if (count == priv->current + 1) {
+ reply->error = ZMQ_ERROR_DATA_ALREADY_SENT;
+ g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0);
+ zmq_msg_close (&reply_msg);
+ }
+ else {
+ zmq_msg_t htype_msg;
+ zmq_msg_t data_msg;
+ gchar *dst;
+
+ /* send ack */
+ reply->error = ZMQ_ERROR_OKAY;
+ g_assert (zmq_msg_send (&reply_msg, priv->socket, ZMQ_SNDMORE) >= 0);
+ zmq_msg_close (&reply_msg);
+
+ /* send geometry */
+ zmq_msg_init_size (&htype_msg, header_size);
+ dst = zmq_msg_data (&htype_msg);
+ memcpy (dst, header, header_size);
+ g_assert (zmq_msg_send (&htype_msg, priv->socket, ZMQ_SNDMORE) >= 0);
+ zmq_msg_close (&htype_msg);
+
+ /* send actual payload */
+ zmq_msg_init_size (&data_msg, size);
+ dst = zmq_msg_data (&data_msg);
+ memcpy (dst, src, size);
+ g_assert (zmq_msg_send (&data_msg, priv->socket, 0) >= 0);
+ zmq_msg_close (&data_msg);
+
+ g_hash_table_insert (priv->counts, GINT_TO_POINTER (request->id), GINT_TO_POINTER (priv->current + 1));
+ }
+ }
+
+ num_to_serve--;
+ zmq_msg_close (&request_msg);
+ }
+
+ /* now add new subscribers */
+ for (GList *it = g_list_first (new_subscribers); it != NULL; it = g_list_next (it))
+ g_hash_table_insert (priv->counts, GINT_TO_POINTER (it->data), GINT_TO_POINTER (priv->current));
+
+ g_list_free (new_subscribers);
+ g_free (header);
+
+ return TRUE;
+}
+
+static void
+ufo_zmq_pub_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoZmqPubTaskPrivate *priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_EXPECTED_SUBSCRIBERS:
+ priv->expected_subscribers = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_zmq_pub_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoZmqPubTaskPrivate *priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_EXPECTED_SUBSCRIBERS:
+ g_value_set_uint (value, priv->expected_subscribers);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+static void
+ufo_zmq_pub_task_dispose (GObject *object)
+{
+ UfoZmqPubTaskPrivate *priv;
+
+ priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object);
+
+ g_object_unref (priv->builder);
+ g_object_unref (priv->generator);
+
+ G_OBJECT_CLASS (ufo_zmq_pub_task_parent_class)->dispose (object);
+}
+
+static void
+ufo_zmq_pub_task_finalize (GObject *object)
+{
+ UfoZmqPubTaskPrivate *priv;
+ guint num_to_serve;
+
+ priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE (object);
+ num_to_serve = g_hash_table_size (priv->counts);
+
+ while (num_to_serve > 0) {
+ zmq_msg_t msg;
+ ZmqRequest *request;
+
+ zmq_msg_init_size (&msg, sizeof (ZmqRequest));
+ zmq_msg_recv (&msg, priv->socket, 0);
+
+ request = zmq_msg_data (&msg);
+
+ if (request->type == ZMQ_REQUEST_REGISTER)
+ g_debug ("zmq-pub: ignoring registration request because of shutdown");
+
+ if (request->type == ZMQ_REQUEST_DATA) {
+ zmq_msg_t reply_msg;
+ ZmqReply *reply;
+
+ zmq_msg_init_size (&reply_msg, sizeof (ZmqReply));
+ reply = zmq_msg_data (&reply_msg);
+ reply->type = ZMQ_REPLY_STOP;
+ reply->error = ZMQ_ERROR_OKAY;
+ g_assert (zmq_msg_send (&reply_msg, priv->socket, 0) >= 0);
+ zmq_msg_close (&reply_msg);
+ num_to_serve--;
+ }
+
+ zmq_msg_close (&msg);
+ }
+
+ zmq_close (priv->socket);
+ zmq_ctx_destroy (priv->context);
+
+ g_hash_table_destroy (priv->counts);
+
+ G_OBJECT_CLASS (ufo_zmq_pub_task_parent_class)->finalize (object);
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_zmq_pub_task_setup;
+ iface->get_num_inputs = ufo_zmq_pub_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_zmq_pub_task_get_num_dimensions;
+ iface->get_mode = ufo_zmq_pub_task_get_mode;
+ iface->get_requisition = ufo_zmq_pub_task_get_requisition;
+ iface->process = ufo_zmq_pub_task_process;
+}
+
+static void
+ufo_zmq_pub_task_class_init (UfoZmqPubTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_zmq_pub_task_set_property;
+ oclass->get_property = ufo_zmq_pub_task_get_property;
+ oclass->dispose = ufo_zmq_pub_task_dispose;
+ oclass->finalize = ufo_zmq_pub_task_finalize;
+
+ properties[PROP_EXPECTED_SUBSCRIBERS] =
+ g_param_spec_uint ("expected-subscribers",
+ "Number of expected subscribers",
+ "Number of expected subscribers",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoZmqPubTaskPrivate));
+}
+
+static void
+ufo_zmq_pub_task_init(UfoZmqPubTask *self)
+{
+ self->priv = UFO_ZMQ_PUB_TASK_GET_PRIVATE(self);
+ self->priv->context = NULL;
+ self->priv->socket = NULL;
+ self->priv->expected_subscribers = 0;
+ self->priv->counts = g_hash_table_new (g_direct_hash, g_direct_equal);
+ self->priv->builder = json_builder_new_immutable ();
+ self->priv->generator = json_generator_new ();
+}
diff --git a/src/ufo-zmq-pub-task.h b/src/ufo-zmq-pub-task.h
new file mode 100644
index 0000000..578e2f3
--- /dev/null
+++ b/src/ufo-zmq-pub-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ZMQ_PUB_TASK_H
+#define __UFO_ZMQ_PUB_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ZMQ_PUB_TASK (ufo_zmq_pub_task_get_type())
+#define UFO_ZMQ_PUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTask))
+#define UFO_IS_ZMQ_PUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_PUB_TASK))
+#define UFO_ZMQ_PUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskClass))
+#define UFO_IS_ZMQ_PUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_PUB_TASK))
+#define UFO_ZMQ_PUB_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_PUB_TASK, UfoZmqPubTaskClass))
+
+typedef struct _UfoZmqPubTask UfoZmqPubTask;
+typedef struct _UfoZmqPubTaskClass UfoZmqPubTaskClass;
+typedef struct _UfoZmqPubTaskPrivate UfoZmqPubTaskPrivate;
+
+struct _UfoZmqPubTask {
+ UfoTaskNode parent_instance;
+
+ UfoZmqPubTaskPrivate *priv;
+};
+
+struct _UfoZmqPubTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_zmq_pub_task_new (void);
+GType ufo_zmq_pub_task_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ufo-zmq-sub-task.c b/src/ufo-zmq-sub-task.c
new file mode 100644
index 0000000..1145272
--- /dev/null
+++ b/src/ufo-zmq-sub-task.c
@@ -0,0 +1,353 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include <string.h>
+#include <zmq.h>
+#include <json-glib/json-glib.h>
+#include "ufo-zmq-sub-task.h"
+#include "ufo-zmq-common.h"
+
+
+struct _UfoZmqSubTaskPrivate {
+ gint32 id;
+ gpointer context;
+ gpointer socket;
+ gchar *address;
+ gboolean stop;
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoZmqSubTask, ufo_zmq_sub_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ZMQ_SUB_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_ADDRESS,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_zmq_sub_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ZMQ_SUB_TASK, NULL));
+}
+
+static void
+ufo_zmq_sub_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ UfoZmqSubTaskPrivate *priv;
+ gchar *addr;
+ zmq_msg_t request_msg;
+ zmq_msg_t reply_msg;
+ ZmqRequest *request;
+ ZmqReply *reply;
+
+ priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task);
+ priv->context = zmq_ctx_new ();
+
+ if (priv->context == NULL) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq context creation failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ priv->socket = zmq_socket (priv->context, ZMQ_REQ);
+
+ if (priv->socket == NULL) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq sub_socket creation failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ addr = g_strdup_printf ("%s:5555", priv->address);
+
+ if (zmq_connect (priv->socket, addr) != 0) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq connect failed: %s\n", zmq_strerror (zmq_errno ()));
+ g_free (addr);
+ return;
+ }
+
+ g_free (addr);
+
+ zmq_msg_init_size (&request_msg, sizeof (ZmqRequest));
+
+ request = zmq_msg_data (&request_msg);
+ /* FIXME: use a better scheme than that */
+ request->id = priv->id;
+ request->type = ZMQ_REQUEST_REGISTER;
+
+ if (zmq_msg_send (&request_msg, priv->socket, 0) < 0) {
+ g_set_error (error, UFO_TASK_ERROR, UFO_TASK_ERROR_SETUP,
+ "zmq msg_send failed: %s\n", zmq_strerror (zmq_errno ()));
+ return;
+ }
+
+ zmq_msg_close (&request_msg);
+
+ zmq_msg_init_size (&reply_msg, sizeof (ZmqReply));
+ zmq_msg_recv (&reply_msg, priv->socket, 0);
+ reply = zmq_msg_data (&reply_msg);
+ g_assert (reply->type == ZMQ_REPLY_ACK && reply->error == ZMQ_ERROR_OKAY);
+ zmq_msg_close (&reply_msg);
+}
+
+static gboolean
+request_data (UfoZmqSubTaskPrivate *priv)
+{
+ zmq_msg_t request_msg;
+ zmq_msg_t reply_msg;
+ ZmqRequest *request;
+ ZmqReply *reply;
+
+ while (1) {
+ zmq_msg_init_size (&request_msg, sizeof (ZmqRequest));
+ request = zmq_msg_data (&request_msg);
+ request->id = priv->id;
+ request->type = ZMQ_REQUEST_DATA;
+ g_assert (zmq_msg_send (&request_msg, priv->socket, 0) > 0);
+ zmq_msg_close (&request_msg);
+
+ zmq_msg_init_size (&reply_msg, sizeof (ZmqReply));
+ zmq_msg_recv (&reply_msg, priv->socket, 0);
+ reply = zmq_msg_data (&reply_msg);
+
+ if (reply->error == ZMQ_ERROR_REGISTRATION_EXPECTED) {
+ /*
+ * We are supposed to wait until all subscribers have connected to
+ * the publisher.
+ */
+
+ zmq_msg_close (&reply_msg);
+ g_usleep (1000);
+ }
+ else if (reply->error != ZMQ_ERROR_OKAY) {
+ g_warning ("Could not receive data: %i\n", reply->error);
+ zmq_msg_close (&reply_msg);
+ return FALSE;
+ }
+ else
+ break;
+ }
+
+ if (reply->type == ZMQ_REPLY_STOP)
+ priv->stop = TRUE;
+
+ zmq_msg_close (&reply_msg);
+ return TRUE;
+}
+
+static void
+ufo_zmq_sub_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition)
+{
+ UfoZmqSubTaskPrivate *priv;
+ zmq_msg_t htype_msg;
+ gchar *header;
+ JsonParser *parser;
+ JsonObject *object;
+ JsonArray *array;
+ GError *error = NULL;
+
+ priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task);
+
+ if (!request_data (priv) || priv->stop)
+ return;
+
+ zmq_msg_init (&htype_msg);
+ zmq_msg_recv (&htype_msg, priv->socket, 0);
+ header = zmq_msg_data (&htype_msg);
+ parser = json_parser_new_immutable ();
+ json_parser_load_from_data (parser, header, zmq_msg_size (&htype_msg), &error);
+
+ if (error != NULL) {
+ g_error ("Error parsing JSON: %s", error->message);
+ g_error_free (error);
+ g_object_unref (parser);
+ return;
+ }
+
+ object = json_node_get_object (json_parser_get_root (parser));
+ array = json_object_get_array_member (object, "shape");
+ requisition->n_dims = json_array_get_length (array);
+
+ /* FIXME: we should get this from a public ufo-core header */
+ g_assert (requisition->n_dims <= ZMQ_MAX_DIMENSIONS);
+
+ for (guint i = 0; i < requisition->n_dims; i++) {
+ requisition->dims[requisition->n_dims - 1 - i] = json_array_get_int_element (array, i);
+ g_assert (requisition->dims[requisition->n_dims - 1 - i] <= ZMQ_MAX_DIMENSION_LENGTH);
+ }
+
+ zmq_msg_close (&htype_msg);
+ g_object_unref (parser);
+}
+
+static guint
+ufo_zmq_sub_task_get_num_inputs (UfoTask *task)
+{
+ return 0;
+}
+
+static guint
+ufo_zmq_sub_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 2;
+}
+
+static UfoTaskMode
+ufo_zmq_sub_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_zmq_sub_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ UfoZmqSubTaskPrivate *priv;
+ zmq_msg_t msg;
+ gsize size;
+
+ priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (task);
+
+ if (priv->stop)
+ return FALSE;
+
+ size = ufo_buffer_get_size (output);
+ zmq_msg_init_size (&msg, size);
+ zmq_msg_recv (&msg, priv->socket, 0);
+ g_assert (zmq_msg_size (&msg) == size);
+ memcpy (ufo_buffer_get_host_array (output, NULL), zmq_msg_data (&msg), size);
+ zmq_msg_close (&msg);
+
+ return TRUE;
+}
+
+static void
+ufo_zmq_sub_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoZmqSubTaskPrivate *priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_ADDRESS:
+ g_free (priv->address);
+ priv->address = g_value_dup_string (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_zmq_sub_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoZmqSubTaskPrivate *priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_ADDRESS:
+ g_value_set_string (value, priv->address);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_zmq_sub_task_finalize (GObject *object)
+{
+ UfoZmqSubTaskPrivate *priv;
+
+ priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE (object);
+ zmq_close (priv->socket);
+ zmq_ctx_destroy (priv->context);
+ g_free (priv->address);
+
+ G_OBJECT_CLASS (ufo_zmq_sub_task_parent_class)->finalize (object);
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_zmq_sub_task_setup;
+ iface->get_num_inputs = ufo_zmq_sub_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_zmq_sub_task_get_num_dimensions;
+ iface->get_mode = ufo_zmq_sub_task_get_mode;
+ iface->get_requisition = ufo_zmq_sub_task_get_requisition;
+ iface->generate = ufo_zmq_sub_task_generate;
+}
+
+static void
+ufo_zmq_sub_task_class_init (UfoZmqSubTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_zmq_sub_task_set_property;
+ oclass->get_property = ufo_zmq_sub_task_get_property;
+ oclass->finalize = ufo_zmq_sub_task_finalize;
+
+ properties[PROP_ADDRESS] =
+ g_param_spec_string ("address",
+ "ZMQ address to subscribe to",
+ "ZMQ address to subscribe to",
+ "tcp://127.0.0.1",
+ G_PARAM_READWRITE);
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoZmqSubTaskPrivate));
+}
+
+static void
+ufo_zmq_sub_task_init(UfoZmqSubTask *self)
+{
+ self->priv = UFO_ZMQ_SUB_TASK_GET_PRIVATE(self);
+ self->priv->context = NULL;
+ self->priv->socket = NULL;
+ self->priv->address = g_strdup ("tcp://127.0.0.1");
+ self->priv->id = (gint32) g_random_int ();
+ self->priv->stop = FALSE;
+}
diff --git a/src/ufo-zmq-sub-task.h b/src/ufo-zmq-sub-task.h
new file mode 100644
index 0000000..925e07b
--- /dev/null
+++ b/src/ufo-zmq-sub-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2017 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ZMQ_SUB_TASK_H
+#define __UFO_ZMQ_SUB_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ZMQ_SUB_TASK (ufo_zmq_sub_task_get_type())
+#define UFO_ZMQ_SUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTask))
+#define UFO_IS_ZMQ_SUB_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ZMQ_SUB_TASK))
+#define UFO_ZMQ_SUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskClass))
+#define UFO_IS_ZMQ_SUB_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ZMQ_SUB_TASK))
+#define UFO_ZMQ_SUB_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ZMQ_SUB_TASK, UfoZmqSubTaskClass))
+
+typedef struct _UfoZmqSubTask UfoZmqSubTask;
+typedef struct _UfoZmqSubTaskClass UfoZmqSubTaskClass;
+typedef struct _UfoZmqSubTaskPrivate UfoZmqSubTaskPrivate;
+
+struct _UfoZmqSubTask {
+ UfoTaskNode parent_instance;
+
+ UfoZmqSubTaskPrivate *priv;
+};
+
+struct _UfoZmqSubTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_zmq_sub_task_new (void);
+GType ufo_zmq_sub_task_get_type (void);
+
+G_END_DECLS
+
+#endif