jbd2: Change j_state_lock to be an rwlock_t

Lockstat reports have shown that j_state_lock is a major source of
lock contention, especially on systems with more than 4 CPU cores.
Convert it from a plain spinlock to a read/write spinlock (rwlock_t),
so that the common paths which only need to read journal state can run
concurrently, while the paths that modify that state take the lock
exclusively.
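
The new locking rules in sketch form (illustrative only: the function
names below are made up for the example, and the header change that
declares j_state_lock as an rwlock_t is assumed to live in the jbd2.h
and journal.c portion of the patch, not shown in this excerpt):

	#include <linux/spinlock.h>

	/* stands in for journal->j_state_lock */
	static DEFINE_RWLOCK(state_lock);

	static void sample_journal_state(void)
	{
		/* read-mostly paths, e.g. start_this_handle(), take the lock shared */
		read_lock(&state_lock);
		/* ... read journal state ... */
		read_unlock(&state_lock);
	}

	static void modify_journal_state(void)
	{
		/* state-changing paths, e.g. jbd2_journal_lock_updates(), take it exclusive */
		write_lock(&state_lock);
		/* ... modify journal state ... */
		write_unlock(&state_lock);
	}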

Signed-off-by: "Theodore Ts'o" <tytso@mit.edu>
diff --git a/fs/jbd2/transaction.c b/fs/jbd2/transaction.c
index 9c64c7e..6630651 100644
--- a/fs/jbd2/transaction.c
+++ b/fs/jbd2/transaction.c
@@ -124,36 +124,38 @@
 
 	jbd_debug(3, "New handle %p going live.\n", handle);
 
-repeat:
-
 	/*
 	 * We need to hold j_state_lock until t_updates has been incremented,
 	 * for proper journal barrier handling
 	 */
-	spin_lock(&journal->j_state_lock);
-repeat_locked:
+repeat:
+	read_lock(&journal->j_state_lock);
 	if (is_journal_aborted(journal) ||
 	    (journal->j_errno != 0 && !(journal->j_flags & JBD2_ACK_ERR))) {
-		spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
 		kfree(new_transaction);
 		return -EROFS;
 	}
 
 	/* Wait on the journal's transaction barrier if necessary */
 	if (journal->j_barrier_count) {
-		spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
 		wait_event(journal->j_wait_transaction_locked,
 				journal->j_barrier_count == 0);
 		goto repeat;
 	}
 
 	if (!journal->j_running_transaction) {
-		if (!new_transaction) {
-			spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
+		if (!new_transaction)
 			goto alloc_transaction;
+		write_lock(&journal->j_state_lock);
+		if (!journal->j_running_transaction) {
+			jbd2_get_transaction(journal, new_transaction);
+			new_transaction = NULL;
 		}
-		jbd2_get_transaction(journal, new_transaction);
-		new_transaction = NULL;
+		write_unlock(&journal->j_state_lock);
+		goto repeat;
 	}
 
 	transaction = journal->j_running_transaction;
@@ -167,7 +169,7 @@
 
 		prepare_to_wait(&journal->j_wait_transaction_locked,
 					&wait, TASK_UNINTERRUPTIBLE);
-		spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
 		schedule();
 		finish_wait(&journal->j_wait_transaction_locked, &wait);
 		goto repeat;
@@ -194,7 +196,7 @@
 		prepare_to_wait(&journal->j_wait_transaction_locked, &wait,
 				TASK_UNINTERRUPTIBLE);
 		__jbd2_log_start_commit(journal, transaction->t_tid);
-		spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
 		schedule();
 		finish_wait(&journal->j_wait_transaction_locked, &wait);
 		goto repeat;
@@ -228,8 +230,12 @@
 	if (__jbd2_log_space_left(journal) < jbd_space_needed(journal)) {
 		jbd_debug(2, "Handle %p waiting for checkpoint...\n", handle);
 		spin_unlock(&transaction->t_handle_lock);
-		__jbd2_log_wait_for_space(journal);
-		goto repeat_locked;
+		read_unlock(&journal->j_state_lock);
+		write_lock(&journal->j_state_lock);
+		if (__jbd2_log_space_left(journal) < jbd_space_needed(journal))
+			__jbd2_log_wait_for_space(journal);
+		write_unlock(&journal->j_state_lock);
+		goto repeat;
 	}
 
 	/* OK, account for the buffers that this operation expects to
@@ -250,7 +256,7 @@
 		  atomic_read(&transaction->t_outstanding_credits),
 		  __jbd2_log_space_left(journal));
 	spin_unlock(&transaction->t_handle_lock);
-	spin_unlock(&journal->j_state_lock);
+	read_unlock(&journal->j_state_lock);
 
 	lock_map_acquire(&handle->h_lockdep_map);
 	kfree(new_transaction);
@@ -362,7 +368,7 @@
 
 	result = 1;
 
-	spin_lock(&journal->j_state_lock);
+	read_lock(&journal->j_state_lock);
 
 	/* Don't extend a locked-down transaction! */
 	if (handle->h_transaction->t_state != T_RUNNING) {
@@ -394,7 +400,7 @@
 unlock:
 	spin_unlock(&transaction->t_handle_lock);
 error_out:
-	spin_unlock(&journal->j_state_lock);
+	read_unlock(&journal->j_state_lock);
 out:
 	return result;
 }
@@ -432,7 +438,7 @@
 	J_ASSERT(atomic_read(&transaction->t_updates) > 0);
 	J_ASSERT(journal_current_handle() == handle);
 
-	spin_lock(&journal->j_state_lock);
+	read_lock(&journal->j_state_lock);
 	spin_lock(&transaction->t_handle_lock);
 	atomic_sub(handle->h_buffer_credits,
 		   &transaction->t_outstanding_credits);
@@ -442,7 +448,7 @@
 
 	jbd_debug(2, "restarting handle %p\n", handle);
 	__jbd2_log_start_commit(journal, transaction->t_tid);
-	spin_unlock(&journal->j_state_lock);
+	read_unlock(&journal->j_state_lock);
 
 	lock_map_release(&handle->h_lockdep_map);
 	handle->h_buffer_credits = nblocks;
@@ -472,7 +478,7 @@
 {
 	DEFINE_WAIT(wait);
 
-	spin_lock(&journal->j_state_lock);
+	write_lock(&journal->j_state_lock);
 	++journal->j_barrier_count;
 
 	/* Wait until there are no running updates */
@@ -490,12 +496,12 @@
 		prepare_to_wait(&journal->j_wait_updates, &wait,
 				TASK_UNINTERRUPTIBLE);
 		spin_unlock(&transaction->t_handle_lock);
-		spin_unlock(&journal->j_state_lock);
+		write_unlock(&journal->j_state_lock);
 		schedule();
 		finish_wait(&journal->j_wait_updates, &wait);
-		spin_lock(&journal->j_state_lock);
+		write_lock(&journal->j_state_lock);
 	}
-	spin_unlock(&journal->j_state_lock);
+	write_unlock(&journal->j_state_lock);
 
 	/*
 	 * We have now established a barrier against other normal updates, but
@@ -519,9 +525,9 @@
 	J_ASSERT(journal->j_barrier_count != 0);
 
 	mutex_unlock(&journal->j_barrier);
-	spin_lock(&journal->j_state_lock);
+	write_lock(&journal->j_state_lock);
 	--journal->j_barrier_count;
-	spin_unlock(&journal->j_state_lock);
+	write_unlock(&journal->j_state_lock);
 	wake_up(&journal->j_wait_transaction_locked);
 }
 
@@ -1314,9 +1320,9 @@
 
 		journal->j_last_sync_writer = pid;
 
-		spin_lock(&journal->j_state_lock);
+		read_lock(&journal->j_state_lock);
 		commit_time = journal->j_average_commit_time;
-		spin_unlock(&journal->j_state_lock);
+		read_unlock(&journal->j_state_lock);
 
 		trans_time = ktime_to_ns(ktime_sub(ktime_get(),
 						   transaction->t_start_time));
@@ -1748,7 +1754,7 @@
 		goto zap_buffer_unlocked;
 
 	/* OK, we have data buffer in journaled mode */
-	spin_lock(&journal->j_state_lock);
+	write_lock(&journal->j_state_lock);
 	jbd_lock_bh_state(bh);
 	spin_lock(&journal->j_list_lock);
 
@@ -1801,7 +1807,7 @@
 			jbd2_journal_put_journal_head(jh);
 			spin_unlock(&journal->j_list_lock);
 			jbd_unlock_bh_state(bh);
-			spin_unlock(&journal->j_state_lock);
+			write_unlock(&journal->j_state_lock);
 			return ret;
 		} else {
 			/* There is no currently-running transaction. So the
@@ -1815,7 +1821,7 @@
 				jbd2_journal_put_journal_head(jh);
 				spin_unlock(&journal->j_list_lock);
 				jbd_unlock_bh_state(bh);
-				spin_unlock(&journal->j_state_lock);
+				write_unlock(&journal->j_state_lock);
 				return ret;
 			} else {
 				/* The orphan record's transaction has
@@ -1839,7 +1845,7 @@
 		jbd2_journal_put_journal_head(jh);
 		spin_unlock(&journal->j_list_lock);
 		jbd_unlock_bh_state(bh);
-		spin_unlock(&journal->j_state_lock);
+		write_unlock(&journal->j_state_lock);
 		return 0;
 	} else {
 		/* Good, the buffer belongs to the running transaction.
@@ -1858,7 +1864,7 @@
 zap_buffer_no_jh:
 	spin_unlock(&journal->j_list_lock);
 	jbd_unlock_bh_state(bh);
-	spin_unlock(&journal->j_state_lock);
+	write_unlock(&journal->j_state_lock);
 zap_buffer_unlocked:
 	clear_buffer_dirty(bh);
 	J_ASSERT_BH(bh, !buffer_jbddirty(bh));
@@ -2165,9 +2171,9 @@
 	/* Locks are here just to force reading of recent values, it is
 	 * enough that the transaction was not committing before we started
 	 * a transaction adding the inode to orphan list */
-	spin_lock(&journal->j_state_lock);
+	read_lock(&journal->j_state_lock);
 	commit_trans = journal->j_committing_transaction;
-	spin_unlock(&journal->j_state_lock);
+	read_unlock(&journal->j_state_lock);
 	spin_lock(&journal->j_list_lock);
 	inode_trans = jinode->i_transaction;
 	spin_unlock(&journal->j_list_lock);