net: Introduce a new proto_ops ->read_skb()

Currently both splice() and sockmap use ->read_sock() to
read skb from receive queue, but for sockmap we only read
one entire skb at a time, so ->read_sock() is too conservative
to use. Introduce a new proto_ops ->read_skb() which supports
this sematic, with this we can finally pass the ownership of
skb to recv actors.

For non-TCP protocols, all ->read_sock() can be simply
converted to ->read_skb().

Signed-off-by: Cong Wang <cong.wang@bytedance.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Reviewed-by: John Fastabend <john.fastabend@gmail.com>
Link: https://lore.kernel.org/bpf/20220615162014.89193-3-xiyou.wangcong@gmail.com
diff --git a/net/core/skmsg.c b/net/core/skmsg.c
index 7e03f96..f7f63b7 100644
--- a/net/core/skmsg.c
+++ b/net/core/skmsg.c
@@ -1160,21 +1160,17 @@ static void sk_psock_done_strp(struct sk_psock *psock)
 }
 #endif /* CONFIG_BPF_STREAM_PARSER */
 
-static int sk_psock_verdict_recv(read_descriptor_t *desc, struct sk_buff *skb,
-				 unsigned int offset, size_t orig_len)
+static int sk_psock_verdict_recv(struct sock *sk, struct sk_buff *skb)
 {
-	struct sock *sk = (struct sock *)desc->arg.data;
 	struct sk_psock *psock;
 	struct bpf_prog *prog;
 	int ret = __SK_DROP;
-	int len = orig_len;
+	int len = skb->len;
 
 	/* clone here so sk_eat_skb() in tcp_read_sock does not drop our data */
 	skb = skb_clone(skb, GFP_ATOMIC);
-	if (!skb) {
-		desc->error = -ENOMEM;
+	if (!skb)
 		return 0;
-	}
 
 	rcu_read_lock();
 	psock = sk_psock(sk);
@@ -1204,16 +1200,10 @@ static int sk_psock_verdict_recv(read_descriptor_t *desc, struct sk_buff *skb,
 static void sk_psock_verdict_data_ready(struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
-	read_descriptor_t desc;
 
-	if (unlikely(!sock || !sock->ops || !sock->ops->read_sock))
+	if (unlikely(!sock || !sock->ops || !sock->ops->read_skb))
 		return;
-
-	desc.arg.data = sk;
-	desc.error = 0;
-	desc.count = 1;
-
-	sock->ops->read_sock(sk, &desc, sk_psock_verdict_recv);
+	sock->ops->read_skb(sk, sk_psock_verdict_recv);
 }
 
 void sk_psock_start_verdict(struct sock *sk, struct sk_psock *psock)