summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimo Dritschler <timo.dritschler@kit.edu>2015-01-30 18:26:50 +0100
committerTimo Dritschler <timo.dritschler@kit.edu>2015-01-30 18:26:50 +0100
commit8f60d5536ef04f53969bb0a6e6ba5bea46f734de (patch)
treeae15468bd5346b4402d84cbaf0cb6657c0d17bd5
parentfb72ad6a4685274d37a73b048d50a7761c19f3c6 (diff)
parent5ad42a8bd4ec754b9c33f9c0b22dceb0e812c4a5 (diff)
downloadkiro-8f60d5536ef04f53969bb0a6e6ba5bea46f734de.tar.gz
kiro-8f60d5536ef04f53969bb0a6e6ba5bea46f734de.tar.bz2
kiro-8f60d5536ef04f53969bb0a6e6ba5bea46f734de.tar.xz
kiro-8f60d5536ef04f53969bb0a6e6ba5bea46f734de.zip
Merge pull request #17 from ufo-kit/serverRealloc
Added server-side reallocation request
-rw-r--r--CMakeLists.txt2
-rw-r--r--src/kiro-client.c75
-rw-r--r--src/kiro-rdma.h7
-rw-r--r--src/kiro-server.c299
-rw-r--r--src/kiro-server.h13
-rw-r--r--test/test-server.c2
6 files changed, 344 insertions, 54 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 99e2d09..1f44a18 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,7 +12,7 @@ set(LIBKIRO_VERSION_STRING "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}.${
set(VERSION "${LIBKIRO_VERSION_STRING}")
set(LIBKIRO_DESCRIPTION "Small InfiniBand communication Server and Client")
-set(LIBKIRO_ABI_VERSION "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}.${LIBKIRO_VERSION_PATCH}")
+set(LIBKIRO_ABI_VERSION "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}")
set(LIBKIRO_BUILD_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/common/cmake")
diff --git a/src/kiro-client.c b/src/kiro-client.c
index 6d0d3f5..3cf6e62 100644
--- a/src/kiro-client.c
+++ b/src/kiro-client.c
@@ -64,11 +64,32 @@ struct _KiroClientPrivate {
G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT);
-
// Temporary storage and lock for PING timing
G_LOCK_DEFINE (ping_time);
volatile struct timeval ping_time;
+G_LOCK_DEFINE (sync_lock);
+
+static inline gboolean
+send_msg (struct rdma_cm_id *id, struct kiro_rdma_mem *r)
+{
+ gboolean retval = TRUE;
+ G_LOCK (sync_lock);
+ if (rdma_post_send (id, id, r->mem, r->size, r->mr, IBV_SEND_SIGNALED)) {
+ retval = FALSE;
+ }
+ else {
+ struct ibv_wc wc;
+ if (rdma_get_send_comp (id, &wc) < 0) {
+ retval = FALSE;
+ }
+ g_debug ("WC Status: %i", wc.status);
+ }
+
+ G_UNLOCK (sync_lock);
+ return retval;
+}
+
KiroClient *
kiro_client_new (void)
@@ -134,7 +155,6 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
struct rdma_cm_event *active_event;
if (0 <= rdma_get_cm_event (priv->ec, &active_event)) {
- //Disable cancellation to prevent undefined states during shutdown
struct rdma_cm_event *ev = g_try_malloc (sizeof (*active_event));
if (!ev) {
@@ -190,13 +210,13 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
if (!ctx->rdma_mr) {
- //TODO: Connection teardown in an event handler routine? Not a good
+ //FIXME: Connection teardown in an event handler routine? Not a good
//idea...
g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
rdma_disconnect (priv->conn);
kiro_destroy_connection_context (&ctx);
rdma_destroy_ep (priv->conn);
- return FALSE;
+ return TRUE;
}
}
if (type == KIRO_PONG) {
@@ -215,11 +235,41 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
G_UNLOCK (ping_time);
}
+ if (type == KIRO_REALLOC) {
+ g_debug ("Got reallocation request from server.");
+ struct kiro_ctrl_msg *msg = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem);
+
+ G_LOCK (sync_lock);
+ g_debug ("Rallocating memory...");
+ kiro_destroy_rdma_memory (ctx->rdma_mr);
+ ctx->peer_mr = msg->peer_mri;
+ g_debug ("New size is: %zu", ctx->peer_mr.length);
+ ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE);
+ G_UNLOCK (sync_lock);
+
+ if (!ctx->rdma_mr) {
+ //FIXME: Connection teardown in an event handler routine? Not a good
+ //idea...
+ g_critical ("Failed to allocate memory for receive buffer (Out of memory?)");
+ rdma_disconnect (priv->conn);
+ kiro_destroy_connection_context (&ctx);
+ rdma_destroy_ep (priv->conn);
+ }
+
+ msg = ((struct kiro_ctrl_msg *)ctx->cf_mr_send->mem);
+ msg->msg_type = KIRO_ACK_RDMA;
+ if (!send_msg (priv->conn, ctx->cf_mr_send)) {
+ g_warning ("Failure while trying to post SEND for reallocation ACK: %s", strerror (errno));
+ }
+ else {
+ g_debug ("Sent ACK to server");
+ }
+ }
//Post a generic receive in order to stay responsive to any messages from
//the server
if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {
- //TODO: Connection teardown in an event handler routine? Not a good
+ //FIXME: Connection teardown in an event handler routine? Not a good
//idea...
g_critical ("Posting generic receive for connection failed: %s", strerror (errno));
kiro_destroy_connection_context (&ctx);
@@ -363,6 +413,7 @@ kiro_client_sync (KiroClient *self)
struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context;
+ G_LOCK (sync_lock);
if (rdma_post_read (priv->conn, priv->conn, ctx->rdma_mr->mem, ctx->peer_mr.length, ctx->rdma_mr->mr, 0, (uint64_t)ctx->peer_mr.addr, ctx->peer_mr.rkey)) {
g_critical ("Failed to RDMA_READ from server: %s", strerror (errno));
goto fail;
@@ -377,6 +428,7 @@ kiro_client_sync (KiroClient *self)
switch (wc.status) {
case IBV_WC_SUCCESS:
+ G_UNLOCK (sync_lock);
return 0;
case IBV_WC_RETRY_EXC_ERR:
g_critical ("Server no longer responding");
@@ -390,6 +442,7 @@ kiro_client_sync (KiroClient *self)
fail:
kiro_destroy_connection (&(priv->conn));
+ G_UNLOCK (sync_lock);
return -1;
}
@@ -399,6 +452,7 @@ ping_timeout (gpointer data) {
//Not needed. Void it to prevent 'unused variable' warning
(void) data;
+ g_debug ("PING timed out");
G_LOCK (ping_time);
@@ -444,20 +498,15 @@ kiro_client_ping_server (KiroClient *self)
struct timeval local_time;
gettimeofday (&local_time, NULL);
- if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
+ if (!send_msg (priv->conn, ctx->cf_mr_send)) {
g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno));
t_usec = -1;
+ G_UNLOCK (ping_time);
goto end;
}
+ g_debug ("PING message sent to server.");
G_UNLOCK (ping_time);
- struct ibv_wc wc;
- if (rdma_get_send_comp (priv->conn, &wc) < 0) {
- g_warning ("Failure during PING send: %s", strerror (errno));
- t_usec = -1;
- goto end;
- }
-
// Set a two-second timeout for the ping
guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL);
diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h
index 5b4895f..c17e044 100644
--- a/src/kiro-rdma.h
+++ b/src/kiro-rdma.h
@@ -56,7 +56,8 @@ struct kiro_ctrl_msg {
KIRO_ACK_RDMA, // acknowledge RDMA Request and provide Memory Region Information
KIRO_REJ_RDMA, // RDMA Request rejected :( (peer_mri will be invalid)
KIRO_PING, // PING Message
- KIRO_PONG // PONG Message (PING reply)
+ KIRO_PONG, // PONG Message (PING reply)
+ KIRO_REALLOC // Used by the server to notify the client about a new peer_mri
} msg_type;
struct ibv_mr peer_mri;
@@ -80,9 +81,9 @@ kiro_attach_qp (struct rdma_cm_id *id)
id->pd = ibv_alloc_pd (id->verbs);
id->send_cq_channel = ibv_create_comp_channel (id->verbs);
- id->recv_cq_channel = id->send_cq_channel; //we use one shared completion channel
+ id->recv_cq_channel = ibv_create_comp_channel (id->verbs);
id->send_cq = ibv_create_cq (id->verbs, 1, id, id->send_cq_channel, 0);
- id->recv_cq = id->send_cq; //we use one shared completion queue
+ id->recv_cq = ibv_create_cq (id->verbs, 1, id, id->recv_cq_channel, 0);
struct ibv_qp_init_attr qp_attr;
memset (&qp_attr, 0, sizeof (struct ibv_qp_init_attr));
qp_attr.qp_context = (void *) (uintptr_t) id;
diff --git a/src/kiro-server.c b/src/kiro-server.c
index fadc329..f8dd9d3 100644
--- a/src/kiro-server.c
+++ b/src/kiro-server.c
@@ -67,15 +67,50 @@ struct _KiroServerPrivate {
G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT);
+// List of clients that were asked to realloc their memory
+GList *realloc_list;
+
+// Temporary lock for connecting clients
+G_LOCK_DEFINE (connection_handling);
+G_LOCK_DEFINE (rdma_handling);
+
+// Used to prevent race conditions during realloc timeout
+G_LOCK_DEFINE (realloc_timeout);
+
struct kiro_client_connection {
guint id; // Client identification (Easy access)
GIOChannel *rcv_ec; // GLib IO Channel encapsulation for receive completions for the client
guint source_id; // ID of the source created by g_io_add_watch, needed to remove it again
struct rdma_cm_id *conn; // Connection Manager ID of the client
+ struct kiro_rdma_mem *backup_mri; // Backup MRI for reallocation
};
+G_LOCK_DEFINE (send_lock);
+
+static inline gboolean
+send_msg (struct rdma_cm_id *id, struct kiro_rdma_mem *r)
+{
+ gboolean retval = TRUE;
+ G_LOCK (send_lock);
+ g_debug ("Sending message");
+ if (rdma_post_send (id, id, r->mem, r->size, r->mr, IBV_SEND_SIGNALED)) {
+ retval = FALSE;
+ }
+ else {
+ struct ibv_wc wc;
+ if (rdma_get_send_comp (id, &wc) < 0) {
+ retval = FALSE;
+ }
+ g_debug ("WC Status: %i", wc.status);
+ }
+
+ G_UNLOCK (send_lock);
+ return retval;
+}
+
+
KiroServer *
kiro_server_new (void)
{
@@ -177,7 +212,7 @@ error:
static int
-welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
+grant_client_access (struct rdma_cm_id *client, void *mem, size_t mem_size, guint type)
{
struct kiro_connection_context *ctx = (struct kiro_connection_context *) (client->context);
ctx->rdma_mr = (struct kiro_rdma_mem *)g_try_malloc0 (sizeof (struct kiro_rdma_mem));
@@ -199,24 +234,15 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
- msg->msg_type = KIRO_ACK_RDMA;
-
+ msg->msg_type = type;
msg->peer_mri = * (ctx->rdma_mr->mr);
- if (rdma_post_send (client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
+ if (!send_msg (client, ctx->cf_mr_send)) {
g_warning ("Failure while trying to post SEND: %s", strerror (errno));
kiro_destroy_rdma_memory (ctx->rdma_mr);
return -1;
}
- struct ibv_wc wc;
-
- if (rdma_get_send_comp (client, &wc) < 0) {
- g_warning ("Failed to post RDMA MRI to client: %s", strerror (errno));
- kiro_destroy_rdma_memory (ctx->rdma_mr);
- return -1;
- }
-
g_debug ("RDMA MRI sent to client");
return 0;
}
@@ -225,19 +251,30 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
static gboolean
process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
{
- // Right now, we don't need 'source' and 'condition'
- // Tell the compiler to ignore them by (void)-ing them
+ // Right now, we don't need 'source'
+ // Tell the compiler to ignore it by (void)-ing it
(void) source;
- //(void) condition;
- g_debug ("Message condition: %i", condition);
- struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ if (!G_TRYLOCK (rdma_handling)) {
+ g_debug ("RDMA handling will wait for the next dispatch.");
+ return TRUE;
+ }
+
+ g_debug ("Got message on condition: %i", condition);
+ void *payload = ((GList *)data)->data;
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)payload;
struct ibv_wc wc;
- if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) {
+ gint num_comp = ibv_poll_cq (cc->conn->recv_cq, 1, &wc);
+ if (!num_comp) {
+ g_critical ("RDMA event handling was triggered, but there is no completion on the queue");
+ goto end_rmda_eh;
+ }
+ if (num_comp < 0) {
g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno));
- return FALSE;
+ goto end_rmda_eh;
}
+ g_debug ("Got %i receive events from the queue", num_comp);
void *cq_ctx;
struct ibv_cq *cq;
int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx);
@@ -248,18 +285,38 @@ process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data)
guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type;
g_debug ("Received a message from Client %u of type %u", cc->id, type);
- if (type == KIRO_PING) {
- struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
- msg->msg_type = KIRO_PONG;
+ switch (type) {
+ case KIRO_PING:
+ {
+ struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem);
+ msg->msg_type = KIRO_PONG;
- if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) {
- g_warning ("Failure while trying to post PONG send: %s", strerror (errno));
- goto done;
+ if (!send_msg (cc->conn, ctx->cf_mr_send)) {
+ g_warning ("Failure while trying to post PONG send: %s", strerror (errno));
+ goto done;
+ }
+ break;
}
-
- if (rdma_get_send_comp (cc->conn, &wc) < 0) {
- g_warning ("An error occured while sending PONG: %s", strerror (errno));
+ case KIRO_ACK_RDMA:
+ {
+ g_debug ("ACK received");
+ if (G_TRYLOCK (realloc_timeout)) {
+ g_debug ("Client %i has ACKed the reallocation request", cc->id);
+ GList *client = g_list_find (realloc_list, (gpointer)cc);
+ if (client) {
+ realloc_list = g_list_remove_link (realloc_list, client);
+ if (cc->backup_mri->mr)
+ ibv_dereg_mr (cc->backup_mri->mr);
+ g_free (cc->backup_mri);
+ cc->backup_mri = NULL;
+ g_debug ("Client %i removed from realloc_list", cc->id);
+ }
+ G_UNLOCK (realloc_timeout);
+ }
+ break;
}
+ default:
+ g_debug ("Message Type is unknow. Ignoring...");
}
done:
@@ -271,12 +328,15 @@ done:
g_critical ("Posting generic receive for event handling failed: %s", strerror (errno));
kiro_destroy_connection_context (&ctx);
rdma_destroy_ep (cc->conn);
- return FALSE;
+ goto end_rmda_eh;
}
ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel
g_debug ("Finished RDMA event handling");
+
+end_rmda_eh:
+ G_UNLOCK (rdma_handling);
return TRUE;
}
@@ -289,17 +349,24 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
(void) source;
(void) condition;
+ g_debug ("CM event handler triggered");
+ if (!G_TRYLOCK (connection_handling)) {
+ // Unsafe to handle connection management right now.
+ // Wait for next dispatch.
+ g_debug ("Connection handling is busy. Waiting for next dispatch");
+ return TRUE;
+ }
+
KiroServerPrivate *priv = (KiroServerPrivate *)data;
struct rdma_cm_event *active_event;
if (0 <= rdma_get_cm_event (priv->ec, &active_event)) {
- //Disable cancellation to prevent undefined states during shutdown
struct rdma_cm_event *ev = g_try_malloc (sizeof (*active_event));
if (!ev) {
g_critical ("Unable to allocate memory for Event handling!");
rdma_ack_cm_event (active_event);
- return FALSE;
+ goto exit;
}
memcpy (ev, active_event, sizeof (*active_event));
@@ -311,7 +378,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
//Don't connect this client any more.
//Sorry mate!
rdma_reject (ev->id, NULL, 0);
- return TRUE;
+ goto exit;
}
do {
@@ -327,7 +394,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
goto fail;
// Post a welcoming "Receive" for handshaking
- if (welcome_client (ev->id, priv->mem, priv->mem_size))
+ if (grant_client_access (ev->id, priv->mem, priv->mem_size, KIRO_ACK_RDMA))
goto fail;
ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel
@@ -344,16 +411,24 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
cc->id = ctx->identifier;
cc->conn = ev->id;
cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd);
- cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc);
+ priv->clients = g_list_append (priv->clients, (gpointer)cc);
+ GList *client = g_list_find (priv->clients, (gpointer)cc);
+ if (!client->data || client->data != cc) {
+ g_critical ("Could not add client to list");
+ goto fail;
+ }
+
+ cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)client);
g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more
- priv->clients = g_list_append (priv->clients, (gpointer)cc);
g_debug ("Client connection assigned with ID %u", ctx->identifier);
g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients));
break;
fail:
g_warning ("Failed to accept client connection: %s", strerror (errno));
+ if (errno == EINVAL)
+ g_message ("This might happen if the client pulls back the connection request before the server can handle it.");
} while(0);
}
@@ -361,7 +436,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);
if (!ctx->container) {
g_debug ("Got disconnect request from unknown client");
- return FALSE;
+ goto exit;
}
GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container);
@@ -390,8 +465,12 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)
g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients));
}
+exit:
g_free (ev);
}
+
+ G_UNLOCK (connection_handling);
+ g_debug ("CM event handling done");
return TRUE;
}
@@ -503,7 +582,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void
}
-static void
+void
disconnect_client (gpointer data, gpointer user_data)
{
(void)user_data;
@@ -530,6 +609,152 @@ disconnect_client (gpointer data, gpointer user_data)
void
+request_client_realloc (gpointer data, gpointer user_data) {
+
+ if (!data || !user_data) {
+ g_critical ("Either client or server pointer was lost during client reconnect!");
+ // The client will remain in the old list, never receive the
+ // reallocation request, therefore never send an ACK and therefore will
+ // be forcefully disconnected once the timeout happens.
+ return;
+ }
+
+ realloc_list = g_list_append (realloc_list, data);
+
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)data;
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context;
+
+ // user_data is used to pass the information about the new RDMA memory to
+ // this function. It is encapsulated in a kiro_rdma_mem struct.
+ struct kiro_rdma_mem *new_rdma_mem = (struct kiro_rdma_mem *)user_data;
+
+ cc->backup_mri = ctx->rdma_mr;
+ ctx->rdma_mr = NULL;
+ g_debug ("Requesting REALLOC for client %i", cc->id);
+ if (grant_client_access (cc->conn, new_rdma_mem->mem, new_rdma_mem->size, KIRO_REALLOC)) {
+ ctx->rdma_mr = cc->backup_mri;
+ cc->backup_mri = NULL;
+ g_warning ("Failed to request REALLOC for client %i", cc->id);
+ G_UNLOCK (rdma_handling);
+ return;
+ }
+ g_debug ("Client %i REALLOC request sent.", cc->id);
+}
+
+
+volatile gboolean timeout_done = FALSE;
+
+gboolean
+client_realloc_timeout (gpointer data) {
+
+ (void) data;
+ g_debug ("TIMEOUT OCCURED");
+ timeout_done = TRUE;
+ return TRUE;
+}
+
+
+/*
+ * NOTE:
+ * Since all currently connected clients are guaranteed to be stored in the
+ * priv->clients list, using that one to detect failed ACKs makes much more
+ * sense, since clients that failed to be informed of reallocation would simply
+ * never send an ACK, stay on the list, and then securely be disconnected after
+ * the timeout.
+ *
+ * Therefore, we first try to send the REALLOC request to the clients, then try
+ * to copy them to the realloc_list. Once this is done, the realloc_list will only
+ * contain the list of clients that are guaranteed to have received the REALLOC
+ * request. We then swap the realloc_list and priv->clients list. The
+ * realloc_list then contains all of the clients that were previously
+ * connected, before we started to send out REALLOC requests. This makes it easy
+ * for us to ensure that we don't 'forget' any client during the timeout check.
+ *
+ * This is a trick to circumvent problems with clients that, for whatever
+ * reason, could not be copied to the 'new' list. If that happens, we would not
+ * even recognize that a client was not informed, because it never appears in
+ * the new list to begin with, and the client would therefore survive the
+ * timeout for reallocation. That client's peer_mri would then never be unpinned
+ * (unless the server stops or the client disconnects), causing MASSIVE memory
+ * leakage. Also, the client would continue to read stale data in the best case,
+ * or newly allocated garbage in the worst case.
+ **/
+
+void
+kiro_server_realloc (KiroServer *self, void *mem, size_t size) {
+
+ if (!self)
+ return;
+
+ g_debug ("Starting realloc");
+
+ struct kiro_rdma_mem rdma_mem;
+ rdma_mem.mem = mem;
+ rdma_mem.size = size;
+
+ KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE (self);
+
+
+ G_LOCK (connection_handling);
+ G_LOCK (rdma_handling);
+
+ priv->mem = mem;
+ priv->mem_size = size;
+ if (!priv->clients) {
+ g_debug ("No clients to reconnect. Done.");
+ G_UNLOCK (rdma_handling);
+ G_UNLOCK (connection_handling);
+ return;
+ }
+ g_list_foreach (priv->clients, request_client_realloc, &rdma_mem);
+
+ // Swap the two lists. See Note above.
+ GList *tmp = priv->clients;
+ priv->clients = realloc_list;
+ realloc_list = tmp;
+ G_UNLOCK (rdma_handling);
+
+ guint timeout = g_timeout_add_seconds (2, client_realloc_timeout, NULL);
+
+ timeout_done = FALSE;
+ while (!timeout_done) {
+ if (!realloc_list) {
+ g_debug ("All clients have ACKed");
+ break; // all clients ACKed
+ }
+ }
+
+ GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout);
+ if (timeout_source) {
+ g_source_destroy (timeout_source);
+ }
+
+ G_LOCK (realloc_timeout);
+ GList *current = g_list_first (realloc_list);
+ while (current) {
+ struct kiro_client_connection *cc = (struct kiro_client_connection *)current->data;
+ g_debug ("Client %i did not ACK the REALLOC request in time.", cc->id);
+ GList *client = g_list_find (priv->clients, current->data);
+ if (client) {
+ priv->clients = g_list_delete_link (priv->clients, client);
+ }
+ disconnect_client (current->data, NULL);
+ current = g_list_next (current);
+ }
+
+ if (realloc_list) {
+ g_list_free (realloc_list);
+ realloc_list = NULL;
+ }
+ G_UNLOCK (realloc_timeout);
+
+
+ g_debug ("Realloc procedure done!");
+ G_UNLOCK (connection_handling);
+}
+
+
+void
kiro_server_stop (KiroServer *self)
{
g_return_if_fail (self != NULL);
diff --git a/src/kiro-server.h b/src/kiro-server.h
index 7e42159..655140a 100644
--- a/src/kiro-server.h
+++ b/src/kiro-server.h
@@ -129,6 +129,19 @@ void kiro_server_free (KiroServer *server);
*/
int kiro_server_start (KiroServer *server, const char *bind_addr, const char *bind_port, void *mem, size_t mem_size);
+
+/**
+ * kiro_server_realloc - Change the memory that is provided by the server
+ * @server: #KiroServer to perform the operation on
+ * @mem: (transfer none): Pointer to the memory that is to be provided
+ * @mem_size: Size in bytes of the given memory
+ * Description:
+ * Changes the memory that is provided by the server. All connected clients
+ * will automatically be informed about this change.
+ */
+void kiro_server_realloc (KiroServer *self, void* mem, size_t mem_size);
+
+
/**
* kiro_server_stop - Stops the server
* @server: #KiroServer to perform the operation on
diff --git a/test/test-server.c b/test/test-server.c
index 91a3db5..87ff5d0 100644
--- a/test/test-server.c
+++ b/test/test-server.c
@@ -181,6 +181,8 @@ main (void)
buffer = kiro_trb_dma_push (rb);
print_current_frame (buffer, frame, 512, 512, rand);
frame++;
+ if (frame % 1000 == 0)
+ kiro_server_realloc (server, kiro_trb_get_raw_buffer (rb), kiro_trb_get_raw_size (rb));
}
done: