rwsem: Implement writer lock-stealing for better scalability

Commit 5a505085f043 ("mm/rmap: Convert the struct anon_vma::mutex
to an rwsem") changed struct anon_vma::mutex to an rwsem, which
caused aim7 fork_test performance to drop by 50%.

Yuanhan Liu did the following excellent analysis:

    https://lkml.org/lkml/2013/1/29/84

and found that the regression is caused by strict, serialized,
FIFO sequential write-ownership of rwsems. Ingo suggested
implementing opportunistic lock-stealing for the front writer
task in the waitqueue.

Yuanhan Liu implemented lock-stealing for spinlock-rwsems,
which indeed recovered much of the regression - confirming
the analysis that the main factor in the regression was the
FIFO writer-fairness of rwsems.

In this patch we allow lock-stealing to happen when the first
waiter is also writer. With that change in place the
aim7 fork_test performance is fully recovered on my
Intel NHM EP, NHM EX, SNB EP 2S and 4S test-machines.

Reported-by: lkp@linux.intel.com
Reported-by: Yuanhan Liu <yuanhan.liu@linux.intel.com>
Signed-off-by: Alex Shi <alex.shi@intel.com>
Cc: David Howells <dhowells@redhat.com>
Cc: Michel Lespinasse <walken@google.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Peter Zijlstra <a.p.zijlstra@chello.nl>
Cc: Anton Blanchard <anton@samba.org>
Cc: Arjan van de Ven <arjan@linux.intel.com>
Cc: paul.gortmaker@windriver.com
Link: https://lkml.org/lkml/2013/1/29/84
Link: http://lkml.kernel.org/r/1360069915-31619-1-git-send-email-alex.shi@intel.com
[ Small stylistic fixes, updated changelog. ]
Signed-off-by: Ingo Molnar <mingo@kernel.org>
diff --git a/lib/rwsem.c b/lib/rwsem.c
index 8337e1b9b..ad5e0df 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -2,6 +2,8 @@
  *
  * Written by David Howells (dhowells@redhat.com).
  * Derived from arch/i386/kernel/semaphore.c
+ *
+ * Writer lock-stealing by Alex Shi <alex.shi@intel.com>
  */
 #include <linux/rwsem.h>
 #include <linux/sched.h>
@@ -60,7 +62,7 @@
 	struct rwsem_waiter *waiter;
 	struct task_struct *tsk;
 	struct list_head *next;
-	signed long oldcount, woken, loop, adjustment;
+	signed long woken, loop, adjustment;
 
 	waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
 	if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
@@ -72,30 +74,8 @@
 		 */
 		goto out;
 
-	/* There's a writer at the front of the queue - try to grant it the
-	 * write lock.  However, we only wake this writer if we can transition
-	 * the active part of the count from 0 -> 1
-	 */
-	adjustment = RWSEM_ACTIVE_WRITE_BIAS;
-	if (waiter->list.next == &sem->wait_list)
-		adjustment -= RWSEM_WAITING_BIAS;
-
- try_again_write:
-	oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
-	if (oldcount & RWSEM_ACTIVE_MASK)
-		/* Someone grabbed the sem already */
-		goto undo_write;
-
-	/* We must be careful not to touch 'waiter' after we set ->task = NULL.
-	 * It is an allocated on the waiter's stack and may become invalid at
-	 * any time after that point (due to a wakeup from another source).
-	 */
-	list_del(&waiter->list);
-	tsk = waiter->task;
-	smp_mb();
-	waiter->task = NULL;
-	wake_up_process(tsk);
-	put_task_struct(tsk);
+	/* Wake up the writing waiter and let the task grab the sem: */
+	wake_up_process(waiter->task);
 	goto out;
 
  readers_only:
@@ -157,12 +137,40 @@
 
  out:
 	return sem;
+}
 
-	/* undo the change to the active count, but check for a transition
-	 * 1->0 */
- undo_write:
+/* Try to get write sem, caller holds sem->wait_lock: */
+static int try_get_writer_sem(struct rw_semaphore *sem,
+					struct rwsem_waiter *waiter)
+{
+	struct rwsem_waiter *fwaiter;
+	long oldcount, adjustment;
+
+	/* only steal when first waiter is writing */
+	fwaiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
+	if (!(fwaiter->flags & RWSEM_WAITING_FOR_WRITE))
+		return 0;
+
+	adjustment = RWSEM_ACTIVE_WRITE_BIAS;
+	/* Only one waiter in the queue: */
+	if (fwaiter == waiter && waiter->list.next == &sem->wait_list)
+		adjustment -= RWSEM_WAITING_BIAS;
+
+try_again_write:
+	oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
+	if (!(oldcount & RWSEM_ACTIVE_MASK)) {
+		/* No active lock: */
+		struct task_struct *tsk = waiter->task;
+
+		list_del(&waiter->list);
+		smp_mb();
+		put_task_struct(tsk);
+		tsk->state = TASK_RUNNING;
+		return 1;
+	}
+	/* some one grabbed the sem already */
 	if (rwsem_atomic_update(-adjustment, sem) & RWSEM_ACTIVE_MASK)
-		goto out;
+		return 0;
 	goto try_again_write;
 }
 
@@ -210,6 +218,15 @@
 	for (;;) {
 		if (!waiter.task)
 			break;
+
+		raw_spin_lock_irq(&sem->wait_lock);
+		/* Try to get the writer sem, may steal from the head writer: */
+		if (flags == RWSEM_WAITING_FOR_WRITE)
+			if (try_get_writer_sem(sem, &waiter)) {
+				raw_spin_unlock_irq(&sem->wait_lock);
+				return sem;
+			}
+		raw_spin_unlock_irq(&sem->wait_lock);
 		schedule();
 		set_task_state(tsk, TASK_UNINTERRUPTIBLE);
 	}