kcm: Add receive message timeout

This patch adds a receive timeout for message assembly on the attached TCP
sockets. The timeout is set when a new message is started and the whole
message has not yet been received by TCP (i.e. it is not in the receive
queue). If the complete message is subsequently received the timer is
cancelled; if the timer expires the RX side is aborted.

The timeout value is taken from the socket timeout (SO_RCVTIMEO) that is
set on a TCP socket (i.e. set by setsockopt before attaching a TCP socket
to KCM).

Signed-off-by: Tom Herbert <tom@herbertland.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/kcm/kcmproc.c b/net/kcm/kcmproc.c
index 7638b35..7380087 100644
--- a/net/kcm/kcmproc.c
+++ b/net/kcm/kcmproc.c
@@ -331,7 +331,7 @@
 		   mux_stats.rx_ready_drops);
 
 	seq_printf(seq,
-		   "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
+		   "%-8s %-10s %-16s %-10s %-16s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s %-10s\n",
 		   "Psock",
 		   "RX-Msgs",
 		   "RX-Bytes",
@@ -344,10 +344,11 @@
 		   "RX-NeedMor",
 		   "RX-BadLen",
 		   "RX-TooBig",
+		   "RX-Timeout",
 		   "TX-Aborts");
 
 	seq_printf(seq,
-		   "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u\n",
+		   "%-8s %-10llu %-16llu %-10llu %-16llu %-10llu %-10llu %-10u %-10u %-10u %-10u %-10u %-10u %-10u\n",
 		   "",
 		   psock_stats.rx_msgs,
 		   psock_stats.rx_bytes,
@@ -360,6 +361,7 @@
 		   psock_stats.rx_need_more_hdr,
 		   psock_stats.rx_bad_hdr_len,
 		   psock_stats.rx_msg_too_big,
+		   psock_stats.rx_msg_timeouts,
 		   psock_stats.tx_aborts);
 
 	return 0;
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index 8bc38d3..40662d73 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -55,6 +55,8 @@
 
 	/* Unrecoverable error in receive */
 
+	del_timer(&psock->rx_msg_timer);
+
 	if (psock->rx_stopped)
 		return;
 
@@ -351,6 +353,21 @@
 	spin_unlock_bh(&mux->rx_lock);
 }
 
+/* Arm the per-psock message assembly timer.
+ *
+ * The lower socket's sk_rcvtimeo is a relative interval in jiffies, whereas
+ * mod_timer() takes an absolute expiry, so the interval is added to the
+ * current jiffies value. MAX_SCHEDULE_TIMEOUT is the socket layer's encoding
+ * for "no receive timeout" and must not arm the timer.
+ */
+static void kcm_start_rx_timer(struct kcm_psock *psock)
+{
+	long timeo = psock->sk->sk_rcvtimeo;
+
+	if (timeo && timeo != MAX_SCHEDULE_TIMEOUT)
+		mod_timer(&psock->rx_msg_timer, jiffies + timeo);
+}
+
 /* Macro to invoke filter function. */
 #define KCM_RUN_FILTER(prog, ctx) \
 	(*prog->bpf_func)(ctx, prog->insnsi)
@@ -500,6 +508,10 @@
 
 			if (!len) {
 				/* Need more header to determine length */
+				if (!rxm->accum_len) {
+					/* Start RX timer for new message */
+					kcm_start_rx_timer(psock);
+				}
 				rxm->accum_len += cand_len;
 				eaten += cand_len;
 				KCM_STATS_INCR(psock->stats.rx_need_more_hdr);
@@ -540,6 +552,11 @@
 				 * but don't consume yet per tcp_read_sock.
 				 */
 
+				if (!rxm->accum_len) {
+					/* Start RX timer for new message */
+					kcm_start_rx_timer(psock);
+				}
+
 				psock->rx_need_bytes = rxm->full_len -
 						       rxm->accum_len;
 				rxm->accum_len += cand_len;
@@ -563,6 +580,7 @@
 		eaten += (cand_len - extra);
 
 		/* Hurray, we have a new message! */
+		del_timer(&psock->rx_msg_timer);
 		psock->rx_skb_head = NULL;
 		KCM_STATS_INCR(psock->stats.rx_msgs);
 
@@ -1656,6 +1674,15 @@
 	spin_unlock_bh(&mux->rx_lock);
 }
 
+/* rx_msg_timer callback: count the timeout and abort RX on the psock in @arg */
+static void kcm_rx_msg_timeout(unsigned long arg)
+{
+	struct kcm_psock *psock = (struct kcm_psock *)arg;
+
+	KCM_STATS_INCR(psock->stats.rx_msg_timeouts);
+	kcm_abort_rx_psock(psock, ETIMEDOUT, NULL);
+}
+
 static int kcm_attach(struct socket *sock, struct socket *csock,
 		      struct bpf_prog *prog)
 {
@@ -1685,6 +1712,10 @@
 	psock->mux = mux;
 	psock->sk = csk;
 	psock->bpf_prog = prog;
+
+	setup_timer(&psock->rx_msg_timer, kcm_rx_msg_timeout,
+		    (unsigned long)psock);
+
 	INIT_WORK(&psock->rx_work, psock_rx_work);
 	INIT_DELAYED_WORK(&psock->rx_delayed_work, psock_rx_delayed_work);
 
@@ -1796,6 +1827,7 @@
 
 	write_unlock_bh(&csk->sk_callback_lock);
 
+	del_timer_sync(&psock->rx_msg_timer);
 	cancel_work_sync(&psock->rx_work);
 	cancel_delayed_work_sync(&psock->rx_delayed_work);