RxRPC rewrite

-----BEGIN PGP SIGNATURE-----
 
 iQIVAwUAV+2Oy/Sw1s6N8H32AQK/Wg//TElmZcd+RPSgQSrgCzsXrve0K48MwdcH
 jUQzk/fhNbMFNYE697ifJkIcrF9GfIUzbuwe0zxEwVAQ2qCf9eqkaB8dw/89eo+q
 nKwF+ku5eDDB4QapcFVfxbTJZdu12+xJdbEmnyTkfH2PRlNFELUQB/QEfnS3mme+
 oLu+KK1sh8dzA9MXtMrsBxZ+rq3AmTyqabsj9Zs48fdSX10xY/JVQsWRV7jjS84C
 6YIgr8rDfQ3eTSqGduQgU0Hh/PVXCI2U8QO7VNRuEYva7KK9417/Pb5AdELnsoP6
 izMuphGWi+HLPHlKKlWkOIIel9sjtB55Z3VJD5+ezp3uKTEI2CPJ9hksJ3ewBSxC
 5vJjQgr16DWgNpDs3JTthXb9/2R5wFTIGSlo6ToEVO22S9KtIlDZRNAWYEtRYfQb
 u2MyZyudH9PVj1cxzVpjRxvQIQswxApnF8gwbSJgvlmIImcqmIARu8fAegWZekGP
 MK8y1eIptVX0lAEaKUnHVcni2ttlQmhU9VN1F8pUu0tTXyIA+po06zkUv1KaZhgb
 TsNhoK1CDvRaR7EYGbMvQkUHGdRMATtFIcop4P+o0ZEMKBM+LnNSDaFsG7N1K5fj
 RggpaY2ohbL36NBHfEsPJY5iZzQJs2kqAs0P4sQ7PVx9GOKGPc1KgbkVFyHIQ7qK
 xok9xL0Agjk=
 =iaa4
 -----END PGP SIGNATURE-----

Merge tag 'rxrpc-rewrite-20160929' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Fixes and adjustments

