summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Dritschler <timo.dritschler@kit.edu>2014-12-03 19:29:03 +0100
committerTimo Dritschler <timo.dritschler@kit.edu>2014-12-03 19:39:08 +0100
commitcc2059982024af79136b9420eaec6fcfedabf3fb (patch)
tree344035ea910eda089c655f5382d19c6e7d46705a
parent8579e596df0bebee274dcadf766ae425bad9b1e8 (diff)
downloadkiro-cc2059982024af79136b9420eaec6fcfedabf3fb.tar.gz
kiro-cc2059982024af79136b9420eaec6fcfedabf3fb.tar.bz2
kiro-cc2059982024af79136b9420eaec6fcfedabf3fb.tar.xz
kiro-cc2059982024af79136b9420eaec6fcfedabf3fb.zip
KIRO Server now has a message event handler for receives
-rw-r--r--src/kiro-client.c7
-rw-r--r--src/kiro-rdma.h6
-rw-r--r--src/kiro-server.c138
3 files changed, 120 insertions, 31 deletions
diff --git a/src/kiro-client.c b/src/kiro-client.c
index 0bb95fa..bb2645c 100644
--- a/src/kiro-client.c
+++ b/src/kiro-client.c
@@ -154,7 +154,8 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
// Right now, we don't need 'source' and 'condition'
// Tell the compiler to ignore them by (void)-ing them
(void) source;
- (void) condition;
+ //(void) condition;
+ g_debug ("Message condidition: %i", condition);
KiroClientPrivate *priv = (KiroClientPrivate *)data;
struct ibv_wc wc;
@@ -297,8 +298,8 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)
priv->main_loop = g_main_loop_new (NULL, FALSE);
priv->conn_ec = g_io_channel_unix_new (priv->ec->fd);
priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd);
- g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
- g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_rdma_event, (gpointer)priv);
+ g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv);
+ g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv);
priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop);
// We gave control to the main_loop (with add_watch) and don't need our ref
diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h
index af502ec..361dabc 100644
--- a/src/kiro-rdma.h
+++ b/src/kiro-rdma.h
@@ -36,11 +36,15 @@ struct kiro_connection_context {
struct ibv_mr peer_mr; // RDMA Memory Region Information of the peer
+ void *container; // Make the connection aware of its container (if any)
+
enum {
KIRO_IDLE,
KIRO_MRI_REQUESTED, // Memory Region Information Requested
KIRO_RDMA_ESTABLISHED, // MRI Exchange complete. RDMA is ready
- KIRO_RDMA_ACTIVE // RDMA Operation is being performed
+ KIRO_RDMA_ACTIVE, // RDMA Operation is being performed
+ KIRO_PING, // PING Message
+ KIRO_PONG // PONG Message (PING reply)
} rdma_state;
};
diff --git a/src/kiro-server.c b/src/kiro-server.c
index bedba95..1694679 100644
--- a/src/kiro-server.c
+++ b/src/kiro-server.c
@@ -59,7 +59,7 @@ struct _KiroServerPrivate {
gboolean close_signal; // Flag used to signal event listening to stop for server shutdown
GMainLoop *main_loop; // Main loop of the server for event polling and handling
- GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel
+ GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel
GThread *main_thread; // Main KIRO server thread
};
@@ -67,6 +67,15 @@ struct _KiroServerPrivate {
G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT);
+struct kiro_client_connection {
+
+ guint id; // Client identification (Easy access)
+ GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client
+ guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again
+ struct rdma_cm_id *conn; // Connection Manager ID of the client
+};
+
+
KiroServer *
kiro_server_new (void)
{
@@ -213,6 +222,42 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
static gboolean
+process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ // Right now, we don't need 'source' and 'condition'
+ // Tell the compiler to ignore them by (void)-ing them
+ (void) source;
+ //(void) condition;
+ g_debug ("Message condition: %i", condition);
+
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ struct ibv_wc wc;
+
+ if (rdma_get_recv_comp (cc->conn, &wc) < 0) {
+ g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno));
+ return FALSE;
+ }
+
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context;
+ guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
+ g_debug ("Received a message from Client %u of type %u", cc->id, type);
+
+ //Post a generic receive in order to stay responsive to any messages from
+ //the client
+ if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
+ //TODO: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Posting generic receive for connection failed: %s", strerror (errno));
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (cc->conn);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+
+static gboolean
process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
{
// Right now, we don't need 'source' and 'condition'
@@ -245,29 +290,63 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
return TRUE;
}
- g_debug ("Got connection request from client");
+ do {
+ g_debug ("Got connection request from client");
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)g_try_malloc (sizeof (struct kiro_client_connection));
+ if (!cc) {
+ errno = ENOMEM;
+ rdma_reject (ev->id, NULL, 0);
+ goto fail;
+ }
+
+ if (connect_client (ev->id))
+ goto fail;
- if (0 == connect_client (ev->id)) {
// Post a welcoming "Receive" for handshaking
- if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) {
- // Connection set-up successfully! (Server)
- struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
- ctx->identifier = priv->next_client_id++;
- priv->clients = g_list_append (priv->clients, (gpointer)ev->id);
- g_debug ("Client connection assigned with ID %u", ctx->identifier);
- g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
- }
- }
- else
- g_warning ("Failed to accept client connection: %s", strerror (errno));
+ if (welcome_client (ev->id, priv->mem, priv->mem_size))
+ goto fail;
+
+ // Connection set-up successfully! (Server)
+ // ctx was created by 'welcome_client'
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
+ ctx->identifier = priv->next_client_id++;
+ ctx->container = cc; // Make the connection aware of its container
+
+ // Fill the client connection container. Also create a
+ // g_io_channel wrapper for the new clients receive queue event
+ // channel and add a main_loop watch to it.
+ cc->id = ctx->identifier;
+ cc->conn = ev->id;
+ cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd);
+ ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel
+ cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc);
+ g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more
+
+ priv->clients = g_list_append (priv->clients, (gpointer)cc);
+ g_debug ("Client connection assigned with ID %u", ctx->identifier);
+ g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
+ break;
+
+ fail:
+ g_warning ("Failed to accept client connection: %s", strerror (errno));
+
+ } while(0);
}
else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) {
- GList *client = g_list_find (priv->clients, (gconstpointer) ev->id);
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
+ if (!ctx->container) {
+ g_debug ("Got disconnect request from unknown client");
+ return FALSE;
+ }
+
+ GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container);
if (client) {
- struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
g_debug ("Got disconnect request from client ID %u", ctx->identifier);
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)ctx->container;
+ g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice.
priv->clients = g_list_delete_link (priv->clients, client);
+ g_free (cc);
}
else
g_debug ("Got disconnect request from unknown client");
@@ -281,6 +360,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
struct ibv_pd *pd = ev->id->pd;
kiro_destroy_connection (& (ev->id));
g_free (pd);
+
g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients));
}
@@ -324,7 +404,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void
int rtn = rdma_getaddrinfo (addr_c, port_c, &hints, &res_addrinfo);
g_free (addr_c);
g_free (port_c);
-
+
if (rtn) {
g_critical ("Failed to create address information: %s", strerror (errno));
return -1;
@@ -382,13 +462,13 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void
}
priv->main_loop = g_main_loop_new (NULL, FALSE);
- priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd);
- g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv);
+ priv->conn_ec = g_io_channel_unix_new (priv->ec->fd);
+ g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv);
priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop);
// We gave control to the main_loop (with add_watch) and don't need our ref
// any longer
- g_io_channel_unref (priv->g_io_ec);
+ g_io_channel_unref (priv->conn_ec);
g_message ("Enpoint listening");
@@ -400,19 +480,23 @@ static void
disconnect_client (gpointer data, gpointer user_data)
{
(void)user_data;
-
+
if (data) {
- struct rdma_cm_id *id = (struct rdma_cm_id *)data;
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ struct rdma_cm_id *id = cc->conn;
struct kiro_connection_context *ctx = (struct kiro_connection_context *) (id->context);
g_debug ("Disconnecting client: %u", ctx->identifier);
+ g_source_remove (cc->source_id);
+
// Note:
// The ProtectionDomain needs to be buffered and freed manually.
// Each connecting client is attached with its own pd, which we
// create manually. So we also need to clean it up manually.
// This needs to be done AFTER the connection is brought down, so we
// buffer the pointer to the pd and clean it up afterwards.
- struct ibv_pd *pd = id->pd;
- kiro_destroy_connection (&id);
+ struct ibv_pd *pd = cc->conn->pd;
+ kiro_destroy_connection (&(cc->conn));
+ g_free (cc);
g_free (pd);
}
}
@@ -432,7 +516,7 @@ kiro_server_stop (KiroServer *self)
//Shut down event listening
priv->close_signal = TRUE;
g_debug ("Event handling stopped");
-
+
g_list_foreach (priv->clients, disconnect_client, NULL);
g_list_free (priv->clients);
@@ -448,8 +532,8 @@ kiro_server_stop (KiroServer *self)
// We don't need the connection management IO channel container any more.
// Unref and thus free it.
- g_io_channel_unref (priv->g_io_ec);
- priv->g_io_ec = NULL;
+ g_io_channel_unref (priv->conn_ec);
+ priv->conn_ec = NULL;
priv->close_signal = FALSE;
// kiro_destroy_connection would try to call rdma_disconnect on the given