diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 00bc0e620532..eae58a6b121c 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, rcu_read_unlock(); } +/* tipc_bearer_xmit() -send buffer to destination over bearer + */ +void tipc_bearer_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr *dst) +{ + struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_bearer *b; + struct sk_buff *skb, *tmp; + + if (skb_queue_empty(xmitq)) + return; + + rcu_read_lock(); + b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + if (likely(b)) { + skb_queue_walk_safe(xmitq, skb, tmp) { + __skb_dequeue(xmitq); + b->media->send_msg(net, skb, b, dst); + /* Until we remove cloning in tipc_l2_send_msg(): */ + kfree_skb(skb); + } + } + rcu_read_unlock(); +} + /** * tipc_l2_rcv_msg - handle incoming TIPC message from an interface * @buf: the received packet diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index dc714d977768..6426f242f626 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void); void tipc_bearer_stop(struct net *net); void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, struct tipc_media_addr *dest); +void tipc_bearer_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr *dst); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/link.c b/net/tipc/link.c index ea32679b6737..c052437a7cfa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list) /* This really cannot happen... */ if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) { pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); - tipc_link_reset(link); return -ENOBUFS; } /* Non-blocking sender: */ @@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link, return 0; } +/** + * tipc_link_xmit(): enqueue buffer list according to queue situation + * @link: link to use + * @list: chain of buffers containing message + * @xmitq: returned list of packets to be sent by caller + * + * Consumes the buffer chain, except when returning -ELINKCONG, + * since the caller then may want to make more send attempts. + * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS + * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted + */ +int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb_peek(list)); + unsigned int maxwin = l->window; + unsigned int i, imp = msg_importance(hdr); + unsigned int mtu = l->mtu; + u16 ack = l->rcv_nxt - 1; + u16 seqno = l->snd_nxt; + u16 bc_last_in = l->owner->bclink.last_in; + struct sk_buff_head *transmq = &l->transmq; + struct sk_buff_head *backlogq = &l->backlogq; + struct sk_buff *skb, *_skb, *bskb; + + /* Match msg importance against this and all higher backlog limits: */ + for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) { + if (unlikely(l->backlog[i].len >= l->backlog[i].limit)) + return link_schedule_user(l, list); + } + if (unlikely(msg_size(hdr) > mtu)) + return -EMSGSIZE; + + /* Prepare each packet for sending, and add to relevant queue: */ + while (skb_queue_len(list)) { + skb = skb_peek(list); + hdr = buf_msg(skb); + msg_set_seqno(hdr, seqno); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_last_in); + + if (likely(skb_queue_len(transmq) < maxwin)) { + _skb = skb_clone(skb, GFP_ATOMIC); + if (!_skb) + return -ENOBUFS; + __skb_dequeue(list); + __skb_queue_tail(transmq, skb); + __skb_queue_tail(xmitq, _skb); + l->rcv_unacked = 0; + seqno++; + continue; + } + if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { + kfree_skb(__skb_dequeue(list)); + l->stats.sent_bundled++; + continue; + } + if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { + kfree_skb(__skb_dequeue(list)); + __skb_queue_tail(backlogq, bskb); + l->backlog[msg_importance(buf_msg(bskb))].len++; + l->stats.sent_bundled++; + l->stats.sent_bundles++; + continue; + } + l->backlog[imp].len += skb_queue_len(list); + skb_queue_splice_tail_init(list, backlogq); + } + l->snd_nxt = seqno; + return 0; +} + static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) { skb_queue_head_init(list); @@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb) return __tipc_link_xmit(link->owner->net, link, &head); } -/* tipc_link_xmit_skb(): send single buffer to destination - * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE - * messages, which will not cause link congestion - * The only exception is datagram messages rerouted after secondary - * lookup, which are rare and safe to dispose of anyway. - * TODO: Return real return value, and let callers use - * tipc_wait_for_sendpkt() where applicable - */ -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, - u32 selector) -{ - struct sk_buff_head head; - int rc; - - skb2list(skb, &head); - rc = tipc_link_xmit(net, &head, dnode, selector); - if (rc) - kfree_skb(skb); - return 0; -} - -/** - * tipc_link_xmit() is the general link level function for message sending - * @net: the applicable net namespace - * @list: chain of buffers containing message - * @dsz: amount of user data to be sent - * @dnode: address of destination node - * @selector: a number used for deterministic link selection - * Consumes the buffer chain, except when returning error - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE - */ -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, - u32 selector) -{ - struct tipc_link *link = NULL; - struct tipc_node *node; - int rc = -EHOSTUNREACH; - - node = tipc_node_find(net, dnode); - if (node) { - tipc_node_lock(node); - link = node_active_link(node, selector & 1); - if (link) - rc = __tipc_link_xmit(net, link, list); - tipc_node_unlock(node); - tipc_node_put(node); - } - if (link) - return rc; - - if (likely(in_own_node(net, dnode))) { - tipc_sk_rcv(net, list); - return 0; - } - - __skb_queue_purge(list); - return rc; -} - /* * tipc_link_sync_xmit - synchronize broadcast link endpoints. * diff --git a/net/tipc/link.h b/net/tipc/link.h index 9c71d9e42e93..7add2b90361d 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr); void tipc_link_purge_backlog(struct tipc_link *l); void tipc_link_reset_all(struct tipc_node *node); void tipc_link_reset(struct tipc_link *l_ptr); -int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, - u32 selector); -int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest, - u32 selector); int __tipc_link_xmit(struct net *net, struct tipc_link *link, struct sk_buff_head *list); +int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, + struct sk_buff_head *xmitq); void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, u32 gap, u32 tolerance, u32 priority); void tipc_link_push_packets(struct tipc_link *l_ptr); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 3a1539e96294..e6018b7eb197 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb) if (!oskb) break; msg_set_destnode(buf_msg(oskb), dnode); - tipc_link_xmit_skb(net, oskb, dnode, dnode); + tipc_node_xmit_skb(net, oskb, dnode, dnode); } rcu_read_unlock(); @@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode) &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); rcu_read_unlock(); - tipc_link_xmit(net, &head, dnode, dnode); + tipc_node_xmit(net, &head, dnode, dnode); } static void tipc_publ_subscribe(struct net *net, struct publication *publ, diff --git a/net/tipc/node.c b/net/tipc/node.c index 19729645d494..ad759bb034e7 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -563,6 +563,84 @@ msg_full: return -EMSGSIZE; } +static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, + int *bearer_id, + struct tipc_media_addr **maddr) +{ + int id = n->active_links[sel & 1]; + + if (unlikely(id < 0)) + return NULL; + + *bearer_id = id; + *maddr = &n->links[id].maddr; + return n->links[id].link; +} + +/** + * tipc_node_xmit() is the general link level function for message sending + * @net: the applicable net namespace + * @list: chain of buffers containing message + * @dnode: address of destination node + * @selector: a number used for deterministic link selection + * Consumes the buffer chain, except when returning -ELINKCONG + * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE + */ +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, + u32 dnode, int selector) +{ + struct tipc_link *l = NULL; + struct tipc_node *n; + struct sk_buff_head xmitq; + struct tipc_media_addr *maddr; + int bearer_id; + int rc = -EHOSTUNREACH; + + __skb_queue_head_init(&xmitq); + n = tipc_node_find(net, dnode); + if (likely(n)) { + tipc_node_lock(n); + l = tipc_node_select_link(n, selector, &bearer_id, &maddr); + if (likely(l)) + rc = tipc_link_xmit(l, list, &xmitq); + if (unlikely(rc == -ENOBUFS)) + tipc_link_reset(l); + tipc_node_unlock(n); + tipc_node_put(n); + } + if (likely(!rc)) { + tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); + return 0; + } + if (likely(in_own_node(net, dnode))) { + tipc_sk_rcv(net, list); + return 0; + } + return rc; +} + +/* tipc_node_xmit_skb(): send single buffer to destination + * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE + * messages, which will not be rejected + * The only exception is datagram messages rerouted after secondary + * lookup, which are rare and safe to dispose of anyway. + * TODO: Return real return value, and let callers use + * tipc_wait_for_sendpkt() where applicable + */ +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode, + u32 selector) +{ + struct sk_buff_head head; + int rc; + + skb_queue_head_init(&head); + __skb_queue_tail(&head, skb); + rc = tipc_node_xmit(net, &head, dnode, selector); + if (rc == -ELINKCONG) + kfree_skb(skb); + return 0; +} + int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb) { int err; diff --git a/net/tipc/node.h b/net/tipc/node.h index 74f278adada3..86b7c740cf84 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n); int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, char *linkname, size_t len); void tipc_node_unlock(struct tipc_node *node); +int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, + int selector); +int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, + u32 selector); int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 87fef25f6519..5b0b08d58fcc 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk) while ((skb = __skb_dequeue(&sk->sk_receive_queue))) { if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0); + tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0); } } @@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock) } if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, TIPC_ERR_NO_PORT)) - tipc_link_xmit_skb(net, skb, dnode, 0); + tipc_node_xmit_skb(net, skb, dnode, 0); } } @@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock) tsk_own_node(tsk), tsk_peer_port(tsk), tsk->portid, TIPC_ERR_NO_PORT); if (skb) - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); tipc_node_remove_conn(net, dnode, tsk->portid); } @@ -925,7 +925,7 @@ new_mtu: do { skb = skb_peek(pktchain); TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; - rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid); + rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid); if (likely(!rc)) { if (sock->state != SS_READY) sock->state = SS_CONNECTING; @@ -1045,7 +1045,7 @@ next: return rc; do { if (likely(!tsk_conn_cong(tsk))) { - rc = tipc_link_xmit(net, pktchain, dnode, portid); + rc = tipc_node_xmit(net, pktchain, dnode, portid); if (likely(!rc)) { tsk->sent_unacked++; sent += send; @@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) return; msg = buf_msg(skb); msg_set_msgcnt(msg, ack); - tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg)); + tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg)); } static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) @@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) return 0; } if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err)) - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); return 0; } @@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) continue; xmit: - tipc_link_xmit_skb(net, skb, dnode, dport); + tipc_node_xmit_skb(net, skb, dnode, dport); } return err ? -EHOSTUNREACH : 0; } @@ -2092,7 +2092,7 @@ restart: } if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, TIPC_CONN_SHUTDOWN)) - tipc_link_xmit_skb(net, skb, dnode, + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); } else { dnode = tsk_peer_node(tsk); @@ -2102,7 +2102,7 @@ restart: 0, dnode, tsk_own_node(tsk), tsk_peer_port(tsk), tsk->portid, TIPC_CONN_SHUTDOWN); - tipc_link_xmit_skb(net, skb, dnode, tsk->portid); + tipc_node_xmit_skb(net, skb, dnode, tsk->portid); } tsk->connected = 0; sock->state = SS_DISCONNECTING; @@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data) } bh_unlock_sock(sk); if (skb) - tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); + tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); exit: sock_put(sk); }