net/smc: use separate work queues for different worker types

There are 6 types of workers which exist per smc connection. 3 of them
are used for listen and handshake processing, another 2 are used for
close and abort processing and 1 is the tx worker that moves calls to
sleeping functions into a worker.
To prevent flooding of the system work queue when many connections are
opened or closed at the same time (a pattern that uperf implements), move
those workers to one of 3 smc-specific work queues. Two work queues are
module-global and used for handshake and close workers. The third work
queue is defined per link group and used by the tx workers that may
sleep waiting for resources of this link group.
And in smc_llc_enqueue() queue the llc_event_work work to the system
prio work queue because it's critical that this work is started fast.

Reviewed-by: Ursula Braun <ubraun@linux.ibm.com>
Signed-off-by: Karsten Graul <kgraul@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 9f3e148..f5becec 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -55,6 +55,9 @@ static DEFINE_MUTEX(smc_client_lgr_pending);	/* serialize link group
 						 * creation on client
 						 */
 
+struct workqueue_struct	*smc_hs_wq;	/* wq for handshake work */
+struct workqueue_struct	*smc_close_wq;	/* wq for close work */
+
 static void smc_tcp_listen_work(struct work_struct *);
 static void smc_connect_work(struct work_struct *);
 
@@ -905,7 +908,7 @@ static int smc_connect(struct socket *sock, struct sockaddr *addr,
 	if (smc->use_fallback)
 		goto out;
 	if (flags & O_NONBLOCK) {
-		if (schedule_work(&smc->connect_work))
+		if (queue_work(smc_hs_wq, &smc->connect_work))
 			smc->connect_nonblock = 1;
 		rc = -EINPROGRESS;
 	} else {
@@ -1412,7 +1415,7 @@ static void smc_tcp_listen_work(struct work_struct *work)
 		new_smc->sk.sk_sndbuf = lsmc->sk.sk_sndbuf;
 		new_smc->sk.sk_rcvbuf = lsmc->sk.sk_rcvbuf;
 		sock_hold(&new_smc->sk); /* sock_put in passive closing */
-		if (!schedule_work(&new_smc->smc_listen_work))
+		if (!queue_work(smc_hs_wq, &new_smc->smc_listen_work))
 			sock_put(&new_smc->sk);
 	}
 
@@ -1432,7 +1435,7 @@ static void smc_clcsock_data_ready(struct sock *listen_clcsock)
 	lsmc->clcsk_data_ready(listen_clcsock);
 	if (lsmc->sk.sk_state == SMC_LISTEN) {
 		sock_hold(&lsmc->sk); /* sock_put in smc_tcp_listen_work() */
-		if (!schedule_work(&lsmc->tcp_listen_work))
+		if (!queue_work(smc_hs_wq, &lsmc->tcp_listen_work))
 			sock_put(&lsmc->sk);
 	}
 }
@@ -1800,8 +1803,8 @@ static int smc_setsockopt(struct socket *sock, int level, int optname,
 		    sk->sk_state != SMC_LISTEN &&
 		    sk->sk_state != SMC_CLOSED) {
 			if (val)
-				mod_delayed_work(system_wq, &smc->conn.tx_work,
-						 0);
+				mod_delayed_work(smc->conn.lgr->tx_wq,
+						 &smc->conn.tx_work, 0);
 		}
 		break;
 	case TCP_CORK:
@@ -1809,8 +1812,8 @@ static int smc_setsockopt(struct socket *sock, int level, int optname,
 		    sk->sk_state != SMC_LISTEN &&
 		    sk->sk_state != SMC_CLOSED) {
 			if (!val)
-				mod_delayed_work(system_wq, &smc->conn.tx_work,
-						 0);
+				mod_delayed_work(smc->conn.lgr->tx_wq,
+						 &smc->conn.tx_work, 0);
 		}
 		break;
 	case TCP_DEFER_ACCEPT:
@@ -2093,10 +2096,19 @@ static int __init smc_init(void)
 	if (rc)
 		goto out_pernet_subsys;
 
+	rc = -ENOMEM;
+	smc_hs_wq = alloc_workqueue("smc_hs_wq", 0, 0);
+	if (!smc_hs_wq)
+		goto out_pnet;
+
+	smc_close_wq = alloc_workqueue("smc_close_wq", 0, 0);
+	if (!smc_close_wq)
+		goto out_alloc_hs_wq;
+
 	rc = smc_core_init();
 	if (rc) {
 		pr_err("%s: smc_core_init fails with %d\n", __func__, rc);
-		goto out_pnet;
+		goto out_alloc_wqs;
 	}
 
 	rc = smc_llc_init();
@@ -2148,6 +2160,10 @@ static int __init smc_init(void)
 	proto_unregister(&smc_proto);
 out_core:
 	smc_core_exit();
+out_alloc_wqs:
+	destroy_workqueue(smc_close_wq);
+out_alloc_hs_wq:
+	destroy_workqueue(smc_hs_wq);
 out_pnet:
 	smc_pnet_exit();
 out_pernet_subsys:
@@ -2162,6 +2178,8 @@ static void __exit smc_exit(void)
 	sock_unregister(PF_SMC);
 	smc_core_exit();
 	smc_ib_unregister_client();
+	destroy_workqueue(smc_close_wq);
+	destroy_workqueue(smc_hs_wq);
 	proto_unregister(&smc_proto6);
 	proto_unregister(&smc_proto);
 	smc_pnet_exit();