From b5ea511c0a706a27c0f84e2e5cd523260af64a51 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 26 Nov 2014 16:45:45 +0100 Subject: Changed kiro-server to use a Main Loop model for event handling --- src/kiro-server.c | 144 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 89 insertions(+), 55 deletions(-) diff --git a/src/kiro-server.c b/src/kiro-server.c index 91c86ec..29dd960 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -54,11 +54,13 @@ struct _KiroServerPrivate { struct rdma_cm_id *base; // Base-Listening-Connection GList *clients; // List of connected clients guint next_client_id; // Numeric ID for the next client that will connect - pthread_t event_listener; // Pointer to the completion-listener thread of this connection int close_signal; // Integer flag used to signal to the listener-thread that the server is going to shut down void *mem; // Pointer to the server buffer size_t mem_size; // Server Buffer Size in bytes + GMainLoop *main_loop; // Main loop of the server for event polling and handling + GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GThread *main_thread; // Main KIRO server thread }; @@ -218,71 +220,80 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) } -static void * -event_loop (void *self) +gboolean +process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) { - KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE ((KiroServer *)self); - struct rdma_cm_event *active_event; + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + (void) condition; - while (0 == priv->close_signal) { - if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { - //Disable cancellation to prevent undefined states during shutdown - pthread_setcancelstate (PTHREAD_CANCEL_DISABLE, NULL); - struct rdma_cm_event *ev = malloc (sizeof (*active_event)); + KiroServerPrivate *priv = (KiroServerPrivate *)data; + struct rdma_cm_event *active_event; - if (!ev) { - g_critical ("Unable to allocate memory for Event handling!"); - rdma_ack_cm_event (active_event); - continue; - } + if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { + //Disable cancellation to prevent undefined states during shutdown + struct rdma_cm_event *ev = malloc (sizeof (*active_event)); - memcpy (ev, active_event, sizeof (*active_event)); + if (!ev) { + g_critical ("Unable to allocate memory for Event handling!"); rdma_ack_cm_event (active_event); + return FALSE; + } - if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) { - if (0 != priv->close_signal) { - //Main thread has signalled shutdown! - //Don't connect this client any more. - //Sorry mate! - rdma_reject (ev->id, NULL, 0); - } + memcpy (ev, active_event, sizeof (*active_event)); + rdma_ack_cm_event (active_event); - g_debug ("Got connection request from client"); - - if (0 == connect_client (ev->id)) { - // Post a welcoming "Recieve" for handshaking - if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) { - // Connection set-up successfully! (Server) - struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); - ctx->identifier = priv->next_client_id++; - priv->clients = g_list_append (priv->clients, (gpointer)ev->id); - g_debug ("Client connection assigned with ID %u", ctx->identifier); - g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); - } - } + if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) { + if (0 != priv->close_signal) { + //Main thread has signalled shutdown! + //Don't connect this client any more. + //Sorry mate! + rdma_reject (ev->id, NULL, 0); + return TRUE; } - else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { - GList *client = g_list_find (priv->clients, (gconstpointer) ev->id); - if (client) { + g_debug ("Got connection request from client"); + + if (0 == connect_client (ev->id)) { + // Post a welcoming "Receive" for handshaking + if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) { + // Connection set-up successfully! (Server) struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); - g_debug ("Got disconnect request from client ID %u", ctx->identifier); - priv->clients = g_list_delete_link (priv->clients, client); + ctx->identifier = priv->next_client_id++; + priv->clients = g_list_append (priv->clients, (gpointer)ev->id); + g_debug ("Client connection assigned with ID %u", ctx->identifier); + g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); } - else - g_debug ("Got disconnect request from unknown client"); + } + else + g_warning ("Failed to accept client connection: %s", strerror (errno)); + } + else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { + GList *client = g_list_find (priv->clients, (gconstpointer) ev->id); - kiro_destroy_connection (& (ev->id)); - g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients)); + if (client) { + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); + g_debug ("Got disconnect request from client ID %u", ctx->identifier); + priv->clients = g_list_delete_link (priv->clients, client); } + else + g_debug ("Got disconnect request from unknown client"); - free (ev); + kiro_destroy_connection (& (ev->id)); + g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients)); } - pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); + free (ev); } + return TRUE; +} + - g_debug ("Closing Event Listener Thread"); +gpointer +start_server_main_loop (gpointer data) +{ + g_main_loop_run ((GMainLoop *)data); return NULL; } @@ -368,9 +379,17 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void return -1; } - pthread_create (& (priv->event_listener), NULL, event_loop, self); + priv->main_loop = g_main_loop_new (NULL, FALSE); + priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); + g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop); + + // We gave control to the main_loop (with add_watch) and don't need our ref + // any longer + g_io_channel_unref (priv->g_io_ec); + + g_message ("Enpoint listening"); - sleep (1); return 0; } @@ -400,15 +419,30 @@ kiro_server_stop (KiroServer *self) if (!priv->base) return; - //Shut down the listener-thread + //Shut down event listening priv->close_signal = 1; - pthread_cancel (priv->event_listener); - pthread_join (priv->event_listener, NULL); - g_debug ("Event Listener Thread stopped"); - priv->close_signal = 0; + g_debug ("Event handling stopped"); g_list_foreach (priv->clients, disconnect_client, NULL); g_list_free (priv->clients); + + // Stop the main loop and clear its memory + g_main_loop_quit (priv->main_loop); + g_main_loop_unref (priv->main_loop); + priv->main_loop = NULL; + + // Ask the main thread to join (It probably already has, but we do it + // anyways. Just in case!) + g_thread_join (priv->main_thread); + priv->main_thread = NULL; + + // We don't need the connection management IO channel container any more. + // Unref and thus free it. + g_io_channel_unref (priv->g_io_ec); + priv->g_io_ec = NULL; + + priv->close_signal = 0; + rdma_destroy_ep (priv->base); priv->base = NULL; -- cgit v1.2.1 From cea4dee2c9d257d65a2a5f07ea6e85d43634aa68 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 26 Nov 2014 18:03:31 +0100 Subject: Added a Main Loop model to kiro-client Added kiro_client_disconnect function Added missing memory cleanup to kiro server and client upon shutdown --- src/kiro-client.c | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- src/kiro-client.h | 17 +++++++++ src/kiro-server.c | 18 +++++---- 3 files changed, 133 insertions(+), 10 deletions(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 8fa582b..8618c46 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -51,9 +51,13 @@ struct _KiroClientPrivate { /* 'Real' private structures */ /* (Not accessible by properties) */ - struct rdma_event_channel *ec; // Main Event Channel - struct rdma_cm_id *conn; // Connection to the Server + struct rdma_event_channel *ec; // Main Event Channel + struct rdma_cm_id *conn; // Connection to the Server + gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down + GMainLoop *main_loop; // Main loop of the server for event polling and handling + GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GThread *main_thread; // Main KIRO client thread }; @@ -94,7 +98,8 @@ kiro_client_init (KiroClient *self) static void kiro_client_finalize (GObject *object) { - //PASS + if (KIRO_IS_CLIENT (object)) + kiro_client_disconnect ((KiroClient *)object); G_OBJECT_CLASS (kiro_client_parent_class)->finalize (object); } @@ -108,6 +113,48 @@ kiro_client_class_init (KiroClientClass *klass) } +static gboolean +process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + (void) condition; + + KiroClientPrivate *priv = (KiroClientPrivate *)data; + struct rdma_cm_event *active_event; + + if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { + //Disable cancellation to prevent undefined states during shutdown + struct rdma_cm_event *ev = malloc (sizeof (*active_event)); + + if (!ev) { + g_critical ("Unable to allocate memory for Event handling!"); + rdma_ack_cm_event (active_event); + return FALSE; + } + + memcpy (ev, active_event, sizeof (*active_event)); + rdma_ack_cm_event (active_event); + + if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { + g_debug ("Connection closed by server"); + } + + free (ev); + } + return TRUE; +} + + +gpointer +start_client_main_loop (gpointer data) +{ + g_main_loop_run ((GMainLoop *)data); + return NULL; +} + + int kiro_client_connect (KiroClient *self, const char *address, const char *port) { @@ -196,6 +243,7 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) } g_message ("Connection to server established"); + priv->ec = priv->conn->channel; //For easy access struct ibv_wc wc; if (rdma_get_recv_comp (priv->conn, &wc) < 0) { @@ -219,6 +267,15 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) return -1; } + priv->main_loop = g_main_loop_new (NULL, FALSE); + priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); + g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); + + // We gave control to the main_loop (with add_watch) and don't need our ref + // any longer + g_io_channel_unref (priv->g_io_ec); + g_message ("Connected to %s:%s", address, port); return 0; } @@ -300,3 +357,48 @@ kiro_client_get_memory_size (KiroClient *self) return ctx->rdma_mr->size; } + +void +kiro_client_disconnect (KiroClient *self) +{ + if (!self) + return; + + KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); + + if (!priv->conn) + return; + + //Shut down event listening + priv->close_signal = TRUE; + g_debug ("Event handling stopped"); + + // Stop the main loop and clear its memory + g_main_loop_quit (priv->main_loop); + g_main_loop_unref (priv->main_loop); + priv->main_loop = NULL; + + // Ask the main thread to join (It probably already has, but we do it + // anyways. Just in case!) + g_thread_join (priv->main_thread); + priv->main_thread = NULL; + + // We don't need the connection management IO channel container any more. + // Unref and thus free it. + g_io_channel_unref (priv->g_io_ec); + priv->g_io_ec = NULL; + + priv->close_signal = FALSE; + + //kiro_destroy_connection does not free RDMA memory. Therefore, we need to + //cache the memory pointer and free the memory afterwards manually + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context); + void *rdma_mem = ctx->rdma_mr->mem; + kiro_destroy_connection (&(priv->conn)); + free (rdma_mem); + + // priv->ec is just an easy-access pointer. Don't free it. Just NULL it + priv->ec = NULL; + g_message ("Client disconnected from server"); +} + diff --git a/src/kiro-client.h b/src/kiro-client.h index 9e9d3ef..9c6036d 100644 --- a/src/kiro-client.h +++ b/src/kiro-client.h @@ -124,6 +124,23 @@ void kiro_client_free (KiroClient *client); */ int kiro_client_connect (KiroClient *client, const char *dest_addr, const char *dest_port); +/** + * kiro_client_disconnect - Diconnect a #KiroClient from the Server + * @client: (transfer none): The #KiroClient to disconnect + * Description: + * Disconnects the given #KiroClient from the KIRO server that it is connected + * to. If the @client is not connected, this function has no effect. + * Note: + * The memory content that has been transfered from the server is + * automatically freed when calling this function. If you want to continue + * using the memory after disconnecting the @client, make sure to memcpy() it + * first, using the informations obtained from kiro_client_get_memory() and + * kiro_client_get_memory_size(). + * See also: + * kiro_server_connect + */ +void kiro_client_disconnect (KiroClient *client); + /** * kiro_client_sync - Read data from the connected server * @client: (transfer none): The #KiroServer to use sync on diff --git a/src/kiro-server.c b/src/kiro-server.c index 29dd960..a236c30 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -54,10 +54,10 @@ struct _KiroServerPrivate { struct rdma_cm_id *base; // Base-Listening-Connection GList *clients; // List of connected clients guint next_client_id; // Numeric ID for the next client that will connect - int close_signal; // Integer flag used to signal to the listener-thread that the server is going to shut down void *mem; // Pointer to the server buffer size_t mem_size; // Server Buffer Size in bytes + gboolean close_signal; // Flag used to signal event listening to stop for server shutdown GMainLoop *main_loop; // Main loop of the server for event polling and handling GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel GThread *main_thread; // Main KIRO server thread @@ -220,7 +220,7 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size) } -gboolean +static gboolean process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) { // Right now, we don't need 'source' and 'condition' @@ -245,7 +245,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) rdma_ack_cm_event (active_event); if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) { - if (0 != priv->close_signal) { + if (TRUE == priv->close_signal) { //Main thread has signalled shutdown! //Don't connect this client any more. //Sorry mate! @@ -420,7 +420,7 @@ kiro_server_stop (KiroServer *self) return; //Shut down event listening - priv->close_signal = 1; + priv->close_signal = TRUE; g_debug ("Event handling stopped"); g_list_foreach (priv->clients, disconnect_client, NULL); @@ -440,12 +440,16 @@ kiro_server_stop (KiroServer *self) // Unref and thus free it. g_io_channel_unref (priv->g_io_ec); priv->g_io_ec = NULL; + priv->close_signal = FALSE; - priv->close_signal = 0; - - + // kiro_destroy_connection would try to call rdma_disconnect on the given + // connection. But the server never 'connects' to anywhere, so this would + // cause a crash. We need to destroy the enpoint manually without disconnect + struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->base->context); + kiro_destroy_connection_context (&ctx); rdma_destroy_ep (priv->base); priv->base = NULL; + rdma_destroy_event_channel (priv->ec); priv->ec = NULL; g_message ("Server stopped successfully"); -- cgit v1.2.1 From e86050c93ba25895e97d3e6a68e07654a02e5110 Mon Sep 17 00:00:00 2001 From: Timo Dritschler Date: Wed, 26 Nov 2014 18:51:28 +0100 Subject: Changed a missed occurance of malloc to g_try_malloc --- src/kiro-client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kiro-client.c b/src/kiro-client.c index 57dbea2..714a003 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -126,7 +126,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) if (0 <= rdma_get_cm_event (priv->ec, &active_event)) { //Disable cancellation to prevent undefined states during shutdown - struct rdma_cm_event *ev = malloc (sizeof (*active_event)); + struct rdma_cm_event *ev = g_try_malloc (sizeof (*active_event)); if (!ev) { g_critical ("Unable to allocate memory for Event handling!"); -- cgit v1.2.1