rxrpc: Use IDR to allocate client conn IDs on a machine-wide basis
Use the IDR facility to allocate client connection IDs on a machine-wide
basis so that each client connection has a unique identifier. When the
connection ID space wraps, we advance the epoch by 1, thereby effectively
having a 62-bit ID space. The IDR facility is then used to look up client
connections during incoming packet routing instead of using an rbtree
rooted on the transport.
This change allows for the removal of the transport in the future and also
means that client connections can be looked up directly in the data-ready
handler by connection ID.
The ID management code is placed in a new file, conn-client.c, to which all
the client connection-specific code will eventually move.
Note that the IDR tree gets very expensive on memory if the connection IDs
are widely scattered throughout the number space, so we shall need to
retire connections that have, say, an ID more than four times the maximum
number of client conns away from the current allocation point to try and
keep the IDs concentrated. We will also need to retire connections from an
old epoch.
Also note that, for the moment, a pointer to the transport has to be passed
through into the ID allocation function so that we can take a BH lock to
prevent a locking issue against in-BH lookup of client connections. This
will go away later when RCU is used for server connections also.
Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index cab2f6d..312b750 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -207,81 +207,6 @@
}
/*
- * assign a connection ID to a connection and add it to the transport's
- * connection lookup tree
- * - called with transport client lock held
- */
-static void rxrpc_assign_connection_id(struct rxrpc_connection *conn)
-{
- struct rxrpc_connection *xconn;
- struct rb_node *parent, **p;
- __be32 epoch;
- u32 cid;
-
- _enter("");
-
- epoch = conn->proto.epoch;
-
- write_lock_bh(&conn->trans->conn_lock);
-
- conn->trans->conn_idcounter += RXRPC_CID_INC;
- if (conn->trans->conn_idcounter < RXRPC_CID_INC)
- conn->trans->conn_idcounter = RXRPC_CID_INC;
- cid = conn->trans->conn_idcounter;
-
-attempt_insertion:
- parent = NULL;
- p = &conn->trans->client_conns.rb_node;
-
- while (*p) {
- parent = *p;
- xconn = rb_entry(parent, struct rxrpc_connection, node);
-
- if (epoch < xconn->proto.epoch)
- p = &(*p)->rb_left;
- else if (epoch > xconn->proto.epoch)
- p = &(*p)->rb_right;
- else if (cid < xconn->proto.cid)
- p = &(*p)->rb_left;
- else if (cid > xconn->proto.cid)
- p = &(*p)->rb_right;
- else
- goto id_exists;
- }
-
- /* we've found a suitable hole - arrange for this connection to occupy
- * it */
- rb_link_node(&conn->node, parent, p);
- rb_insert_color(&conn->node, &conn->trans->client_conns);
-
- conn->proto.cid = cid;
- write_unlock_bh(&conn->trans->conn_lock);
- _leave(" [CID %x]", cid);
- return;
-
- /* we found a connection with the proposed ID - walk the tree from that
- * point looking for the next unused ID */
-id_exists:
- for (;;) {
- cid += RXRPC_CID_INC;
- if (cid < RXRPC_CID_INC) {
- cid = RXRPC_CID_INC;
- conn->trans->conn_idcounter = cid;
- goto attempt_insertion;
- }
-
- parent = rb_next(parent);
- if (!parent)
- goto attempt_insertion;
-
- xconn = rb_entry(parent, struct rxrpc_connection, node);
- if (epoch < xconn->proto.epoch ||
- cid < xconn->proto.cid)
- goto attempt_insertion;
- }
-}
-
-/*
* add a call to a connection's call-by-ID tree
*/
static void rxrpc_add_call_ID_to_conn(struct rxrpc_connection *conn,
@@ -315,6 +240,69 @@
}
/*
+ * Allocate a client connection.
+ */
+static struct rxrpc_connection *
+rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp,
+ struct rxrpc_transport *trans,
+ gfp_t gfp)
+{
+ struct rxrpc_connection *conn;
+ int ret;
+
+ _enter("");
+
+ conn = rxrpc_alloc_connection(gfp);
+ if (!conn) {
+ _leave(" = -ENOMEM");
+ return ERR_PTR(-ENOMEM);
+ }
+
+ conn->params = *cp;
+ conn->proto.local = cp->local;
+ conn->proto.epoch = rxrpc_epoch;
+ conn->proto.cid = 0;
+ conn->proto.in_clientflag = 0;
+ conn->proto.family = cp->peer->srx.transport.family;
+ conn->out_clientflag = RXRPC_CLIENT_INITIATED;
+ conn->state = RXRPC_CONN_CLIENT;
+
+ switch (conn->proto.family) {
+ case AF_INET:
+ conn->proto.addr_size = sizeof(conn->proto.ipv4_addr);
+ conn->proto.ipv4_addr = cp->peer->srx.transport.sin.sin_addr;
+ conn->proto.port = cp->peer->srx.transport.sin.sin_port;
+ break;
+ }
+
+ ret = rxrpc_get_client_connection_id(conn, trans, gfp);
+ if (ret < 0)
+ goto error_0;
+
+ ret = rxrpc_init_client_conn_security(conn);
+ if (ret < 0)
+ goto error_1;
+
+ conn->security->prime_packet_security(conn);
+
+ write_lock(&rxrpc_connection_lock);
+ list_add_tail(&conn->link, &rxrpc_connections);
+ write_unlock(&rxrpc_connection_lock);
+
+ key_get(conn->params.key);
+
+ _leave(" = %p", conn);
+ return conn;
+
+error_1:
+ rxrpc_put_client_connection_id(conn);
+error_0:
+ kfree(conn);
+ _leave(" = %d", ret);
+ return ERR_PTR(ret);
+}
+
+/*
* connect a call on an exclusive connection
*/
static int rxrpc_connect_exclusive(struct rxrpc_sock *rx,
@@ -324,55 +312,29 @@
gfp_t gfp)
{
struct rxrpc_connection *conn;
- int chan, ret;
+ int chan;
_enter("");
- conn = rxrpc_alloc_connection(gfp);
- if (!conn) {
- _leave(" = -ENOMEM");
- return -ENOMEM;
+ conn = rxrpc_alloc_client_connection(cp, trans, gfp);
+ if (IS_ERR(conn)) {
+ _leave(" = %ld", PTR_ERR(conn));
+ return PTR_ERR(conn);
}
- conn->trans = trans;
- conn->bundle = NULL;
- conn->params = *cp;
- conn->proto.local = cp->local;
- conn->proto.epoch = rxrpc_epoch;
- conn->proto.cid = 0;
- conn->proto.in_clientflag = 0;
- conn->proto.family = cp->peer->srx.transport.family;
- conn->out_clientflag = RXRPC_CLIENT_INITIATED;
- conn->state = RXRPC_CONN_CLIENT;
- conn->avail_calls = RXRPC_MAXCALLS - 1;
-
- key_get(conn->params.key);
-
- ret = rxrpc_init_client_conn_security(conn);
- if (ret < 0) {
- key_put(conn->params.key);
- kfree(conn);
- _leave(" = %d [key]", ret);
- return ret;
- }
-
- write_lock(&rxrpc_connection_lock);
- list_add_tail(&conn->link, &rxrpc_connections);
- write_unlock(&rxrpc_connection_lock);
-
- spin_lock(&trans->client_lock);
atomic_inc(&trans->usage);
+ conn->trans = trans;
+ conn->bundle = NULL;
_net("CONNECT EXCL new %d on TRANS %d",
conn->debug_id, conn->trans->debug_id);
- rxrpc_assign_connection_id(conn);
-
/* Since no one else can use the connection, we just use the first
* channel.
*/
chan = 0;
atomic_inc(&conn->usage);
+ conn->avail_calls = RXRPC_MAXCALLS - 1;
conn->channels[chan] = call;
conn->call_counter = 1;
call->conn = conn;
@@ -383,8 +345,6 @@
_net("CONNECT client on conn %d chan %d as call %x",
conn->debug_id, chan, call->call_id);
- spin_unlock(&trans->client_lock);
-
rxrpc_add_call_ID_to_conn(conn, call);
_leave(" = 0");
return 0;
@@ -402,7 +362,7 @@
gfp_t gfp)
{
struct rxrpc_connection *conn, *candidate;
- int chan, ret;
+ int chan;
DECLARE_WAITQUEUE(myself, current);
@@ -492,51 +452,25 @@
/* not yet present - create a candidate for a new connection and then
* redo the check */
- candidate = rxrpc_alloc_connection(gfp);
+ candidate = rxrpc_alloc_client_connection(cp, trans, gfp);
if (!candidate) {
_leave(" = -ENOMEM");
return -ENOMEM;
}
+ atomic_inc(&bundle->usage);
+ atomic_inc(&trans->usage);
candidate->trans = trans;
candidate->bundle = bundle;
- candidate->params = *cp;
- candidate->proto.local = cp->local;
- candidate->proto.epoch = rxrpc_epoch;
- candidate->proto.cid = 0;
- candidate->proto.in_clientflag = 0;
- candidate->proto.family = cp->peer->srx.transport.family;
- candidate->out_clientflag = RXRPC_CLIENT_INITIATED;
- candidate->state = RXRPC_CONN_CLIENT;
- candidate->avail_calls = RXRPC_MAXCALLS;
-
- key_get(candidate->params.key);
-
- ret = rxrpc_init_client_conn_security(candidate);
- if (ret < 0) {
- key_put(candidate->params.key);
- kfree(candidate);
- _leave(" = %d [key]", ret);
- return ret;
- }
-
- write_lock_bh(&rxrpc_connection_lock);
- list_add_tail(&candidate->link, &rxrpc_connections);
- write_unlock_bh(&rxrpc_connection_lock);
spin_lock(&trans->client_lock);
list_add(&candidate->bundle_link, &bundle->unused_conns);
bundle->num_conns++;
- atomic_inc(&bundle->usage);
- atomic_inc(&trans->usage);
_net("CONNECT new %d on TRANS %d",
candidate->debug_id, candidate->trans->debug_id);
- rxrpc_assign_connection_id(candidate);
- candidate->security->prime_packet_security(candidate);
-
/* leave the candidate lurking in zombie mode attached to the
* bundle until we're ready for it */
rxrpc_put_connection(candidate);
@@ -735,25 +669,27 @@
cid = sp->hdr.cid & RXRPC_CIDMASK;
epoch = sp->hdr.epoch;
- if (sp->hdr.flags & RXRPC_CLIENT_INITIATED)
+ if (sp->hdr.flags & RXRPC_CLIENT_INITIATED) {
p = trans->server_conns.rb_node;
- else
- p = trans->client_conns.rb_node;
+ while (p) {
+ conn = rb_entry(p, struct rxrpc_connection, node);
- while (p) {
- conn = rb_entry(p, struct rxrpc_connection, node);
+ _debug("maybe %x", conn->proto.cid);
- _debug("maybe %x", conn->proto.cid);
-
- if (epoch < conn->proto.epoch)
- p = p->rb_left;
- else if (epoch > conn->proto.epoch)
- p = p->rb_right;
- else if (cid < conn->proto.cid)
- p = p->rb_left;
- else if (cid > conn->proto.cid)
- p = p->rb_right;
- else
+ if (epoch < conn->proto.epoch)
+ p = p->rb_left;
+ else if (epoch > conn->proto.epoch)
+ p = p->rb_right;
+ else if (cid < conn->proto.cid)
+ p = p->rb_left;
+ else if (cid > conn->proto.cid)
+ p = p->rb_right;
+ else
+ goto found;
+ }
+ } else {
+ conn = idr_find(&rxrpc_client_conn_ids, cid >> RXRPC_CIDSHIFT);
+ if (conn && conn->proto.epoch == epoch)
goto found;
}
@@ -846,8 +782,7 @@
} else if (reap_time <= now) {
list_move_tail(&conn->link, &graveyard);
if (conn->out_clientflag)
- rb_erase(&conn->node,
- &conn->trans->client_conns);
+ rxrpc_put_client_connection_id(conn);
else
rb_erase(&conn->node,
&conn->trans->server_conns);