summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Dritschler <timo.dritschler@kit.edu>2014-12-10 16:23:49 +0100
committerTimo Dritschler <timo.dritschler@kit.edu>2014-12-10 16:23:49 +0100
commit8d50558755b93f26108313e40e92a8ae457864ab (patch)
tree7190a92e2bc2f13e3b20fc85caea10f349cfee37
parent9f0b6da7cf20f085d2729e5433f85ffa60a6fd94 (diff)
parent0dc8c19937b52dfb793672226183697b6987b9fe (diff)
downloadkiro-8d50558755b93f26108313e40e92a8ae457864ab.tar.gz
kiro-8d50558755b93f26108313e40e92a8ae457864ab.tar.bz2
kiro-8d50558755b93f26108313e40e92a8ae457864ab.tar.xz
kiro-8d50558755b93f26108313e40e92a8ae457864ab.zip
Merge pull request #14 from ufo-kit/communicationHandling
Release Version 2 (0.2.0) Added kiro_client_ping_server to KIRO client
-rw-r--r--CMakeLists.txt4
-rw-r--r--src/kiro-client.c274
-rw-r--r--src/kiro-client.h13
-rw-r--r--src/kiro-rdma.h12
-rw-r--r--src/kiro-server.c163
-rw-r--r--test/test-client-latency.c32
6 files changed, 402 insertions, 96 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a2491de..31c74f9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -5,9 +5,9 @@ set(CMAKE_INCLUDE_CURRENT_DIR TRUE)
set(TARNAME "kiro")
set(LIBKIRO_VERSION_MAJOR "0")
-set(LIBKIRO_VERSION_MINOR "1")
+set(LIBKIRO_VERSION_MINOR "2")
set(LIBKIRO_VERSION_PATCH "0")
-set(LIBKIRO_VERSION_RELEASE "1")
+set(LIBKIRO_VERSION_RELEASE "2")
set(LIBKIRO_VERSION_STRING "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}.${LIBKIRO_VERSION_PATCH}")
set(VERSION "${LIBKIRO_VERSION_STRING}")
set(LIBKIRO_DESCRIPTION "Small InfiniBand communication Server and Client")
diff --git a/src/kiro-client.c b/src/kiro-client.c
index 714a003..6e140b5 100644
--- a/src/kiro-client.c
+++ b/src/kiro-client.c
@@ -56,7 +56,8 @@ struct _KiroClientPrivate {
gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down
GMainLoop *main_loop; // Main loop of the server for event polling and handling
- GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the communication event channel
GThread *main_thread; // Main KIRO client thread
};
@@ -64,6 +65,11 @@ struct _KiroClientPrivate {
G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT);
+// Temporary storage and lock for PING timing
+G_LOCK_DEFINE (ping_time);
+volatile struct timeval ping_time;
+
+
KiroClient *
kiro_client_new (void)
{
@@ -89,9 +95,10 @@ kiro_client_init (KiroClient *self)
{
KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self);
memset (priv, 0, sizeof (&priv));
-
//Hack to make the 'unused function' from the kiro-rdma include go away...
kiro_attach_qp (NULL);
+ ping_time.tv_sec = -1;
+ ping_time.tv_usec = -1;
}
@@ -147,6 +154,87 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
}
+
+static gboolean
+process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ // Right now, we don't need 'source' and 'condition'
+ // Tell the compiler to ignore them by (void)-ing them
+ (void) source;
+ //(void) condition;
+ g_debug ("Message condidition: %i", condition);
+
+ KiroClientPrivate *priv = (KiroClientPrivate *)data;
+ struct ibv_wc wc;
+
+ if (ibv_poll_cq (priv->conn->recv_cq, 1, &wc) < 0) {
+ g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno));
+ return FALSE;
+ }
+ void *cq_ctx;
+ struct ibv_cq *cq;
+ int err = ibv_get_cq_event (priv->conn->recv_cq_channel, &cq, &cq_ctx);
+ if (!err)
+ ibv_ack_cq_events (cq, 1);
+
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context;
+ guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
+ g_debug ("Received a message from the Server of type: %u", type);
+
+ if (type == KIRO_ACK_RDMA) {
+ g_debug ("Got RDMI Access information from Server");
+ ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri);
+ g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length);
+ ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
+
+ if (!ctx->rdma_mr) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
+ rdma_disconnect (priv->conn);
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ return FALSE;
+ }
+ }
+ if (type == KIRO_PONG) {
+ G_LOCK (ping_time);
+ struct timeval local_time;
+ gettimeofday (&local_time, NULL);
+
+ if (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {
+ g_debug ("Received PONG message from server");
+ ping_time.tv_sec = local_time.tv_sec;
+ ping_time.tv_usec = local_time.tv_usec;
+ }
+ else {
+ g_debug ("Received unexpected PONG message from server");
+ }
+
+ G_UNLOCK (ping_time);
+ }
+
+ //Post a generic receive in order to stay responsive to any messages from
+ //the server
+ if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Posting generic receive for connection failed: %s", strerror (errno));
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ return FALSE;
+ }
+
+ // make sure the next incoming work completion causes an event on the
+ // receive completion channel. We will poll() the channels file descriptor
+ // for this in the kiro client main loop.
+ ibv_req_notify_cq (priv->conn->recv_cq, 0);
+
+ g_debug ("Finished RDMA event handling");
+ return TRUE;
+}
+
+
gpointer
start_client_main_loop (gpointer data)
{
@@ -210,64 +298,52 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)
if (!ctx->cf_mr_recv || !ctx->cf_mr_send) {
g_critical ("Failed to register control message memory (Out of memory?)");
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
+ goto fail;
}
ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg);
priv->conn->context = ctx;
+ //Post an preemtive receive for the servers welcome message
if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno));
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
+ goto fail;
}
if (rdma_connect (priv->conn, NULL)) {
g_critical ("Failed to establish connection to the server: %s", strerror (errno));
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
+ goto fail;
}
- g_message ("Connection to server established");
- priv->ec = priv->conn->channel; //For easy access
- struct ibv_wc wc;
-
- if (rdma_get_recv_comp (priv->conn, &wc) < 0) {
- g_critical ("Failure waiting for POST from server: %s", strerror (errno));
- rdma_disconnect (priv->conn);
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
+ g_message ("Connection to server established. Waiting for response.");
+ ibv_req_notify_cq (priv->conn->recv_cq, 0); // Make the respective Queue push events onto the channel
+ if (!process_rdma_event (NULL, 0, (gpointer)priv)) {
+ g_critical ("No RDMA access information received from the server. Failed to connect.");
+ goto fail;
}
- g_debug ("Got RDMI Access information from Server");
- ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri);
- g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length);
- ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
-
- if (!ctx->rdma_mr) {
- g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
- rdma_disconnect (priv->conn);
- kiro_destroy_connection_context (&ctx);
- rdma_destroy_ep (priv->conn);
- return -1;
- }
+ g_message ("Connected to %s:%s", address, port);
+ priv->ec = priv->conn->channel; //For easy access
priv->main_loop = g_main_loop_new (NULL, FALSE);
- priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd);
- g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
+ priv->conn_ec = g_io_channel_unix_new (priv->ec->fd);
+ priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd);
+ g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv);
+ g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv);
priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop);
// We gave control to the main_loop (with add_watch) and don't need our ref
// any longer
- g_io_channel_unref (priv->g_io_ec);
+ g_io_channel_unref (priv->conn_ec);
+ g_io_channel_unref (priv->rdma_ec);
- g_message ("Connected to %s:%s", address, port);
return 0;
+
+fail:
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ priv->conn = NULL;
+ return -1;
}
@@ -309,11 +385,119 @@ kiro_client_sync (KiroClient *self)
}
fail:
- kiro_destroy_connection (&(priv->conn));
+ kiro_destroy_connection (&(priv->conn));
return -1;
}
+gboolean
+ping_timeout (gpointer data) {
+
+ //Not needed. Void it to prevent 'unused variable' warning
+ (void) data;
+
+ G_LOCK (ping_time);
+
+ // Maybe the server did answer while dispatching the timeout?
+ if (ping_time.tv_sec != 0 || ping_time.tv_usec != 0) {
+ goto done;
+ }
+
+ ping_time.tv_usec = -1;
+ ping_time.tv_sec = -1;
+
+
+done:
+ G_UNLOCK (ping_time);
+
+ // Return FALSE to automtically stop the timeout from reoccuring
+ return FALSE;
+}
+
+
+gint
+kiro_client_ping_server (KiroClient *self)
+{
+ // Will be returned. -1 for error.
+ gint t_usec = 0;
+
+ KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self);
+ if (!priv->conn) {
+ g_warning ("Client not connected");
+ return -1;
+ }
+
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context;
+
+ struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *)(ctx->cf_mr_send->mem);
+ msg->msg_type = KIRO_PING;
+
+ G_LOCK (ping_time);
+ ping_time.tv_sec = 0;
+ ping_time.tv_usec = 0;
+ struct timeval local_time;
+ gettimeofday (&local_time, NULL);
+
+ if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
+ g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno));
+ t_usec = -1;
+ goto end;
+ }
+ G_UNLOCK (ping_time);
+
+ struct ibv_wc wc;
+ if (rdma_get_send_comp (priv->conn, &wc) < 0) {
+ g_warning ("Failure during PING send: %s", strerror (errno));
+ t_usec = -1;
+ goto end;
+ }
+
+ // Set a two-second timeout for the ping
+ guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL);
+
+ //Wait for ping response
+ while (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {};
+
+
+ G_LOCK (ping_time);
+ // No response from the server. Timeout kicked in
+ // (Note: The timeout callback has already deregistered itself. We don't
+ // need to do that here again)
+ if (ping_time.tv_sec == -1 && ping_time.tv_usec == -1) {
+ g_message ("PING timed out.");
+ G_UNLOCK (ping_time);
+ t_usec = -1;
+ goto end;
+ }
+
+ // Remove the timeout
+ GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout);
+ if (timeout_source) {
+ g_source_destroy (timeout_source);
+ }
+
+ gint secs = ping_time.tv_sec - local_time.tv_sec;
+
+ // tv_usecs wraps back to 0 at 1000000us (1s).
+ // This might cause our calculation to produce negative numbers when time > 1s.
+ for (int i = 0; i < secs; i++) {
+ ping_time.tv_usec += 1000 * 1000;
+ }
+ t_usec = ping_time.tv_usec - local_time.tv_usec;
+ gint millis = (gint)(t_usec/1000.);
+ G_UNLOCK (ping_time);
+
+ g_debug ("Server responded to PING in: %is, %ims, %ius", secs, millis, t_usec);
+
+end:
+ G_LOCK (ping_time);
+ ping_time.tv_sec = -1;
+ ping_time.tv_usec = -1;
+ G_UNLOCK (ping_time);
+ return t_usec;
+}
+
+
void *
kiro_client_get_memory (KiroClient *self)
{
@@ -331,7 +515,7 @@ kiro_client_get_memory (KiroClient *self)
}
-size_t
+size_t
kiro_client_get_memory_size (KiroClient *self)
{
KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self);
@@ -375,19 +559,23 @@ kiro_client_disconnect (KiroClient *self)
// We don't need the connection management IO channel container any more.
// Unref and thus free it.
- g_io_channel_unref (priv->g_io_ec);
- priv->g_io_ec = NULL;
+ g_io_channel_unref (priv->conn_ec);
+ priv->conn_ec = NULL;
+
+ // The same goes for the cp channels
+ g_io_channel_unref (priv->rdma_ec);
+ priv->rdma_ec = NULL;
priv->close_signal = FALSE;
//kiro_destroy_connection does not free RDMA memory. Therefore, we need to
- //cache the memory pointer and free the memory afterwards manually
+ //cache the memory pointer and free the memory afterwards manually
struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context);
void *rdma_mem = ctx->rdma_mr->mem;
kiro_destroy_connection (&(priv->conn));
free (rdma_mem);
- // priv->ec is just an easy-access pointer. Don't free it. Just NULL it
+ // priv->ec is just an easy-access pointer. Don't free it. Just NULL it
priv->ec = NULL;
g_message ("Client disconnected from server");
}
diff --git a/src/kiro-client.h b/src/kiro-client.h
index 9c6036d..3be2621 100644
--- a/src/kiro-client.h
+++ b/src/kiro-client.h
@@ -160,6 +160,19 @@ void kiro_client_disconnect (KiroClient *client);
int kiro_client_sync (KiroClient *client);
/**
+ * kiro_client_ping_server - Sends a PING to the server
+ * @client: (transfer none): The #KiroServer to send the PING from
+ * Returns:
+ * A #guint telling the time (in microseconds) how long it took for the
+ * connected #KiroServer to reply
+ * Description:
+ * Sends a PING package to the connected #KiroServer and waits for a PONG
+ * package from that server. The time between sending the PING and receiving
+ * the PONG (in microseconds) is measured and returned by this function.
+ */
+gint kiro_client_ping_server (KiroClient *client);
+
+/**
* kiro_client_get_memory - Return a pointer to the current client memory
* @client: (transfer none): The #KiroClient to get the memory from
* Returns: (transfer none):
diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h
index af502ec..5b4895f 100644
--- a/src/kiro-rdma.h
+++ b/src/kiro-rdma.h
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <sys/time.h>
#ifndef __KIRO_RDMA_H__
#define __KIRO_RDMA_H__
@@ -36,6 +37,8 @@ struct kiro_connection_context {
struct ibv_mr peer_mr; // RDMA Memory Region Information of the peer
+ void *container; // Make the connection aware of its container (if any)
+
enum {
KIRO_IDLE,
KIRO_MRI_REQUESTED, // Memory Region Information Requested
@@ -51,11 +54,12 @@ struct kiro_ctrl_msg {
enum {
KIRO_REQ_RDMA, // Requesting RDMA Access to/from the peer
KIRO_ACK_RDMA, // acknowledge RDMA Request and provide Memory Region Information
- KIRO_REJ_RDMA // RDMA Request rejected :( (peer_mri will be invalid)
+ KIRO_REJ_RDMA, // RDMA Request rejected :( (peer_mri will be invalid)
+ KIRO_PING, // PING Message
+ KIRO_PONG // PONG Message (PING reply)
} msg_type;
struct ibv_mr peer_mri;
-
};
@@ -85,8 +89,8 @@ kiro_attach_qp (struct rdma_cm_id *id)
qp_attr.send_cq = id->send_cq;
qp_attr.recv_cq = id->recv_cq;
qp_attr.qp_type = IBV_QPT_RC;
- qp_attr.cap.max_send_wr = 1;
- qp_attr.cap.max_recv_wr = 1;
+ qp_attr.cap.max_send_wr = 10;
+ qp_attr.cap.max_recv_wr = 10;
qp_attr.cap.max_send_sge = 1;
qp_attr.cap.max_recv_sge = 1;
return rdma_create_qp (id, id->pd, &qp_attr);
diff --git a/src/kiro-server.c b/src/kiro-server.c
index bedba95..ff6c0f8 100644
--- a/src/kiro-server.c
+++ b/src/kiro-server.c
@@ -59,7 +59,7 @@ struct _KiroServerPrivate {
gboolean close_signal; // Flag used to signal event listening to stop for server shutdown
GMainLoop *main_loop; // Main loop of the server for event polling and handling
- GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel
GThread *main_thread; // Main KIRO server thread
};
@@ -67,6 +67,15 @@ struct _KiroServerPrivate {
G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT);
+struct kiro_client_connection {
+
+ guint id; // Client identification (Easy access)
+ GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client
+ guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again
+ struct rdma_cm_id *conn; // Connection Manager ID of the client
+};
+
+
KiroServer *
kiro_server_new (void)
{
@@ -213,6 +222,65 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
static gboolean
+process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ // Right now, we don't need 'source' and 'condition'
+ // Tell the compiler to ignore them by (void)-ing them
+ (void) source;
+ //(void) condition;
+ g_debug ("Message condition: %i", condition);
+
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ struct ibv_wc wc;
+
+ if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) {
+ g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno));
+ return FALSE;
+ }
+ void *cq_ctx;
+ struct ibv_cq *cq;
+ int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx);
+ if (!err)
+ ibv_ack_cq_events (cq, 1);
+
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context;
+ guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
+ g_debug ("Received a message from Client %u of type %u", cc->id, type);
+
+ if (type == KIRO_PING) {
+ struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
+ msg->msg_type = KIRO_PONG;
+
+ if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
+ g_warning ("Failure while trying to post PONG send: %s", strerror (errno));
+ goto done;
+ }
+
+ if (rdma_get_send_comp (cc->conn, &wc) < 0) {
+ g_warning ("An error occured while sending PONG: %s", strerror (errno));
+ }
+ }
+
+done:
+ //Post a generic receive in order to stay responsive to any messages from
+ //the client
+ if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Posting generic receive for event handling failed: %s", strerror (errno));
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (cc->conn);
+ return FALSE;
+ }
+
+ ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel
+
+ g_debug ("Finished RDMA event handling");
+ return TRUE;
+}
+
+
+static gboolean
process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
{
// Right now, we don't need 'source' and 'condition'
@@ -245,29 +313,65 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
return TRUE;
}
- g_debug ("Got connection request from client");
+ do {
+ g_debug ("Got connection request from client");
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)g_try_malloc (sizeof (struct kiro_client_connection));
+ if (!cc) {
+ errno = ENOMEM;
+ rdma_reject (ev->id, NULL, 0);
+ goto fail;
+ }
+
+ if (connect_client (ev->id))
+ goto fail;
- if (0 == connect_client (ev->id)) {
// Post a welcoming "Receive" for handshaking
- if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) {
- // Connection set-up successfully! (Server)
- struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
- ctx->identifier = priv->next_client_id++;
- priv->clients = g_list_append (priv->clients, (gpointer)ev->id);
- g_debug ("Client connection assigned with ID %u", ctx->identifier);
- g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
- }
- }
- else
- g_warning ("Failed to accept client connection: %s", strerror (errno));
+ if (welcome_client (ev->id, priv->mem, priv->mem_size))
+ goto fail;
+
+ ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel
+
+ // Connection set-up successfully! (Server)
+ // ctx was created by 'welcome_client'
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
+ ctx->identifier = priv->next_client_id++;
+ ctx->container = cc; // Make the connection aware of its container
+
+ // Fill the client connection container. Also create a
+ // g_io_channel wrapper for the new clients receive queue event
+ // channel and add a main_loop watch to it.
+ cc->id = ctx->identifier;
+ cc->conn = ev->id;
+ cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd);
+ cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc);
+ g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more
+
+ priv->clients = g_list_append (priv->clients, (gpointer)cc);
+ g_debug ("Client connection assigned with ID %u", ctx->identifier);
+ g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
+ break;
+
+ fail:
+ g_warning ("Failed to accept client connection: %s", strerror (errno));
+
+ } while(0);
}
else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) {
- GList *client = g_list_find (priv->clients, (gconstpointer) ev->id);
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
+ if (!ctx->container) {
+ g_debug ("Got disconnect request from unknown client");
+ return FALSE;
+ }
+
+ GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container);
if (client) {
- struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
g_debug ("Got disconnect request from client ID %u", ctx->identifier);
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)ctx->container;
+ g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice.
priv->clients = g_list_delete_link (priv->clients, client);
+ g_free (cc);
+ ctx->container = NULL;
}
else
g_debug ("Got disconnect request from unknown client");
@@ -281,6 +385,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
struct ibv_pd *pd = ev->id->pd;
kiro_destroy_connection (& (ev->id));
g_free (pd);
+
g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients));
}
@@ -324,7 +429,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void
int rtn = rdma_getaddrinfo (addr_c, port_c, &hints, &res_addrinfo);
g_free (addr_c);
g_free (port_c);
-
+
if (rtn) {
g_critical ("Failed to create address information: %s", strerror (errno));
return -1;
@@ -382,13 +487,13 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void
}
priv->main_loop = g_main_loop_new (NULL, FALSE);
- priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd);
- g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
+ priv->conn_ec = g_io_channel_unix_new (priv->ec->fd);
+ g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv);
priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop);
// We gave control to the main_loop (with add_watch) and don't need our ref
// any longer
- g_io_channel_unref (priv->g_io_ec);
+ g_io_channel_unref (priv->conn_ec);
g_message ("Enpoint listening");
@@ -400,19 +505,23 @@ static void
disconnect_client (gpointer data, gpointer user_data)
{
(void)user_data;
-
+
if (data) {
- struct rdma_cm_id *id = (struct rdma_cm_id *)data;
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ struct rdma_cm_id *id = cc->conn;
struct kiro_connection_context *ctx = (struct kiro_connection_context *) (id->context);
g_debug ("Disconnecting client: %u", ctx->identifier);
+ g_source_remove (cc->source_id);
+
// Note:
// The ProtectionDomain needs to be buffered and freed manually.
// Each connecting client is attached with its own pd, which we
// create manually. So we also need to clean it up manually.
// This needs to be done AFTER the connection is brought down, so we
// buffer the pointer to the pd and clean it up afterwards.
- struct ibv_pd *pd = id->pd;
- kiro_destroy_connection (&id);
+ struct ibv_pd *pd = cc->conn->pd;
+ kiro_destroy_connection (&(cc->conn));
+ g_free (cc);
g_free (pd);
}
}
@@ -432,7 +541,7 @@ kiro_server_stop (KiroServer *self)
//Shut down event listening
priv->close_signal = TRUE;
g_debug ("Event handling stopped");
-
+
g_list_foreach (priv->clients, disconnect_client, NULL);
g_list_free (priv->clients);
@@ -448,8 +557,8 @@ kiro_server_stop (KiroServer *self)
// We don't need the connection management IO channel container any more.
// Unref and thus free it.
- g_io_channel_unref (priv->g_io_ec);
- priv->g_io_ec = NULL;
+ g_io_channel_unref (priv->conn_ec);
+ priv->conn_ec = NULL;
priv->close_signal = FALSE;
// kiro_destroy_connection would try to call rdma_disconnect on the given
diff --git a/test/test-client-latency.c b/test/test-client-latency.c
index d05747d..208c37c 100644
--- a/test/test-client-latency.c
+++ b/test/test-client-latency.c
@@ -6,7 +6,7 @@
#include <assert.h>
-int
+int
main ( int argc, char *argv[] )
{
if (argc < 3) {
@@ -15,38 +15,30 @@ main ( int argc, char *argv[] )
}
KiroClient *client = kiro_client_new ();
- KiroTrb *trb = kiro_trb_new ();
if (-1 == kiro_client_connect (client, argv[1], argv[2])) {
kiro_client_free (client);
return -1;
}
- kiro_client_sync (client);
- kiro_trb_adopt (trb, kiro_client_get_memory (client));
+ int iterations = 10000;
- GTimer *timer = g_timer_new ();
-while (1) {
- g_timer_reset (timer);
+while (1) {
int i = 0;
- while(i < 50000) {
- kiro_client_sync (client);
+ float ping_us = 0;
+ int fail_count = 0;
+ while(i < iterations) {
+ float tmp = kiro_client_ping_server (client);
+ if (tmp < 0)
+ fail_count++;
+ else
+ ping_us += tmp;
i++;
}
- double elapsed = g_timer_elapsed (timer, NULL);
- printf ("Average Latency: %fus\n", (elapsed/50000.)*1000*1000);
+ printf ("Average Latency: %fus\n", ping_us/(float)(iterations - fail_count));
}
- g_timer_stop (timer);
kiro_client_free (client);
- kiro_trb_free (trb);
return 0;
}
-
-
-
-
-
-
-