This set of patches contains some fixes and adjustments:

 (1) Connections for exclusive calls are being reused because the check to
     work out whether to set RXRPC_CONN_DONT_REUSE is checking where the
     parameters will be copied to (but haven't yet).

 (2) Make Tx loss-injection go through the normal return, so the state gets
     set correctly for what the code thinks it has done.

     Note lost Tx packets in the tx_data trace rather than the skb
     tracepoint.

 (3) Activate channels according to the current state from within the
     channel_lock to avoid someone changing it on us.

 (4) Reduce the local endpoint's services list to a single pointer as we
     don't allow service AF_RXRPC sockets to share UDP ports with other
     AF_RXRPC sockets, so there can't be more than one element in the list.

 (5) Request more ACKs in slow-start mode to help monitor the state driving
     the window configuration.

 (6) Display the serial number of the packet being ACK'd rather than the
     ACK packet's own serial number in the congestion trace as this can be
     related to other entries in the trace.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2016-09-30 01:46:36 -04:00
commit fa1403548d
13 changed files with 68 additions and 64 deletions

View file

@ -258,15 +258,16 @@ TRACE_EVENT(rxrpc_rx_ack,
TRACE_EVENT(rxrpc_tx_data,
TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
rxrpc_serial_t serial, u8 flags, bool lose),
rxrpc_serial_t serial, u8 flags, bool retrans, bool lose),
TP_ARGS(call, seq, serial, flags, lose),
TP_ARGS(call, seq, serial, flags, retrans, lose),
TP_STRUCT__entry(
__field(struct rxrpc_call *, call )
__field(rxrpc_seq_t, seq )
__field(rxrpc_serial_t, serial )
__field(u8, flags )
__field(bool, retrans )
__field(bool, lose )
),
@ -275,6 +276,7 @@ TRACE_EVENT(rxrpc_tx_data,
__entry->seq = seq;
__entry->serial = serial;
__entry->flags = flags;
__entry->retrans = retrans;
__entry->lose = lose;
),

View file

@ -136,7 +136,8 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)saddr;
struct sock *sk = sock->sk;
struct rxrpc_local *local;
struct rxrpc_sock *rx = rxrpc_sk(sk), *prx;
struct rxrpc_sock *rx = rxrpc_sk(sk);
u16 service_id = srx->srx_service;
int ret;
_enter("%p,%p,%d", rx, saddr, len);
@ -160,15 +161,12 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
goto error_unlock;
}
if (rx->srx.srx_service) {
if (service_id) {
write_lock(&local->services_lock);
hlist_for_each_entry(prx, &local->services, listen_link) {
if (prx->srx.srx_service == rx->srx.srx_service)
goto service_in_use;
}
if (rcu_access_pointer(local->service))
goto service_in_use;
rx->local = local;
hlist_add_head_rcu(&rx->listen_link, &local->services);
rcu_assign_pointer(local->service, rx);
write_unlock(&local->services_lock);
rx->sk.sk_state = RXRPC_SERVER_BOUND;
@ -599,7 +597,6 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
rx->family = protocol;
rx->calls = RB_ROOT;
INIT_HLIST_NODE(&rx->listen_link);
spin_lock_init(&rx->incoming_lock);
INIT_LIST_HEAD(&rx->sock_calls);
INIT_LIST_HEAD(&rx->to_be_accepted);
@ -681,11 +678,9 @@ static int rxrpc_release_sock(struct sock *sk)
sk->sk_state = RXRPC_CLOSE;
spin_unlock_bh(&sk->sk_receive_queue.lock);
ASSERTCMP(rx->listen_link.next, !=, LIST_POISON1);
if (!hlist_unhashed(&rx->listen_link)) {
if (rx->local && rx->local->service == rx) {
write_lock(&rx->local->services_lock);
hlist_del_rcu(&rx->listen_link);
rx->local->service = NULL;
write_unlock(&rx->local->services_lock);
}

View file

@ -93,7 +93,6 @@ struct rxrpc_sock {
rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */
rxrpc_discard_new_call_t discard_new_call; /* Func to discard a new call */
struct rxrpc_local *local; /* local endpoint */
struct hlist_node listen_link; /* link in the local endpoint's listen list */
struct rxrpc_backlog *backlog; /* Preallocation for services */
spinlock_t incoming_lock; /* Incoming call vs service shutdown lock */
struct list_head sock_calls; /* List of calls owned by this socket */
@ -216,7 +215,7 @@ struct rxrpc_local {
struct list_head link;
struct socket *socket; /* my UDP socket */
struct work_struct processor;
struct hlist_head services; /* services listening on this endpoint */
struct rxrpc_sock __rcu *service; /* Service(s) listening on this endpoint */
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
struct sk_buff_head reject_queue; /* packets awaiting rejection */
struct sk_buff_head event_queue; /* endpoint event packets awaiting processing */
@ -603,7 +602,6 @@ enum rxrpc_skb_trace {
rxrpc_skb_tx_cleaned,
rxrpc_skb_tx_freed,
rxrpc_skb_tx_got,
rxrpc_skb_tx_lost,
rxrpc_skb_tx_new,
rxrpc_skb_tx_rotated,
rxrpc_skb_tx_seen,
@ -1073,7 +1071,7 @@ extern const s8 rxrpc_ack_priority[];
* output.c
*/
int rxrpc_send_call_packet(struct rxrpc_call *, u8);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *);
int rxrpc_send_data_packet(struct rxrpc_call *, struct sk_buff *, bool);
void rxrpc_reject_packets(struct rxrpc_local *);
/*

View file

@ -331,14 +331,14 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
struct rxrpc_sock *rx;
struct rxrpc_call *call;
u16 service_id = sp->hdr.serviceId;
_enter("");
/* Get the socket providing the service */
hlist_for_each_entry_rcu_bh(rx, &local->services, listen_link) {
if (rx->srx.srx_service == sp->hdr.serviceId)
goto found_service;
}
rx = rcu_dereference(local->service);
if (service_id == rx->srx.srx_service)
goto found_service;
trace_rxrpc_abort("INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_INVALID_OPERATION, EOPNOTSUPP);

View file

@ -256,7 +256,7 @@ static void rxrpc_resend(struct rxrpc_call *call)
rxrpc_get_skb(skb, rxrpc_skb_tx_got);
spin_unlock_bh(&call->lock);
if (rxrpc_send_data_packet(call, skb) < 0) {
if (rxrpc_send_data_packet(call, skb, true) < 0) {
rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
return;
}

View file

@ -200,7 +200,7 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
}
atomic_set(&conn->usage, 1);
if (conn->params.exclusive)
if (cp->exclusive)
__set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
conn->params = *cp;
@ -575,29 +575,43 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
wake_up(&call->waitq);
}
/*
* Assign channels and callNumbers to waiting calls with channel_lock
* held by caller.
*/
static void rxrpc_activate_channels_locked(struct rxrpc_connection *conn)
{
u8 avail, mask;
switch (conn->cache_state) {
case RXRPC_CONN_CLIENT_ACTIVE:
mask = RXRPC_ACTIVE_CHANS_MASK;
break;
default:
return;
}
while (!list_empty(&conn->waiting_calls) &&
(avail = ~conn->active_chans,
avail &= mask,
avail != 0))
rxrpc_activate_one_channel(conn, __ffs(avail));
}
/*
* Assign channels and callNumbers to waiting calls.
*/
static void rxrpc_activate_channels(struct rxrpc_connection *conn)
{
unsigned char mask;
_enter("%d", conn->debug_id);
trace_rxrpc_client(conn, -1, rxrpc_client_activate_chans);
if (conn->cache_state != RXRPC_CONN_CLIENT_ACTIVE ||
conn->active_chans == RXRPC_ACTIVE_CHANS_MASK)
if (conn->active_chans == RXRPC_ACTIVE_CHANS_MASK)
return;
spin_lock(&conn->channel_lock);
while (!list_empty(&conn->waiting_calls) &&
(mask = ~conn->active_chans,
mask &= RXRPC_ACTIVE_CHANS_MASK,
mask != 0))
rxrpc_activate_one_channel(conn, __ffs(mask));
rxrpc_activate_channels_locked(conn);
spin_unlock(&conn->channel_lock);
_leave("");
}

View file

@ -41,10 +41,10 @@ static void rxrpc_proto_abort(const char *why,
*/
static void rxrpc_congestion_management(struct rxrpc_call *call,
struct sk_buff *skb,
struct rxrpc_ack_summary *summary)
struct rxrpc_ack_summary *summary,
rxrpc_serial_t acked_serial)
{
enum rxrpc_congest_change change = rxrpc_cong_no_change;
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int cumulative_acks = call->cong_cumul_acks;
unsigned int cwnd = call->cong_cwnd;
bool resend = false;
@ -172,7 +172,7 @@ out_no_clear_ca:
cwnd = RXRPC_RXTX_BUFF_SIZE - 1;
call->cong_cwnd = cwnd;
call->cong_cumul_acks = cumulative_acks;
trace_rxrpc_congest(call, summary, sp->hdr.serial, change);
trace_rxrpc_congest(call, summary, acked_serial, change);
if (resend && !test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
rxrpc_queue_call(call);
return;
@ -848,7 +848,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
false, true,
rxrpc_propose_ack_ping_for_lost_reply);
return rxrpc_congestion_management(call, skb, &summary);
return rxrpc_congestion_management(call, skb, &summary, acked_serial);
}
/*

View file

@ -86,7 +86,6 @@ static struct rxrpc_local *rxrpc_alloc_local(const struct sockaddr_rxrpc *srx)
atomic_set(&local->usage, 1);
INIT_LIST_HEAD(&local->link);
INIT_WORK(&local->processor, rxrpc_local_processor);
INIT_HLIST_HEAD(&local->services);
init_rwsem(&local->defrag_sem);
skb_queue_head_init(&local->reject_queue);
skb_queue_head_init(&local->event_queue);
@ -292,7 +291,7 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
mutex_unlock(&rxrpc_local_mutex);
ASSERT(RB_EMPTY_ROOT(&local->client_conns));
ASSERT(hlist_empty(&local->services));
ASSERT(!local->service);
if (socket) {
local->socket = NULL;

View file

@ -108,7 +108,6 @@ const char rxrpc_skb_traces[rxrpc_skb__nr_trace][7] = {
[rxrpc_skb_tx_cleaned] = "Tx CLN",
[rxrpc_skb_tx_freed] = "Tx FRE",
[rxrpc_skb_tx_got] = "Tx GOT",
[rxrpc_skb_tx_lost] = "Tx *L*",
[rxrpc_skb_tx_new] = "Tx NEW",
[rxrpc_skb_tx_rotated] = "Tx ROT",
[rxrpc_skb_tx_seen] = "Tx SEE",

View file

@ -238,7 +238,8 @@ out:
/*
* send a packet through the transport endpoint
*/
int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb,
bool retrans)
{
struct rxrpc_connection *conn = call->conn;
struct rxrpc_wire_header whdr;
@ -247,6 +248,7 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
struct kvec iov[2];
rxrpc_serial_t serial;
size_t len;
bool lost = false;
int ret, opt;
_enter(",{%d}", skb->len);
@ -281,7 +283,8 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
/* If our RTT cache needs working on, request an ACK. Also request
* ACKs if a DATA packet appears to have been lost.
*/
if (call->cong_mode == RXRPC_CALL_FAST_RETRANSMIT ||
if (retrans ||
call->cong_mode == RXRPC_CALL_SLOW_START ||
(call->peer->rtt_usage < 3 && sp->hdr.seq & 1) ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
ktime_get_real()))
@ -290,11 +293,9 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
static int lose;
if ((lose++ & 7) == 7) {
trace_rxrpc_tx_data(call, sp->hdr.seq, serial,
whdr.flags, true);
rxrpc_lose_skb(skb, rxrpc_skb_tx_lost);
_leave(" = 0 [lose]");
return 0;
ret = 0;
lost = true;
goto done;
}
}
@ -319,7 +320,8 @@ int rxrpc_send_data_packet(struct rxrpc_call *call, struct sk_buff *skb)
goto send_fragmentable;
done:
trace_rxrpc_tx_data(call, sp->hdr.seq, serial, whdr.flags, false);
trace_rxrpc_tx_data(call, sp->hdr.seq, serial, whdr.flags,
retrans, lost);
if (ret >= 0) {
ktime_t now = ktime_get_real();
skb->tstamp = now;

View file

@ -131,10 +131,10 @@ int rxrpc_init_server_conn_security(struct rxrpc_connection *conn)
/* find the service */
read_lock(&local->services_lock);
hlist_for_each_entry(rx, &local->services, listen_link) {
if (rx->srx.srx_service == conn->params.service_id)
goto found_service;
}
rx = rcu_dereference_protected(local->service,
lockdep_is_held(&local->services_lock));
if (rx && rx->srx.srx_service == conn->params.service_id)
goto found_service;
/* the service appears to have died */
read_unlock(&local->services_lock);

View file

@ -144,7 +144,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
if (seq == 1 && rxrpc_is_client_call(call))
rxrpc_expose_client_call(call);
ret = rxrpc_send_data_packet(call, skb);
ret = rxrpc_send_data_packet(call, skb, false);
if (ret < 0) {
_debug("need instant resend %d", ret);
rxrpc_instant_resend(call, ix);

View file

@ -77,14 +77,9 @@ void rxrpc_lose_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
if (skb) {
int n;
CHECK_SLAB_OKAY(&skb->users);
if (op == rxrpc_skb_tx_lost) {
n = atomic_read(select_skb_count(op));
trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
} else {
n = atomic_dec_return(select_skb_count(op));
trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
kfree_skb(skb);
}
n = atomic_dec_return(select_skb_count(op));
trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
kfree_skb(skb);
}
}