tipc: change sk_buffer handling in tipc_link_xmit()
When the function tipc_link_xmit() is given a buffer list for
transmission, it currently consumes the list both when transmission
is successful and when it fails, except for the special case when
it encounters link congestion.
This behavior is inconsistent, and needs to be corrected if we want
to avoid problems in later commits in this series.
In this commit, we change this to let the function consume the list
only when transmission is successful, and leave the list with the
sender in all other cases. We also modifiy the socket code so that
it adapts to this change, i.e., purges the list when a non-congestion
error code is returned.
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 59b2f2a..295bdc2 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -358,10 +358,9 @@
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
- if (unlikely(!skb)) {
- __skb_queue_purge(list);
+ if (unlikely(!skb))
return -EHOSTUNREACH;
- }
+
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index f8e0e2c..ea32679 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -340,7 +340,7 @@
* @link: congested link
* @list: message that was attempted sent
* Create pseudo msg to send back to user when congestion abates
- * Only consumes message if there is an error
+ * Does not consume buffer list
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
@@ -354,7 +354,7 @@
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
- goto err;
+ return -ENOBUFS;
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
@@ -364,15 +364,12 @@
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
if (!skb)
- goto err;
+ return -ENOBUFS;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
return -ELINKCONG;
-err:
- __skb_queue_purge(list);
- return -ENOBUFS;
}
/**
@@ -641,8 +638,7 @@
* @link: link to use
* @list: chain of buffers containing message
*
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain, except when returning an error code,
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
@@ -666,10 +662,9 @@
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
- if (unlikely(msg_size(msg) > mtu)) {
- __skb_queue_purge(list);
+ if (unlikely(msg_size(msg) > mtu))
return -EMSGSIZE;
- }
+
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
@@ -722,7 +717,7 @@
/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not be rejected
+ * messages, which will not cause link congestion
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
@@ -736,7 +731,7 @@
skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
- if (rc == -ELINKCONG)
+ if (rc)
kfree_skb(skb);
return 0;
}
@@ -748,7 +743,7 @@
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain, except when returning error
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3a7567f..87fef25 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -686,21 +686,22 @@
do {
rc = tipc_bclink_xmit(net, pktchain);
- if (likely(rc >= 0)) {
- rc = dsz;
- break;
+ if (likely(!rc))
+ return dsz;
+
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
}
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tipc_sk(sk)->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
+ break;
+ } while (1);
return rc;
}
@@ -925,23 +926,24 @@
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
- if (likely(rc >= 0)) {
+ if (likely(!rc)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
- rc = dsz;
- break;
+ return dsz;
}
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
+ }
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tsk->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
+ break;
+ } while (1);
return rc;
}
@@ -1048,10 +1050,11 @@
tsk->sent_unacked++;
sent += send;
if (sent == dsz)
- break;
+ return dsz;
goto next;
}
if (rc == -EMSGSIZE) {
+ __skb_queue_purge(pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
@@ -1059,13 +1062,13 @@
}
if (rc != -ELINKCONG)
break;
+
tsk->link_cong = 1;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
} while (!rc);
+ __skb_queue_purge(pktchain);
return sent ? sent : rc;
}