diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index b401fa9d7963..b697654340a8 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -292,7 +292,14 @@ struct rxrpc_connection { struct rxrpc_conn_parameters params; spinlock_t channel_lock; - struct rxrpc_call __rcu *channels[RXRPC_MAXCALLS]; /* active calls */ + + struct rxrpc_channel { + struct rxrpc_call __rcu *call; /* Active call */ + u32 call_id; /* ID of current call */ + u32 call_counter; /* Call ID counter */ + u32 last_call; /* ID of last call */ + u32 last_result; /* Result of last call (0/abort) */ + } channels[RXRPC_MAXCALLS]; wait_queue_head_t channel_wq; /* queue to wait for channel to become available */ struct rcu_head rcu; @@ -302,7 +309,6 @@ struct rxrpc_connection { struct rb_node service_node; /* Node in peer->service_conns */ }; struct list_head link; /* link in master connection list */ - struct rb_root calls; /* calls on this connection */ struct sk_buff_head rx_queue; /* received conn-level packets */ const struct rxrpc_security *security; /* applied security module */ struct key *server_key; /* security for this service */ @@ -311,7 +317,6 @@ struct rxrpc_connection { unsigned long flags; unsigned long events; unsigned long put_time; /* Time at which last put */ - rwlock_t lock; /* access lock */ spinlock_t state_lock; /* state-change lock */ atomic_t usage; enum rxrpc_conn_proto_state state : 8; /* current state of connection */ @@ -319,7 +324,6 @@ struct rxrpc_connection { u32 remote_abort; /* remote abort code */ int error; /* local error incurred */ int debug_id; /* debug ID for printks */ - unsigned int call_counter; /* call ID counter */ atomic_t serial; /* packet serial number counter */ atomic_t hi_serial; /* highest serial number received */ atomic_t avail_chans; /* number of channels available */ @@ -412,7 +416,6 @@ struct rxrpc_call { struct hlist_node error_link; /* link in error distribution list */ struct list_head accept_link; /* calls awaiting acceptance */ struct rb_node sock_node; /* node in socket call tree */ - struct rb_node conn_node; /* node in connection call tree */ struct sk_buff_head rx_queue; /* received packets */ struct sk_buff_head rx_oos_queue; /* packets received out of sequence */ struct sk_buff *tx_pending; /* Tx socket buffer being filled */ @@ -564,6 +567,7 @@ int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *, struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *, struct rxrpc_peer *, struct sk_buff *); +void __rxrpc_disconnect_call(struct rxrpc_call *); void rxrpc_disconnect_call(struct rxrpc_call *); void rxrpc_put_connection(struct rxrpc_connection *); void __exit rxrpc_destroy_all_connections(void); diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index 2c6c57c0d52c..3f278721269e 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -456,8 +456,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, { struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_call *call, *candidate; - struct rb_node **p, *parent; - u32 call_id; + u32 call_id, chan; _enter(",%d", conn->debug_id); @@ -467,21 +466,23 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, if (!candidate) return ERR_PTR(-EBUSY); + chan = sp->hdr.cid & RXRPC_CHANNELMASK; candidate->socket = rx; candidate->conn = conn; candidate->cid = sp->hdr.cid; candidate->call_id = sp->hdr.callNumber; - candidate->channel = sp->hdr.cid & RXRPC_CHANNELMASK; + candidate->channel = chan; candidate->rx_data_post = 0; candidate->state = RXRPC_CALL_SERVER_ACCEPTING; if (conn->security_ix > 0) candidate->state = RXRPC_CALL_SERVER_SECURING; - write_lock_bh(&conn->lock); + spin_lock(&conn->channel_lock); /* set the channel for this call */ - call = rcu_dereference_protected(conn->channels[candidate->channel], - lockdep_is_held(&conn->lock)); + call = rcu_dereference_protected(conn->channels[chan].call, + lockdep_is_held(&conn->channel_lock)); + _debug("channel[%u] is %p", candidate->channel, call); if (call && call->call_id == sp->hdr.callNumber) { /* already set; must've been a duplicate packet */ @@ -510,9 +511,9 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, call->debug_id, rxrpc_call_states[call->state]); if (call->state >= RXRPC_CALL_COMPLETE) { - conn->channels[call->channel] = NULL; + __rxrpc_disconnect_call(call); } else { - write_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); kmem_cache_free(rxrpc_call_jar, candidate); _leave(" = -EBUSY"); return ERR_PTR(-EBUSY); @@ -522,33 +523,22 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, /* check the call number isn't duplicate */ _debug("check dup"); call_id = sp->hdr.callNumber; - p = &conn->calls.rb_node; - parent = NULL; - while (*p) { - parent = *p; - call = rb_entry(parent, struct rxrpc_call, conn_node); - /* The tree is sorted in order of the __be32 value without - * turning it into host order. - */ - if (call_id < call->call_id) - p = &(*p)->rb_left; - else if (call_id > call->call_id) - p = &(*p)->rb_right; - else - goto old_call; - } + /* We just ignore calls prior to the current call ID. Terminated calls + * are handled via the connection. + */ + if (call_id <= conn->channels[chan].call_counter) + goto old_call; /* TODO: Just drop packet */ /* make the call available */ _debug("new call"); call = candidate; candidate = NULL; - rb_link_node(&call->conn_node, parent, p); - rb_insert_color(&call->conn_node, &conn->calls); - rcu_assign_pointer(conn->channels[call->channel], call); + conn->channels[chan].call_counter = call_id; + rcu_assign_pointer(conn->channels[chan].call, call); sock_hold(&rx->sk); rxrpc_get_connection(conn); - write_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); spin_lock(&conn->params.peer->lock); hlist_add_head(&call->error_link, &conn->params.peer->error_targets); @@ -588,19 +578,19 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, return call; extant_call: - write_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); kmem_cache_free(rxrpc_call_jar, candidate); _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1); return call; aborted_call: - write_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); kmem_cache_free(rxrpc_call_jar, candidate); _leave(" = -ECONNABORTED"); return ERR_PTR(-ECONNABORTED); old_call: - write_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); kmem_cache_free(rxrpc_call_jar, candidate); _leave(" = -ECONNRESET [old]"); return ERR_PTR(-ECONNRESET); @@ -648,8 +638,7 @@ void rxrpc_release_call(struct rxrpc_call *call) write_unlock_bh(&rx->call_lock); /* free up the channel for reuse */ - write_lock_bh(&conn->lock); - write_lock(&call->state_lock); + write_lock_bh(&call->state_lock); if (call->state < RXRPC_CALL_COMPLETE && call->state != RXRPC_CALL_CLIENT_FINAL_ACK) { @@ -657,10 +646,7 @@ void rxrpc_release_call(struct rxrpc_call *call) call->state = RXRPC_CALL_LOCALLY_ABORTED; call->local_abort = RX_CALL_DEAD; } - write_unlock(&call->state_lock); - - rb_erase(&call->conn_node, &conn->calls); - write_unlock_bh(&conn->lock); + write_unlock_bh(&call->state_lock); rxrpc_disconnect_call(call); diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c index f6ca8c5c4496..cee0f35bc1cf 100644 --- a/net/rxrpc/conn_event.c +++ b/net/rxrpc/conn_event.c @@ -31,15 +31,17 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state, u32 abort_code) { struct rxrpc_call *call; - struct rb_node *p; + int i; _enter("{%d},%x", conn->debug_id, abort_code); - read_lock_bh(&conn->lock); + spin_lock(&conn->channel_lock); - for (p = rb_first(&conn->calls); p; p = rb_next(p)) { - call = rb_entry(p, struct rxrpc_call, conn_node); - write_lock(&call->state_lock); + for (i = 0; i < RXRPC_MAXCALLS; i++) { + call = rcu_dereference_protected( + conn->channels[i].call, + lockdep_is_held(&conn->channel_lock)); + write_lock_bh(&call->state_lock); if (call->state <= RXRPC_CALL_COMPLETE) { call->state = state; if (state == RXRPC_CALL_LOCALLY_ABORTED) { @@ -51,10 +53,10 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state, } rxrpc_queue_call(call); } - write_unlock(&call->state_lock); + write_unlock_bh(&call->state_lock); } - read_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); _leave(""); } @@ -192,7 +194,7 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, if (ret < 0) return ret; - read_lock_bh(&conn->lock); + spin_lock(&conn->channel_lock); spin_lock(&conn->state_lock); if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING) { @@ -200,12 +202,12 @@ static int rxrpc_process_event(struct rxrpc_connection *conn, for (loop = 0; loop < RXRPC_MAXCALLS; loop++) rxrpc_call_is_secure( rcu_dereference_protected( - conn->channels[loop], - lockdep_is_held(&conn->lock))); + conn->channels[loop].call, + lockdep_is_held(&conn->channel_lock))); } spin_unlock(&conn->state_lock); - read_unlock_bh(&conn->lock); + spin_unlock(&conn->channel_lock); return 0; default: diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c index 0165a629388b..ce83f3e44da2 100644 --- a/net/rxrpc/conn_object.c +++ b/net/rxrpc/conn_object.c @@ -46,10 +46,8 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) init_waitqueue_head(&conn->channel_wq); INIT_WORK(&conn->processor, &rxrpc_process_connection); INIT_LIST_HEAD(&conn->link); - conn->calls = RB_ROOT; skb_queue_head_init(&conn->rx_queue); conn->security = &rxrpc_no_security; - rwlock_init(&conn->lock); spin_lock_init(&conn->state_lock); atomic_set(&conn->usage, 1); conn->debug_id = atomic_inc_return(&rxrpc_debug_id); @@ -62,39 +60,6 @@ static struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp) return conn; } -/* - * add a call to a connection's call-by-ID tree - */ -static void rxrpc_add_call_ID_to_conn(struct rxrpc_connection *conn, - struct rxrpc_call *call) -{ - struct rxrpc_call *xcall; - struct rb_node *parent, **p; - u32 call_id; - - write_lock_bh(&conn->lock); - - call_id = call->call_id; - p = &conn->calls.rb_node; - parent = NULL; - while (*p) { - parent = *p; - xcall = rb_entry(parent, struct rxrpc_call, conn_node); - - if (call_id < xcall->call_id) - p = &(*p)->rb_left; - else if (call_id > xcall->call_id) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&call->conn_node, parent, p); - rb_insert_color(&call->conn_node, &conn->calls); - - write_unlock_bh(&conn->lock); -} - /* * Allocate a client connection. The caller must take care to clear any * padding bytes in *cp. @@ -277,12 +242,12 @@ found_channel: call->channel = chan; call->epoch = conn->proto.epoch; call->cid = conn->proto.cid | chan; - call->call_id = ++conn->call_counter; - rcu_assign_pointer(conn->channels[chan], call); + call->call_id = ++conn->channels[chan].call_counter; + conn->channels[chan].call_id = call->call_id; + rcu_assign_pointer(conn->channels[chan].call, call); _net("CONNECT call %d on conn %d", call->debug_id, conn->debug_id); - rxrpc_add_call_ID_to_conn(conn, call); spin_unlock(&conn->channel_lock); rxrpc_put_peer(cp->peer); cp->peer = NULL; @@ -326,7 +291,7 @@ found_extant_conn: spin_lock(&conn->channel_lock); for (chan = 0; chan < RXRPC_MAXCALLS; chan++) - if (!conn->channels[chan]) + if (!conn->channels[chan].call) goto found_channel; BUG(); @@ -529,6 +494,35 @@ found: return conn; } +/* + * Disconnect a call and clear any channel it occupies when that call + * terminates. The caller must hold the channel_lock and must release the + * call's ref on the connection. + */ +void __rxrpc_disconnect_call(struct rxrpc_call *call) +{ + struct rxrpc_connection *conn = call->conn; + struct rxrpc_channel *chan = &conn->channels[call->channel]; + + _enter("%d,%d", conn->debug_id, call->channel); + + if (rcu_access_pointer(chan->call) == call) { + /* Save the result of the call so that we can repeat it if necessary + * through the channel, whilst disposing of the actual call record. + */ + chan->last_result = call->local_abort; + smp_wmb(); + chan->last_call = chan->call_id; + chan->call_id = chan->call_counter; + + rcu_assign_pointer(chan->call, NULL); + atomic_inc(&conn->avail_chans); + wake_up(&conn->channel_wq); + } + + _leave(""); +} + /* * Disconnect a call and clear any channel it occupies when that call * terminates. @@ -536,23 +530,13 @@ found: void rxrpc_disconnect_call(struct rxrpc_call *call) { struct rxrpc_connection *conn = call->conn; - unsigned chan = call->channel; - - _enter("%d,%d", conn->debug_id, call->channel); spin_lock(&conn->channel_lock); - - if (rcu_access_pointer(conn->channels[chan]) == call) { - rcu_assign_pointer(conn->channels[chan], NULL); - atomic_inc(&conn->avail_chans); - wake_up(&conn->channel_wq); - } - + __rxrpc_disconnect_call(call); spin_unlock(&conn->channel_lock); call->conn = NULL; rxrpc_put_connection(conn); - _leave(""); } /* @@ -591,7 +575,6 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu) _net("DESTROY CONN %d", conn->debug_id); - ASSERT(RB_EMPTY_ROOT(&conn->calls)); rxrpc_purge_queue(&conn->rx_queue); conn->security->clear(conn); diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c index 2a25ab425b6f..ced5f07444e5 100644 --- a/net/rxrpc/proc.c +++ b/net/rxrpc/proc.c @@ -137,7 +137,7 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v) if (v == &rxrpc_connections) { seq_puts(seq, "Proto Local Remote " - " SvID ConnID Calls End Use State Key " + " SvID ConnID End Use State Key " " Serial ISerial\n" ); return 0; @@ -154,13 +154,12 @@ static int rxrpc_connection_seq_show(struct seq_file *seq, void *v) ntohs(conn->params.peer->srx.transport.sin.sin_port)); seq_printf(seq, - "UDP %-22.22s %-22.22s %4x %08x %08x %s %3u" + "UDP %-22.22s %-22.22s %4x %08x %s %3u" " %s %08x %08x %08x\n", lbuff, rbuff, conn->params.service_id, conn->proto.cid, - conn->call_counter, rxrpc_conn_is_service(conn) ? "Svc" : "Clt", atomic_read(&conn->usage), rxrpc_conn_states[conn->state], diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c index 3acc7c1241d4..63afa9e9cc08 100644 --- a/net/rxrpc/rxkad.c +++ b/net/rxrpc/rxkad.c @@ -767,14 +767,10 @@ static int rxkad_respond_to_challenge(struct rxrpc_connection *conn, resp.kvno = htonl(token->kad->kvno); resp.ticket_len = htonl(token->kad->ticket_len); - resp.encrypted.call_id[0] = - htonl(conn->channels[0] ? conn->channels[0]->call_id : 0); - resp.encrypted.call_id[1] = - htonl(conn->channels[1] ? conn->channels[1]->call_id : 0); - resp.encrypted.call_id[2] = - htonl(conn->channels[2] ? conn->channels[2]->call_id : 0); - resp.encrypted.call_id[3] = - htonl(conn->channels[3] ? conn->channels[3]->call_id : 0); + resp.encrypted.call_id[0] = htonl(conn->channels[0].call_counter); + resp.encrypted.call_id[1] = htonl(conn->channels[1].call_counter); + resp.encrypted.call_id[2] = htonl(conn->channels[2].call_counter); + resp.encrypted.call_id[3] = htonl(conn->channels[3].call_counter); /* calculate the response checksum and then do the encryption */ rxkad_calc_response_checksum(&resp); @@ -991,7 +987,7 @@ static int rxkad_verify_response(struct rxrpc_connection *conn, void *ticket; u32 abort_code, version, kvno, ticket_len, level; __be32 csum; - int ret; + int ret, i; _enter("{%d,%x}", conn->debug_id, key_serial(conn->server_key)); @@ -1054,11 +1050,26 @@ static int rxkad_verify_response(struct rxrpc_connection *conn, if (response.encrypted.checksum != csum) goto protocol_error_free; - if (ntohl(response.encrypted.call_id[0]) > INT_MAX || - ntohl(response.encrypted.call_id[1]) > INT_MAX || - ntohl(response.encrypted.call_id[2]) > INT_MAX || - ntohl(response.encrypted.call_id[3]) > INT_MAX) - goto protocol_error_free; + spin_lock(&conn->channel_lock); + for (i = 0; i < RXRPC_MAXCALLS; i++) { + struct rxrpc_call *call; + u32 call_id = ntohl(response.encrypted.call_id[i]); + + if (call_id > INT_MAX) + goto protocol_error_unlock; + + if (call_id < conn->channels[i].call_counter) + goto protocol_error_unlock; + if (call_id > conn->channels[i].call_counter) { + call = rcu_dereference_protected( + conn->channels[i].call, + lockdep_is_held(&conn->channel_lock)); + if (call && call->state < RXRPC_CALL_COMPLETE) + goto protocol_error_unlock; + conn->channels[i].call_counter = call_id; + } + } + spin_unlock(&conn->channel_lock); abort_code = RXKADOUTOFSEQUENCE; if (ntohl(response.encrypted.inc_nonce) != conn->security_nonce + 1) @@ -1083,6 +1094,8 @@ static int rxkad_verify_response(struct rxrpc_connection *conn, _leave(" = 0"); return 0; +protocol_error_unlock: + spin_unlock(&conn->channel_lock); protocol_error_free: kfree(ticket); protocol_error: