Merge branches 'exp.2022.02.24a', 'fixes.2022.02.14a', 'rcu_barrier.2022.02.08a', 'rcu-tasks.2022.02.08a', 'rt.2022.02.01b', 'torture.2022.02.01b' and 'torturescript.2022.02.08a' into HEAD

exp.2022.02.24a: Expedited grace-period updates.
fixes.2022.02.14a: Miscellaneous fixes.
rcu_barrier.2022.02.08a: Make rcu_barrier() no longer exclude CPU hotplug.
rcu-tasks.2022.02.08a: RCU-tasks updates.
rt.2022.02.01b: Real-time-related updates.
torture.2022.02.01b: Torture-test updates.
torturescript.2022.02.08a: Torture-test scripting updates.
diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
index f5a27f0..8e2e651 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -4504,6 +4504,8 @@
 			(the least-favored priority).  Otherwise, when
 			RCU_BOOST is not set, valid values are 0-99 and
 			the default is zero (non-realtime operation).
+			When RCU_NOCB_CPU is set, also adjust the
+			priority of NOCB callback kthreads.
 
 	rcutree.rcu_nocb_gp_stride= [KNL]
 			Set the number of NOCB callback kthreads in
diff --git a/MAINTAINERS b/MAINTAINERS
index ea3e6c9..5ad49de 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -16298,6 +16298,8 @@
 
 READ-COPY UPDATE (RCU)
 M:	"Paul E. McKenney" <paulmck@kernel.org>
+M:	Frederic Weisbecker <frederic@kernel.org> (kernel/rcu/tree_nocb.h)
+M:	Neeraj Upadhyay <quic_neeraju@quicinc.com> (kernel/rcu/tasks.h)
 M:	Josh Triplett <josh@joshtriplett.org>
 R:	Steven Rostedt <rostedt@goodmis.org>
 R:	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 88b42eb..e7c39c2 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -84,7 +84,7 @@ static inline int rcu_preempt_depth(void)
 
 /* Internal to kernel */
 void rcu_init(void);
-extern int rcu_scheduler_active __read_mostly;
+extern int rcu_scheduler_active;
 void rcu_sched_clock_irq(int user);
 void rcu_report_dead(unsigned int cpu);
 void rcutree_migrate_callbacks(int cpu);
@@ -924,7 +924,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
  *
  *     kvfree_rcu(ptr);
  *
- * where @ptr is a pointer to kvfree().
+ * where @ptr is the pointer to be freed by kvfree().
  *
  * Please note, head-less way of freeing is permitted to
  * use from a context that has to follow might_sleep()
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 53209d6..76665db 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -62,7 +62,7 @@ static inline void rcu_irq_exit_check_preempt(void) { }
 void exit_rcu(void);
 
 void rcu_scheduler_starting(void);
-extern int rcu_scheduler_active __read_mostly;
+extern int rcu_scheduler_active;
 void rcu_end_inkernel_boot(void);
 bool rcu_inkernel_boot_has_ended(void);
 bool rcu_is_watching(void);
diff --git a/include/linux/rcuwait.h b/include/linux/rcuwait.h
index 61c56cc..8052d34d 100644
--- a/include/linux/rcuwait.h
+++ b/include/linux/rcuwait.h
@@ -47,11 +47,7 @@ static inline void prepare_to_rcuwait(struct rcuwait *w)
 	rcu_assign_pointer(w->task, current);
 }
 
-static inline void finish_rcuwait(struct rcuwait *w)
-{
-        rcu_assign_pointer(w->task, NULL);
-	__set_current_state(TASK_RUNNING);
-}
+extern void finish_rcuwait(struct rcuwait *w);
 
 #define rcuwait_wait_event(w, condition, state)				\
 ({									\
diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h
index 670e417..90b2fb0 100644
--- a/include/trace/events/rcu.h
+++ b/include/trace/events/rcu.h
@@ -794,16 +794,15 @@ TRACE_EVENT_RCU(rcu_torture_read,
  * Tracepoint for rcu_barrier() execution.  The string "s" describes
  * the rcu_barrier phase:
  *	"Begin": rcu_barrier() started.
+ *	"CB": An rcu_barrier_callback() invoked a callback, not the last.
  *	"EarlyExit": rcu_barrier() piggybacked, thus early exit.
  *	"Inc1": rcu_barrier() piggyback check counter incremented.
- *	"OfflineNoCBQ": rcu_barrier() found offline no-CBs CPU with callbacks.
- *	"OnlineQ": rcu_barrier() found online CPU with callbacks.
- *	"OnlineNQ": rcu_barrier() found online CPU, no callbacks.
+ *	"Inc2": rcu_barrier() piggyback check counter incremented.
  *	"IRQ": An rcu_barrier_callback() callback posted on remote CPU.
  *	"IRQNQ": An rcu_barrier_callback() callback found no callbacks.
- *	"CB": An rcu_barrier_callback() invoked a callback, not the last.
  *	"LastCB": An rcu_barrier_callback() invoked the last callback.
- *	"Inc2": rcu_barrier() piggyback check counter incremented.
+ *	"NQ": rcu_barrier() found a CPU with no callbacks.
+ *	"OnlineQ": rcu_barrier() found online CPU with callbacks.
  * The "cpu" argument is the CPU or -1 if meaningless, the "cnt" argument
  * is the count of remaining callbacks, and "done" is the piggybacking count.
  */
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index e373fbe..431cee2 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -56,13 +56,13 @@ static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp)
 static inline void rcu_segcblist_set_flags(struct rcu_segcblist *rsclp,
 					   int flags)
 {
-	rsclp->flags |= flags;
+	WRITE_ONCE(rsclp->flags, rsclp->flags | flags);
 }
 
 static inline void rcu_segcblist_clear_flags(struct rcu_segcblist *rsclp,
 					     int flags)
 {
-	rsclp->flags &= ~flags;
+	WRITE_ONCE(rsclp->flags, rsclp->flags & ~flags);
 }
 
 static inline bool rcu_segcblist_test_flags(struct rcu_segcblist *rsclp,
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 422f7e4..55d049c 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -284,7 +284,7 @@ static atomic_t barrier_cbs_invoked;	/* Barrier callbacks invoked. */
 static wait_queue_head_t *barrier_cbs_wq; /* Coordinate barrier testing. */
 static DECLARE_WAIT_QUEUE_HEAD(barrier_wq);
 
-static bool rcu_fwd_cb_nodelay;		/* Short rcu_torture_delay() delays. */
+static atomic_t rcu_fwd_cb_nodelay;	/* Short rcu_torture_delay() delays. */
 
 /*
  * Allocate an element from the rcu_tortures pool.
@@ -387,7 +387,7 @@ rcu_read_delay(struct torture_random_state *rrsp, struct rt_read_seg *rtrsp)
 	 * period, and we want a long delay occasionally to trigger
 	 * force_quiescent_state. */
 
-	if (!READ_ONCE(rcu_fwd_cb_nodelay) &&
+	if (!atomic_read(&rcu_fwd_cb_nodelay) &&
 	    !(torture_random(rrsp) % (nrealreaders * 2000 * longdelay_ms))) {
 		started = cur_ops->get_gp_seq();
 		ts = rcu_trace_clock_local();
@@ -674,6 +674,7 @@ static struct rcu_torture_ops srcu_ops = {
 	.call		= srcu_torture_call,
 	.cb_barrier	= srcu_torture_barrier,
 	.stats		= srcu_torture_stats,
+	.cbflood_max	= 50000,
 	.irq_capable	= 1,
 	.no_pi_lock	= IS_ENABLED(CONFIG_TINY_SRCU),
 	.name		= "srcu"
@@ -708,6 +709,7 @@ static struct rcu_torture_ops srcud_ops = {
 	.call		= srcu_torture_call,
 	.cb_barrier	= srcu_torture_barrier,
 	.stats		= srcu_torture_stats,
+	.cbflood_max	= 50000,
 	.irq_capable	= 1,
 	.no_pi_lock	= IS_ENABLED(CONFIG_TINY_SRCU),
 	.name		= "srcud"
@@ -997,7 +999,7 @@ static int rcu_torture_boost(void *arg)
 			goto checkwait;
 
 		/* Wait for the next test interval. */
-		oldstarttime = boost_starttime;
+		oldstarttime = READ_ONCE(boost_starttime);
 		while (time_before(jiffies, oldstarttime)) {
 			schedule_timeout_interruptible(oldstarttime - jiffies);
 			if (stutter_wait("rcu_torture_boost"))
@@ -1041,10 +1043,11 @@ static int rcu_torture_boost(void *arg)
 		 * interval.  Besides, we are running at RT priority,
 		 * so delays should be relatively rare.
 		 */
-		while (oldstarttime == boost_starttime && !kthread_should_stop()) {
+		while (oldstarttime == READ_ONCE(boost_starttime) && !kthread_should_stop()) {
 			if (mutex_trylock(&boost_mutex)) {
 				if (oldstarttime == boost_starttime) {
-					boost_starttime = jiffies + test_boost_interval * HZ;
+					WRITE_ONCE(boost_starttime,
+						   jiffies + test_boost_interval * HZ);
 					n_rcu_torture_boosts++;
 				}
 				mutex_unlock(&boost_mutex);
@@ -1276,7 +1279,7 @@ rcu_torture_writer(void *arg)
 		boot_ended = rcu_inkernel_boot_has_ended();
 		stutter_waited = stutter_wait("rcu_torture_writer");
 		if (stutter_waited &&
-		    !READ_ONCE(rcu_fwd_cb_nodelay) &&
+		    !atomic_read(&rcu_fwd_cb_nodelay) &&
 		    !cur_ops->slow_gps &&
 		    !torture_must_stop() &&
 		    boot_ended)
@@ -2180,7 +2183,6 @@ static void rcu_torture_fwd_cb_hist(struct rcu_fwd *rfp)
 	for (i = ARRAY_SIZE(rfp->n_launders_hist) - 1; i > 0; i--)
 		if (rfp->n_launders_hist[i].n_launders > 0)
 			break;
-	mutex_lock(&rcu_fwd_mutex); // Serialize histograms.
 	pr_alert("%s: Callback-invocation histogram %d (duration %lu jiffies):",
 		 __func__, rfp->rcu_fwd_id, jiffies - rfp->rcu_fwd_startat);
 	gps_old = rfp->rcu_launder_gp_seq_start;
@@ -2193,7 +2195,6 @@ static void rcu_torture_fwd_cb_hist(struct rcu_fwd *rfp)
 		gps_old = gps;
 	}
 	pr_cont("\n");
-	mutex_unlock(&rcu_fwd_mutex);
 }
 
 /* Callback function for continuous-flood RCU callbacks. */
@@ -2281,6 +2282,7 @@ static void rcu_torture_fwd_prog_nr(struct rcu_fwd *rfp,
 	unsigned long stopat;
 	static DEFINE_TORTURE_RANDOM(trs);
 
+	pr_alert("%s: Starting forward-progress test %d\n", __func__, rfp->rcu_fwd_id);
 	if (!cur_ops->sync)
 		return; // Cannot do need_resched() forward progress testing without ->sync.
 	if (cur_ops->call && cur_ops->cb_barrier) {
@@ -2289,7 +2291,7 @@ static void rcu_torture_fwd_prog_nr(struct rcu_fwd *rfp,
 	}
 
 	/* Tight loop containing cond_resched(). */
-	WRITE_ONCE(rcu_fwd_cb_nodelay, true);
+	atomic_inc(&rcu_fwd_cb_nodelay);
 	cur_ops->sync(); /* Later readers see above write. */
 	if  (selfpropcb) {
 		WRITE_ONCE(fcs.stop, 0);
@@ -2325,6 +2327,7 @@ static void rcu_torture_fwd_prog_nr(struct rcu_fwd *rfp,
 	if (selfpropcb) {
 		WRITE_ONCE(fcs.stop, 1);
 		cur_ops->sync(); /* Wait for running CB to complete. */
+		pr_alert("%s: Waiting for CBs: %pS() %d\n", __func__, cur_ops->cb_barrier, rfp->rcu_fwd_id);
 		cur_ops->cb_barrier(); /* Wait for queued callbacks. */
 	}
 
@@ -2333,7 +2336,7 @@ static void rcu_torture_fwd_prog_nr(struct rcu_fwd *rfp,
 		destroy_rcu_head_on_stack(&fcs.rh);
 	}
 	schedule_timeout_uninterruptible(HZ / 10); /* Let kthreads recover. */
-	WRITE_ONCE(rcu_fwd_cb_nodelay, false);
+	atomic_dec(&rcu_fwd_cb_nodelay);
 }
 
 /* Carry out call_rcu() forward-progress testing. */
@@ -2353,13 +2356,14 @@ static void rcu_torture_fwd_prog_cr(struct rcu_fwd *rfp)
 	unsigned long stopat;
 	unsigned long stoppedat;
 
+	pr_alert("%s: Starting forward-progress test %d\n", __func__, rfp->rcu_fwd_id);
 	if (READ_ONCE(rcu_fwd_emergency_stop))
 		return; /* Get out of the way quickly, no GP wait! */
 	if (!cur_ops->call)
 		return; /* Can't do call_rcu() fwd prog without ->call. */
 
 	/* Loop continuously posting RCU callbacks. */
-	WRITE_ONCE(rcu_fwd_cb_nodelay, true);
+	atomic_inc(&rcu_fwd_cb_nodelay);
 	cur_ops->sync(); /* Later readers see above write. */
 	WRITE_ONCE(rfp->rcu_fwd_startat, jiffies);
 	stopat = rfp->rcu_fwd_startat + MAX_FWD_CB_JIFFIES;
@@ -2414,6 +2418,7 @@ static void rcu_torture_fwd_prog_cr(struct rcu_fwd *rfp)
 	n_launders_cb_snap = READ_ONCE(rfp->n_launders_cb);
 	cver = READ_ONCE(rcu_torture_current_version) - cver;
 	gps = rcutorture_seq_diff(cur_ops->get_gp_seq(), gps);
+	pr_alert("%s: Waiting for CBs: %pS() %d\n", __func__, cur_ops->cb_barrier, rfp->rcu_fwd_id);
 	cur_ops->cb_barrier(); /* Wait for callbacks to be invoked. */
 	(void)rcu_torture_fwd_prog_cbfree(rfp);
 
@@ -2427,11 +2432,13 @@ static void rcu_torture_fwd_prog_cr(struct rcu_fwd *rfp)
 			 n_launders, n_launders_sa,
 			 n_max_gps, n_max_cbs, cver, gps);
 		atomic_long_add(n_max_cbs, &rcu_fwd_max_cbs);
+		mutex_lock(&rcu_fwd_mutex); // Serialize histograms.
 		rcu_torture_fwd_cb_hist(rfp);
+		mutex_unlock(&rcu_fwd_mutex);
 	}
 	schedule_timeout_uninterruptible(HZ); /* Let CBs drain. */
 	tick_dep_clear_task(current, TICK_DEP_BIT_RCU);
-	WRITE_ONCE(rcu_fwd_cb_nodelay, false);
+	atomic_dec(&rcu_fwd_cb_nodelay);
 }
 
 
@@ -2511,7 +2518,7 @@ static int rcu_torture_fwd_prog(void *args)
 			firsttime = false;
 			WRITE_ONCE(rcu_fwd_seq, rcu_fwd_seq + 1);
 		} else {
-			while (READ_ONCE(rcu_fwd_seq) == oldseq)
+			while (READ_ONCE(rcu_fwd_seq) == oldseq && !torture_must_stop())
 				schedule_timeout_interruptible(1);
 			oldseq = READ_ONCE(rcu_fwd_seq);
 		}
@@ -2905,8 +2912,10 @@ rcu_torture_cleanup(void)
 	int i;
 
 	if (torture_cleanup_begin()) {
-		if (cur_ops->cb_barrier != NULL)
+		if (cur_ops->cb_barrier != NULL) {
+			pr_info("%s: Invoking %pS().\n", __func__, cur_ops->cb_barrier);
 			cur_ops->cb_barrier();
+		}
 		return;
 	}
 	if (!cur_ops) {
@@ -2961,8 +2970,10 @@ rcu_torture_cleanup(void)
 	 * Wait for all RCU callbacks to fire, then do torture-type-specific
 	 * cleanup operations.
 	 */
-	if (cur_ops->cb_barrier != NULL)
+	if (cur_ops->cb_barrier != NULL) {
+		pr_info("%s: Invoking %pS().\n", __func__, cur_ops->cb_barrier);
 		cur_ops->cb_barrier();
+	}
 	if (cur_ops->cleanup != NULL)
 		cur_ops->cleanup();
 
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index d64f0b1..ac17348 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -123,7 +123,7 @@ static struct rcu_tasks rt_name =							\
 	.call_func = call,								\
 	.rtpcpu = &rt_name ## __percpu,							\
 	.name = n,									\
-	.percpu_enqueue_shift = ilog2(CONFIG_NR_CPUS) + 1,				\
+	.percpu_enqueue_shift = order_base_2(CONFIG_NR_CPUS),				\
 	.percpu_enqueue_lim = 1,							\
 	.percpu_dequeue_lim = 1,							\
 	.barrier_q_mutex = __MUTEX_INITIALIZER(rt_name.barrier_q_mutex),		\
@@ -302,7 +302,7 @@ static void call_rcu_tasks_generic(struct rcu_head *rhp, rcu_callback_t func,
 	if (unlikely(needadjust)) {
 		raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
 		if (rtp->percpu_enqueue_lim != nr_cpu_ids) {
-			WRITE_ONCE(rtp->percpu_enqueue_shift, ilog2(nr_cpu_ids) + 1);
+			WRITE_ONCE(rtp->percpu_enqueue_shift, 0);
 			WRITE_ONCE(rtp->percpu_dequeue_lim, nr_cpu_ids);
 			smp_store_release(&rtp->percpu_enqueue_lim, nr_cpu_ids);
 			pr_info("Switching %s to per-CPU callback queuing.\n", rtp->name);
@@ -417,7 +417,7 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
 	if (rcu_task_cb_adjust && ncbs <= rcu_task_collapse_lim) {
 		raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
 		if (rtp->percpu_enqueue_lim > 1) {
-			WRITE_ONCE(rtp->percpu_enqueue_shift, ilog2(nr_cpu_ids) + 1);
+			WRITE_ONCE(rtp->percpu_enqueue_shift, order_base_2(nr_cpu_ids));
 			smp_store_release(&rtp->percpu_enqueue_lim, 1);
 			rtp->percpu_dequeue_gpseq = get_state_synchronize_rcu();
 			pr_info("Starting switch %s to CPU-0 callback queuing.\n", rtp->name);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a4c25a6..acf3985 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -87,11 +87,12 @@ static struct rcu_state rcu_state = {
 	.gp_state = RCU_GP_IDLE,
 	.gp_seq = (0UL - 300UL) << RCU_SEQ_CTR_SHIFT,
 	.barrier_mutex = __MUTEX_INITIALIZER(rcu_state.barrier_mutex),
+	.barrier_lock = __RAW_SPIN_LOCK_UNLOCKED(rcu_state.barrier_lock),
 	.name = RCU_NAME,
 	.abbr = RCU_ABBR,
 	.exp_mutex = __MUTEX_INITIALIZER(rcu_state.exp_mutex),
 	.exp_wake_mutex = __MUTEX_INITIALIZER(rcu_state.exp_wake_mutex),
-	.ofl_lock = __RAW_SPIN_LOCK_UNLOCKED(rcu_state.ofl_lock),
+	.ofl_lock = __ARCH_SPIN_LOCK_UNLOCKED,
 };
 
 /* Dump rcu_node combining tree at boot to verify correct setup. */
@@ -153,7 +154,7 @@ static void sync_sched_exp_online_cleanup(int cpu);
 static void check_cb_ovld_locked(struct rcu_data *rdp, struct rcu_node *rnp);
 static bool rcu_rdp_is_offloaded(struct rcu_data *rdp);
 
-/* rcuc/rcub kthread realtime priority */
+/* rcuc/rcub/rcuop kthread realtime priority */
 static int kthread_prio = IS_ENABLED(CONFIG_RCU_BOOST) ? 1 : 0;
 module_param(kthread_prio, int, 0444);
 
@@ -222,6 +223,16 @@ static unsigned long rcu_rnp_online_cpus(struct rcu_node *rnp)
 }
 
 /*
+ * Is the CPU corresponding to the specified rcu_data structure online
+ * from RCU's perspective?  This perspective is given by that structure's
+ * ->qsmaskinitnext field rather than by the global cpu_online_mask.
+ */
+static bool rcu_rdp_cpu_online(struct rcu_data *rdp)
+{
+	return !!(rdp->grpmask & rcu_rnp_online_cpus(rdp->mynode));
+}
+
+/*
  * Return true if an RCU grace period is in progress.  The READ_ONCE()s
  * permit this function to be invoked without holding the root rcu_node
  * structure's ->lock, but of course results can be subject to change.
@@ -1167,15 +1178,20 @@ void rcu_request_urgent_qs_task(struct task_struct *t)
 bool rcu_lockdep_current_cpu_online(void)
 {
 	struct rcu_data *rdp;
-	struct rcu_node *rnp;
 	bool ret = false;
 
 	if (in_nmi() || !rcu_scheduler_fully_active)
 		return true;
 	preempt_disable_notrace();
 	rdp = this_cpu_ptr(&rcu_data);
-	rnp = rdp->mynode;
-	if (rdp->grpmask & rcu_rnp_online_cpus(rnp) || READ_ONCE(rnp->ofl_seq) & 0x1)
+	/*
+	 * Strictly, we care here about the case where the current CPU is
+	 * in rcu_cpu_starting() and thus has an excuse for rdp->grpmask
+	 * not being up to date. So arch_spin_is_locked() might have a
+	 * false positive if it's held by some *other* CPU, but that's
+	 * OK because that just means a false *negative* on the warning.
+	 */
+	if (rcu_rdp_cpu_online(rdp) || arch_spin_is_locked(&rcu_state.ofl_lock))
 		ret = true;
 	preempt_enable_notrace();
 	return ret;
@@ -1260,8 +1276,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
 	 * For more detail, please refer to the "Hotplug CPU" section
 	 * of RCU's Requirements documentation.
 	 */
-	if (WARN_ON_ONCE(!(rdp->grpmask & rcu_rnp_online_cpus(rnp)))) {
-		bool onl;
+	if (WARN_ON_ONCE(!rcu_rdp_cpu_online(rdp))) {
 		struct rcu_node *rnp1;
 
 		pr_info("%s: grp: %d-%d level: %d ->gp_seq %ld ->completedqs %ld\n",
@@ -1270,9 +1285,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
 		for (rnp1 = rnp; rnp1; rnp1 = rnp1->parent)
 			pr_info("%s: %d:%d ->qsmask %#lx ->qsmaskinit %#lx ->qsmaskinitnext %#lx ->rcu_gp_init_mask %#lx\n",
 				__func__, rnp1->grplo, rnp1->grphi, rnp1->qsmask, rnp1->qsmaskinit, rnp1->qsmaskinitnext, rnp1->rcu_gp_init_mask);
-		onl = !!(rdp->grpmask & rcu_rnp_online_cpus(rnp));
 		pr_info("%s %d: %c online: %ld(%d) offline: %ld(%d)\n",
-			__func__, rdp->cpu, ".o"[onl],
+			__func__, rdp->cpu, ".o"[rcu_rdp_cpu_online(rdp)],
 			(long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_flags,
 			(long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_flags);
 		return 1; /* Break things loose after complaining. */
@@ -1739,7 +1753,6 @@ static void rcu_strict_gp_boundary(void *unused)
  */
 static noinline_for_stack bool rcu_gp_init(void)
 {
-	unsigned long firstseq;
 	unsigned long flags;
 	unsigned long oldmask;
 	unsigned long mask;
@@ -1782,22 +1795,17 @@ static noinline_for_stack bool rcu_gp_init(void)
 	 * of RCU's Requirements documentation.
 	 */
 	WRITE_ONCE(rcu_state.gp_state, RCU_GP_ONOFF);
+	/* Exclude CPU hotplug operations. */
 	rcu_for_each_leaf_node(rnp) {
-		// Wait for CPU-hotplug operations that might have
-		// started before this grace period did.
-		smp_mb(); // Pair with barriers used when updating ->ofl_seq to odd values.
-		firstseq = READ_ONCE(rnp->ofl_seq);
-		if (firstseq & 0x1)
-			while (firstseq == READ_ONCE(rnp->ofl_seq))
-				schedule_timeout_idle(1);  // Can't wake unless RCU is watching.
-		smp_mb(); // Pair with barriers used when updating ->ofl_seq to even values.
-		raw_spin_lock(&rcu_state.ofl_lock);
-		raw_spin_lock_irq_rcu_node(rnp);
+		local_irq_save(flags);
+		arch_spin_lock(&rcu_state.ofl_lock);
+		raw_spin_lock_rcu_node(rnp);
 		if (rnp->qsmaskinit == rnp->qsmaskinitnext &&
 		    !rnp->wait_blkd_tasks) {
 			/* Nothing to do on this leaf rcu_node structure. */
-			raw_spin_unlock_irq_rcu_node(rnp);
-			raw_spin_unlock(&rcu_state.ofl_lock);
+			raw_spin_unlock_rcu_node(rnp);
+			arch_spin_unlock(&rcu_state.ofl_lock);
+			local_irq_restore(flags);
 			continue;
 		}
 
@@ -1832,8 +1840,9 @@ static noinline_for_stack bool rcu_gp_init(void)
 				rcu_cleanup_dead_rnp(rnp);
 		}
 
-		raw_spin_unlock_irq_rcu_node(rnp);
-		raw_spin_unlock(&rcu_state.ofl_lock);
+		raw_spin_unlock_rcu_node(rnp);
+		arch_spin_unlock(&rcu_state.ofl_lock);
+		local_irq_restore(flags);
 	}
 	rcu_gp_slow(gp_preinit_delay); /* Races with CPU hotplug. */
 
@@ -2850,10 +2859,12 @@ static void rcu_cpu_kthread(unsigned int cpu)
 {
 	unsigned int *statusp = this_cpu_ptr(&rcu_data.rcu_cpu_kthread_status);
 	char work, *workp = this_cpu_ptr(&rcu_data.rcu_cpu_has_work);
+	unsigned long *j = this_cpu_ptr(&rcu_data.rcuc_activity);
 	int spincnt;
 
 	trace_rcu_utilization(TPS("Start CPU kthread@rcu_run"));
 	for (spincnt = 0; spincnt < 10; spincnt++) {
+		WRITE_ONCE(*j, jiffies);
 		local_bh_disable();
 		*statusp = RCU_KTHREAD_RUNNING;
 		local_irq_disable();
@@ -2874,6 +2885,7 @@ static void rcu_cpu_kthread(unsigned int cpu)
 	schedule_timeout_idle(2);
 	trace_rcu_utilization(TPS("End CPU kthread@rcu_yield"));
 	*statusp = RCU_KTHREAD_WAITING;
+	WRITE_ONCE(*j, jiffies);
 }
 
 static struct smp_hotplug_thread rcu_cpu_thread_spec = {
@@ -2894,7 +2906,7 @@ static int __init rcu_spawn_core_kthreads(void)
 
 	for_each_possible_cpu(cpu)
 		per_cpu(rcu_data.rcu_cpu_has_work, cpu) = 0;
-	if (!IS_ENABLED(CONFIG_RCU_BOOST) && use_softirq)
+	if (use_softirq)
 		return 0;
 	WARN_ONCE(smpboot_register_percpu_thread(&rcu_cpu_thread_spec),
 		  "%s: Could not start rcuc kthread, OOM is now expected behavior\n", __func__);
@@ -2995,72 +3007,6 @@ static void check_cb_ovld(struct rcu_data *rdp)
 	raw_spin_unlock_rcu_node(rnp);
 }
 
-/* Helper function for call_rcu() and friends.  */
-static void
-__call_rcu(struct rcu_head *head, rcu_callback_t func)
-{
-	static atomic_t doublefrees;
-	unsigned long flags;
-	struct rcu_data *rdp;
-	bool was_alldone;
-
-	/* Misaligned rcu_head! */
-	WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
-
-	if (debug_rcu_head_queue(head)) {
-		/*
-		 * Probable double call_rcu(), so leak the callback.
-		 * Use rcu:rcu_callback trace event to find the previous
-		 * time callback was passed to __call_rcu().
-		 */
-		if (atomic_inc_return(&doublefrees) < 4) {
-			pr_err("%s(): Double-freed CB %p->%pS()!!!  ", __func__, head, head->func);
-			mem_dump_obj(head);
-		}
-		WRITE_ONCE(head->func, rcu_leak_callback);
-		return;
-	}
-	head->func = func;
-	head->next = NULL;
-	local_irq_save(flags);
-	kasan_record_aux_stack_noalloc(head);
-	rdp = this_cpu_ptr(&rcu_data);
-
-	/* Add the callback to our list. */
-	if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
-		// This can trigger due to call_rcu() from offline CPU:
-		WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
-		WARN_ON_ONCE(!rcu_is_watching());
-		// Very early boot, before rcu_init().  Initialize if needed
-		// and then drop through to queue the callback.
-		if (rcu_segcblist_empty(&rdp->cblist))
-			rcu_segcblist_init(&rdp->cblist);
-	}
-
-	check_cb_ovld(rdp);
-	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
-		return; // Enqueued onto ->nocb_bypass, so just leave.
-	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
-	rcu_segcblist_enqueue(&rdp->cblist, head);
-	if (__is_kvfree_rcu_offset((unsigned long)func))
-		trace_rcu_kvfree_callback(rcu_state.name, head,
-					 (unsigned long)func,
-					 rcu_segcblist_n_cbs(&rdp->cblist));
-	else
-		trace_rcu_callback(rcu_state.name, head,
-				   rcu_segcblist_n_cbs(&rdp->cblist));
-
-	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
-
-	/* Go handle any RCU core processing required. */
-	if (unlikely(rcu_rdp_is_offloaded(rdp))) {
-		__call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
-	} else {
-		__call_rcu_core(rdp, head, flags);
-		local_irq_restore(flags);
-	}
-}
-
 /**
  * call_rcu() - Queue an RCU callback for invocation after a grace period.
  * @head: structure to be used for queueing the RCU updates.
@@ -3103,7 +3049,66 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
  */
 void call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
-	__call_rcu(head, func);
+	static atomic_t doublefrees;
+	unsigned long flags;
+	struct rcu_data *rdp;
+	bool was_alldone;
+
+	/* Misaligned rcu_head! */
+	WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
+
+	if (debug_rcu_head_queue(head)) {
+		/*
+		 * Probable double call_rcu(), so leak the callback.
+		 * Use rcu:rcu_callback trace event to find the previous
+		 * time callback was passed to call_rcu().
+		 */
+		if (atomic_inc_return(&doublefrees) < 4) {
+			pr_err("%s(): Double-freed CB %p->%pS()!!!  ", __func__, head, head->func);
+			mem_dump_obj(head);
+		}
+		WRITE_ONCE(head->func, rcu_leak_callback);
+		return;
+	}
+	head->func = func;
+	head->next = NULL;
+	kasan_record_aux_stack_noalloc(head);
+	local_irq_save(flags);
+	rdp = this_cpu_ptr(&rcu_data);
+
+	/* Add the callback to our list. */
+	if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
+		// This can trigger due to call_rcu() from offline CPU:
+		WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
+		WARN_ON_ONCE(!rcu_is_watching());
+		// Very early boot, before rcu_init().  Initialize if needed
+		// and then drop through to queue the callback.
+		if (rcu_segcblist_empty(&rdp->cblist))
+			rcu_segcblist_init(&rdp->cblist);
+	}
+
+	check_cb_ovld(rdp);
+	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+		return; // Enqueued onto ->nocb_bypass, so just leave.
+	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
+	rcu_segcblist_enqueue(&rdp->cblist, head);
+	if (__is_kvfree_rcu_offset((unsigned long)func))
+		trace_rcu_kvfree_callback(rcu_state.name, head,
+					 (unsigned long)func,
+					 rcu_segcblist_n_cbs(&rdp->cblist));
+	else
+		trace_rcu_callback(rcu_state.name, head,
+				   rcu_segcblist_n_cbs(&rdp->cblist));
+
+	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
+
+	/* Go handle any RCU core processing required. */
+	if (unlikely(rcu_rdp_is_offloaded(rdp))) {
+		__call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
+	} else {
+		__call_rcu_core(rdp, head, flags);
+		local_irq_restore(flags);
+	}
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -3984,13 +3989,16 @@ static void rcu_barrier_callback(struct rcu_head *rhp)
 }
 
 /*
- * Called with preemption disabled, and from cross-cpu IRQ context.
+ * If needed, entrain an rcu_barrier() callback on rdp->cblist.
  */
-static void rcu_barrier_func(void *cpu_in)
+static void rcu_barrier_entrain(struct rcu_data *rdp)
 {
-	uintptr_t cpu = (uintptr_t)cpu_in;
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	unsigned long gseq = READ_ONCE(rcu_state.barrier_sequence);
+	unsigned long lseq = READ_ONCE(rdp->barrier_seq_snap);
 
+	lockdep_assert_held(&rcu_state.barrier_lock);
+	if (rcu_seq_state(lseq) || !rcu_seq_state(gseq) || rcu_seq_ctr(lseq) != rcu_seq_ctr(gseq))
+		return;
 	rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence);
 	rdp->barrier_head.func = rcu_barrier_callback;
 	debug_rcu_head_queue(&rdp->barrier_head);
@@ -4000,10 +4008,26 @@ static void rcu_barrier_func(void *cpu_in)
 		atomic_inc(&rcu_state.barrier_cpu_count);
 	} else {
 		debug_rcu_head_unqueue(&rdp->barrier_head);
-		rcu_barrier_trace(TPS("IRQNQ"), -1,
-				  rcu_state.barrier_sequence);
+		rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_state.barrier_sequence);
 	}
 	rcu_nocb_unlock(rdp);
+	smp_store_release(&rdp->barrier_seq_snap, gseq);
+}
+
+/*
+ * Called with preemption disabled, and from cross-cpu IRQ context.
+ */
+static void rcu_barrier_handler(void *cpu_in)
+{
+	uintptr_t cpu = (uintptr_t)cpu_in;
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+
+	lockdep_assert_irqs_disabled();
+	WARN_ON_ONCE(cpu != rdp->cpu);
+	WARN_ON_ONCE(cpu != smp_processor_id());
+	raw_spin_lock(&rcu_state.barrier_lock);
+	rcu_barrier_entrain(rdp);
+	raw_spin_unlock(&rcu_state.barrier_lock);
 }
 
 /**
@@ -4017,6 +4041,8 @@ static void rcu_barrier_func(void *cpu_in)
 void rcu_barrier(void)
 {
 	uintptr_t cpu;
+	unsigned long flags;
+	unsigned long gseq;
 	struct rcu_data *rdp;
 	unsigned long s = rcu_seq_snap(&rcu_state.barrier_sequence);
 
@@ -4027,15 +4053,16 @@ void rcu_barrier(void)
 
 	/* Did someone else do our work for us? */
 	if (rcu_seq_done(&rcu_state.barrier_sequence, s)) {
-		rcu_barrier_trace(TPS("EarlyExit"), -1,
-				  rcu_state.barrier_sequence);
+		rcu_barrier_trace(TPS("EarlyExit"), -1, rcu_state.barrier_sequence);
 		smp_mb(); /* caller's subsequent code after above check. */
 		mutex_unlock(&rcu_state.barrier_mutex);
 		return;
 	}
 
 	/* Mark the start of the barrier operation. */
+	raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
 	rcu_seq_start(&rcu_state.barrier_sequence);
+	gseq = rcu_state.barrier_sequence;
 	rcu_barrier_trace(TPS("Inc1"), -1, rcu_state.barrier_sequence);
 
 	/*
@@ -4047,7 +4074,7 @@ void rcu_barrier(void)
 	 */
 	init_completion(&rcu_state.barrier_completion);
 	atomic_set(&rcu_state.barrier_cpu_count, 2);
-	cpus_read_lock();
+	raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
 
 	/*
 	 * Force each CPU with callbacks to register a new callback.
@@ -4056,29 +4083,31 @@ void rcu_barrier(void)
 	 */
 	for_each_possible_cpu(cpu) {
 		rdp = per_cpu_ptr(&rcu_data, cpu);
-		if (cpu_is_offline(cpu) &&
-		    !rcu_rdp_is_offloaded(rdp))
+retry:
+		if (smp_load_acquire(&rdp->barrier_seq_snap) == gseq)
 			continue;
-		if (rcu_segcblist_n_cbs(&rdp->cblist) && cpu_online(cpu)) {
-			rcu_barrier_trace(TPS("OnlineQ"), cpu,
-					  rcu_state.barrier_sequence);
-			smp_call_function_single(cpu, rcu_barrier_func, (void *)cpu, 1);
-		} else if (rcu_segcblist_n_cbs(&rdp->cblist) &&
-			   cpu_is_offline(cpu)) {
-			rcu_barrier_trace(TPS("OfflineNoCBQ"), cpu,
-					  rcu_state.barrier_sequence);
-			local_irq_disable();
-			rcu_barrier_func((void *)cpu);
-			local_irq_enable();
-		} else if (cpu_is_offline(cpu)) {
-			rcu_barrier_trace(TPS("OfflineNoCBNoQ"), cpu,
-					  rcu_state.barrier_sequence);
-		} else {
-			rcu_barrier_trace(TPS("OnlineNQ"), cpu,
-					  rcu_state.barrier_sequence);
+		raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
+		if (!rcu_segcblist_n_cbs(&rdp->cblist)) {
+			WRITE_ONCE(rdp->barrier_seq_snap, gseq);
+			raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
+			rcu_barrier_trace(TPS("NQ"), cpu, rcu_state.barrier_sequence);
+			continue;
 		}
+		if (!rcu_rdp_cpu_online(rdp)) {
+			rcu_barrier_entrain(rdp);
+			WARN_ON_ONCE(READ_ONCE(rdp->barrier_seq_snap) != gseq);
+			raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
+			rcu_barrier_trace(TPS("OfflineNoCBQ"), cpu, rcu_state.barrier_sequence);
+			continue;
+		}
+		raw_spin_unlock_irqrestore(&rcu_state.barrier_lock, flags);
+		if (smp_call_function_single(cpu, rcu_barrier_handler, (void *)cpu, 1)) {
+			schedule_timeout_uninterruptible(1);
+			goto retry;
+		}
+		WARN_ON_ONCE(READ_ONCE(rdp->barrier_seq_snap) != gseq);
+		rcu_barrier_trace(TPS("OnlineQ"), cpu, rcu_state.barrier_sequence);
 	}
-	cpus_read_unlock();
 
 	/*
 	 * Now that we have an rcu_barrier_callback() callback on each
@@ -4093,6 +4122,12 @@ void rcu_barrier(void)
 	/* Mark the end of the barrier operation. */
 	rcu_barrier_trace(TPS("Inc2"), -1, rcu_state.barrier_sequence);
 	rcu_seq_end(&rcu_state.barrier_sequence);
+	gseq = rcu_state.barrier_sequence;
+	for_each_possible_cpu(cpu) {
+		rdp = per_cpu_ptr(&rcu_data, cpu);
+
+		WRITE_ONCE(rdp->barrier_seq_snap, gseq);
+	}
 
 	/* Other rcu_barrier() invocations can now safely proceed. */
 	mutex_unlock(&rcu_state.barrier_mutex);
@@ -4140,6 +4175,7 @@ rcu_boot_init_percpu_data(int cpu)
 	INIT_WORK(&rdp->strict_work, strict_work_handler);
 	WARN_ON_ONCE(rdp->dynticks_nesting != 1);
 	WARN_ON_ONCE(rcu_dynticks_in_eqs(rcu_dynticks_snap(rdp)));
+	rdp->barrier_seq_snap = rcu_state.barrier_sequence;
 	rdp->rcu_ofl_gp_seq = rcu_state.gp_seq;
 	rdp->rcu_ofl_gp_flags = RCU_GP_CLEANED;
 	rdp->rcu_onl_gp_seq = rcu_state.gp_seq;
@@ -4287,12 +4323,13 @@ void rcu_cpu_starting(unsigned int cpu)
 
 	rnp = rdp->mynode;
 	mask = rdp->grpmask;
-	WRITE_ONCE(rnp->ofl_seq, rnp->ofl_seq + 1);
-	WARN_ON_ONCE(!(rnp->ofl_seq & 0x1));
+	local_irq_save(flags);
+	arch_spin_lock(&rcu_state.ofl_lock);
 	rcu_dynticks_eqs_online();
-	smp_mb(); // Pair with rcu_gp_cleanup()'s ->ofl_seq barrier().
-	raw_spin_lock_irqsave_rcu_node(rnp, flags);
+	raw_spin_lock(&rcu_state.barrier_lock);
+	raw_spin_lock_rcu_node(rnp);
 	WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext | mask);
+	raw_spin_unlock(&rcu_state.barrier_lock);
 	newcpu = !(rnp->expmaskinitnext & mask);
 	rnp->expmaskinitnext |= mask;
 	/* Allow lockless access for expedited grace periods. */
@@ -4304,15 +4341,18 @@ void rcu_cpu_starting(unsigned int cpu)
 
 	/* An incoming CPU should never be blocking a grace period. */
 	if (WARN_ON_ONCE(rnp->qsmask & mask)) { /* RCU waiting on incoming CPU? */
+		/* rcu_report_qs_rnp() *really* wants some flags to restore */
+		unsigned long flags2;
+
+		local_irq_save(flags2);
 		rcu_disable_urgency_upon_qs(rdp);
 		/* Report QS -after- changing ->qsmaskinitnext! */
-		rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
+		rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags2);
 	} else {
-		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+		raw_spin_unlock_rcu_node(rnp);
 	}
-	smp_mb(); // Pair with rcu_gp_cleanup()'s ->ofl_seq barrier().
-	WRITE_ONCE(rnp->ofl_seq, rnp->ofl_seq + 1);
-	WARN_ON_ONCE(rnp->ofl_seq & 0x1);
+	arch_spin_unlock(&rcu_state.ofl_lock);
+	local_irq_restore(flags);
 	smp_mb(); /* Ensure RCU read-side usage follows above initialization. */
 }
 
@@ -4326,7 +4366,7 @@ void rcu_cpu_starting(unsigned int cpu)
  */
 void rcu_report_dead(unsigned int cpu)
 {
-	unsigned long flags;
+	unsigned long flags, seq_flags;
 	unsigned long mask;
 	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 	struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rdp & rnp. */
@@ -4340,10 +4380,8 @@ void rcu_report_dead(unsigned int cpu)
 
 	/* Remove outgoing CPU from mask in the leaf rcu_node structure. */
 	mask = rdp->grpmask;
-	WRITE_ONCE(rnp->ofl_seq, rnp->ofl_seq + 1);
-	WARN_ON_ONCE(!(rnp->ofl_seq & 0x1));
-	smp_mb(); // Pair with rcu_gp_cleanup()'s ->ofl_seq barrier().
-	raw_spin_lock(&rcu_state.ofl_lock);
+	local_irq_save(seq_flags);
+	arch_spin_lock(&rcu_state.ofl_lock);
 	raw_spin_lock_irqsave_rcu_node(rnp, flags); /* Enforce GP memory-order guarantee. */
 	rdp->rcu_ofl_gp_seq = READ_ONCE(rcu_state.gp_seq);
 	rdp->rcu_ofl_gp_flags = READ_ONCE(rcu_state.gp_flags);
@@ -4354,10 +4392,8 @@ void rcu_report_dead(unsigned int cpu)
 	}
 	WRITE_ONCE(rnp->qsmaskinitnext, rnp->qsmaskinitnext & ~mask);
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-	raw_spin_unlock(&rcu_state.ofl_lock);
-	smp_mb(); // Pair with rcu_gp_cleanup()'s ->ofl_seq barrier().
-	WRITE_ONCE(rnp->ofl_seq, rnp->ofl_seq + 1);
-	WARN_ON_ONCE(rnp->ofl_seq & 0x1);
+	arch_spin_unlock(&rcu_state.ofl_lock);
+	local_irq_restore(seq_flags);
 
 	rdp->cpu_started = false;
 }
@@ -4380,7 +4416,9 @@ void rcutree_migrate_callbacks(int cpu)
 	    rcu_segcblist_empty(&rdp->cblist))
 		return;  /* No callbacks to migrate. */
 
-	local_irq_save(flags);
+	raw_spin_lock_irqsave(&rcu_state.barrier_lock, flags);
+	WARN_ON_ONCE(rcu_rdp_cpu_online(rdp));
+	rcu_barrier_entrain(rdp);
 	my_rdp = this_cpu_ptr(&rcu_data);
 	my_rnp = my_rdp->mynode;
 	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
@@ -4390,10 +4428,10 @@ void rcutree_migrate_callbacks(int cpu)
 	needwake = rcu_advance_cbs(my_rnp, rdp) ||
 		   rcu_advance_cbs(my_rnp, my_rdp);
 	rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist);
+	raw_spin_unlock(&rcu_state.barrier_lock); /* irqs remain disabled. */
 	needwake = needwake || rcu_advance_cbs(my_rnp, my_rdp);
 	rcu_segcblist_disable(&rdp->cblist);
-	WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) !=
-		     !rcu_segcblist_n_cbs(&my_rdp->cblist));
+	WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) != !rcu_segcblist_n_cbs(&my_rdp->cblist));
 	if (rcu_rdp_is_offloaded(my_rdp)) {
 		raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */
 		__call_rcu_nocb_wake(my_rdp, true, flags);
@@ -4440,26 +4478,10 @@ static int rcu_pm_notify(struct notifier_block *self,
 static int __init rcu_spawn_gp_kthread(void)
 {
 	unsigned long flags;
-	int kthread_prio_in = kthread_prio;
 	struct rcu_node *rnp;
 	struct sched_param sp;
 	struct task_struct *t;
 
-	/* Force priority into range. */
-	if (IS_ENABLED(CONFIG_RCU_BOOST) && kthread_prio < 2
-	    && IS_BUILTIN(CONFIG_RCU_TORTURE_TEST))
-		kthread_prio = 2;
-	else if (IS_ENABLED(CONFIG_RCU_BOOST) && kthread_prio < 1)
-		kthread_prio = 1;
-	else if (kthread_prio < 0)
-		kthread_prio = 0;
-	else if (kthread_prio > 99)
-		kthread_prio = 99;
-
-	if (kthread_prio != kthread_prio_in)
-		pr_alert("rcu_spawn_gp_kthread(): Limited prio to %d from %d\n",
-			 kthread_prio, kthread_prio_in);
-
 	rcu_scheduler_fully_active = 1;
 	t = kthread_create(rcu_gp_kthread, NULL, "%s", rcu_state.name);
 	if (WARN_ONCE(IS_ERR(t), "%s: Could not start grace-period kthread, OOM is now expected behavior\n", __func__))
@@ -4570,6 +4592,7 @@ static void __init rcu_init_one(void)
 			init_waitqueue_head(&rnp->exp_wq[2]);
 			init_waitqueue_head(&rnp->exp_wq[3]);
 			spin_lock_init(&rnp->exp_lock);
+			mutex_init(&rnp->boost_kthread_mutex);
 		}
 	}
 
@@ -4585,6 +4608,28 @@ static void __init rcu_init_one(void)
 }
 
 /*
+ * Force priority from the kernel command-line into range.
+ */
+static void __init sanitize_kthread_prio(void)
+{
+	int kthread_prio_in = kthread_prio;
+
+	if (IS_ENABLED(CONFIG_RCU_BOOST) && kthread_prio < 2
+	    && IS_BUILTIN(CONFIG_RCU_TORTURE_TEST))
+		kthread_prio = 2;
+	else if (IS_ENABLED(CONFIG_RCU_BOOST) && kthread_prio < 1)
+		kthread_prio = 1;
+	else if (kthread_prio < 0)
+		kthread_prio = 0;
+	else if (kthread_prio > 99)
+		kthread_prio = 99;
+
+	if (kthread_prio != kthread_prio_in)
+		pr_alert("%s: Limited prio to %d from %d\n",
+			 __func__, kthread_prio, kthread_prio_in);
+}
+
+/*
  * Compute the rcu_node tree geometry from kernel parameters.  This cannot
  * replace the definitions in tree.h because those are needed to size
  * the ->node array in the rcu_state structure.
@@ -4744,6 +4789,7 @@ void __init rcu_init(void)
 
 	kfree_rcu_batch_init();
 	rcu_bootup_announce();
+	sanitize_kthread_prio();
 	rcu_init_geometry();
 	rcu_init_one();
 	if (dump_tree)
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 486fc90..926673e 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -56,8 +56,6 @@ struct rcu_node {
 				/*  Initialized from ->qsmaskinitnext at the */
 				/*  beginning of each grace period. */
 	unsigned long qsmaskinitnext;
-	unsigned long ofl_seq;	/* CPU-hotplug operation sequence count. */
-				/* Online CPUs for next grace period. */
 	unsigned long expmask;	/* CPUs or groups that need to check in */
 				/*  to allow the current expedited GP */
 				/*  to complete. */
@@ -110,6 +108,9 @@ struct rcu_node {
 				/*  side effect, not as a lock. */
 	unsigned long boost_time;
 				/* When to start boosting (jiffies). */
+	struct mutex boost_kthread_mutex;
+				/* Exclusion for thread spawning and affinity */
+				/*  manipulation. */
 	struct task_struct *boost_kthread_task;
 				/* kthread that takes care of priority */
 				/*  boosting for this rcu_node structure. */
@@ -190,6 +191,7 @@ struct rcu_data {
 	bool rcu_forced_tick_exp;	/*   ... provide QS to expedited GP. */
 
 	/* 4) rcu_barrier(), OOM callbacks, and expediting. */
+	unsigned long barrier_seq_snap;	/* Snap of rcu_state.barrier_sequence. */
 	struct rcu_head barrier_head;
 	int exp_dynticks_snap;		/* Double-check need for IPI. */
 
@@ -203,6 +205,8 @@ struct rcu_data {
 	int nocb_defer_wakeup;		/* Defer wakeup of nocb_kthread. */
 	struct timer_list nocb_timer;	/* Enforce finite deferral. */
 	unsigned long nocb_gp_adv_time;	/* Last call_rcu() CB adv (jiffies). */
+	struct mutex nocb_gp_kthread_mutex; /* Exclusion for nocb gp kthread */
+					    /* spawning */
 
 	/* The following fields are used by call_rcu, hence own cacheline. */
 	raw_spinlock_t nocb_bypass_lock ____cacheline_internodealigned_in_smp;
@@ -237,6 +241,7 @@ struct rcu_data {
 					/* rcuc per-CPU kthread or NULL. */
 	unsigned int rcu_cpu_kthread_status;
 	char rcu_cpu_has_work;
+	unsigned long rcuc_activity;
 
 	/* 7) Diagnostic data, including RCU CPU stall warnings. */
 	unsigned int softirq_snap;	/* Snapshot of softirq activity. */
@@ -302,9 +307,8 @@ struct rcu_state {
 
 	/* The following fields are guarded by the root rcu_node's lock. */
 
-	u8	boost ____cacheline_internodealigned_in_smp;
-						/* Subject to priority boost. */
-	unsigned long gp_seq;			/* Grace-period sequence #. */
+	unsigned long gp_seq ____cacheline_internodealigned_in_smp;
+						/* Grace-period sequence #. */
 	unsigned long gp_max;			/* Maximum GP duration in */
 						/*  jiffies. */
 	struct task_struct *gp_kthread;		/* Task for grace periods. */
@@ -323,6 +327,8 @@ struct rcu_state {
 						/*  rcu_barrier(). */
 	/* End of fields guarded by barrier_mutex. */
 
+	raw_spinlock_t barrier_lock;		/* Protects ->barrier_seq_snap. */
+
 	struct mutex exp_mutex;			/* Serialize expedited GP. */
 	struct mutex exp_wake_mutex;		/* Serialize wakeup. */
 	unsigned long expedited_sequence;	/* Take a ticket. */
@@ -355,7 +361,7 @@ struct rcu_state {
 	const char *name;			/* Name of structure. */
 	char abbr;				/* Abbreviated name. */
 
-	raw_spinlock_t ofl_lock ____cacheline_internodealigned_in_smp;
+	arch_spinlock_t ofl_lock ____cacheline_internodealigned_in_smp;
 						/* Synchronize offline with */
 						/*  GP pre-initialization. */
 };
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index 237a799..60197ea 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -502,7 +502,8 @@ static void synchronize_rcu_expedited_wait(void)
 		if (synchronize_rcu_expedited_wait_once(1))
 			return;
 		rcu_for_each_leaf_node(rnp) {
-			for_each_leaf_node_cpu_mask(rnp, cpu, rnp->expmask) {
+			mask = READ_ONCE(rnp->expmask);
+			for_each_leaf_node_cpu_mask(rnp, cpu, mask) {
 				rdp = per_cpu_ptr(&rcu_data, cpu);
 				if (rdp->rcu_forced_tick_exp)
 					continue;
@@ -656,7 +657,7 @@ static void rcu_exp_handler(void *unused)
 	 */
 	if (!depth) {
 		if (!(preempt_count() & (PREEMPT_MASK | SOFTIRQ_MASK)) ||
-		    rcu_dynticks_curr_cpu_in_eqs()) {
+		    rcu_is_cpu_rrupt_from_idle()) {
 			rcu_report_exp_rdp(rdp);
 		} else {
 			WRITE_ONCE(rdp->cpu_no_qs.b.exp, true);
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index eeafb54..636d054 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -1169,7 +1169,7 @@ void __init rcu_init_nohz(void)
 	struct rcu_data *rdp;
 
 #if defined(CONFIG_NO_HZ_FULL)
-	if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
+	if (tick_nohz_full_running && !cpumask_empty(tick_nohz_full_mask))
 		need_rcu_nocb_mask = true;
 #endif /* #if defined(CONFIG_NO_HZ_FULL) */
 
@@ -1226,6 +1226,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 	raw_spin_lock_init(&rdp->nocb_gp_lock);
 	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
 	rcu_cblist_init(&rdp->nocb_bypass);
+	mutex_init(&rdp->nocb_gp_kthread_mutex);
 }
 
 /*
@@ -1238,6 +1239,7 @@ static void rcu_spawn_cpu_nocb_kthread(int cpu)
 	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 	struct rcu_data *rdp_gp;
 	struct task_struct *t;
+	struct sched_param sp;
 
 	if (!rcu_scheduler_fully_active || !rcu_nocb_is_setup)
 		return;
@@ -1247,20 +1249,30 @@ static void rcu_spawn_cpu_nocb_kthread(int cpu)
 		return;
 
 	/* If we didn't spawn the GP kthread first, reorganize! */
+	sp.sched_priority = kthread_prio;
 	rdp_gp = rdp->nocb_gp_rdp;
+	mutex_lock(&rdp_gp->nocb_gp_kthread_mutex);
 	if (!rdp_gp->nocb_gp_kthread) {
 		t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
 				"rcuog/%d", rdp_gp->cpu);
-		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
+		if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__)) {
+			mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
 			return;
+		}
 		WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
+		if (kthread_prio)
+			sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
 	}
+	mutex_unlock(&rdp_gp->nocb_gp_kthread_mutex);
 
 	/* Spawn the kthread for this CPU. */
 	t = kthread_run(rcu_nocb_cb_kthread, rdp,
 			"rcuo%c/%d", rcu_state.abbr, cpu);
 	if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
 		return;
+
+	if (kthread_prio)
+		sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
 	WRITE_ONCE(rdp->nocb_cb_kthread, t);
 	WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
 }
@@ -1348,7 +1360,7 @@ static void __init rcu_organize_nocb_kthreads(void)
  */
 void rcu_bind_current_to_nocb(void)
 {
-	if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
+	if (cpumask_available(rcu_nocb_mask) && !cpumask_empty(rcu_nocb_mask))
 		WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
 }
 EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index c5b45c2..6082dd2 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -330,7 +330,7 @@ void rcu_note_context_switch(bool preempt)
 		 * then queue the task as required based on the states
 		 * of any ongoing and expedited grace periods.
 		 */
-		WARN_ON_ONCE((rdp->grpmask & rcu_rnp_online_cpus(rnp)) == 0);
+		WARN_ON_ONCE(!rcu_rdp_cpu_online(rdp));
 		WARN_ON_ONCE(!list_empty(&t->rcu_node_entry));
 		trace_rcu_preempt_task(rcu_state.name,
 				       t->pid,
@@ -556,16 +556,16 @@ rcu_preempt_deferred_qs_irqrestore(struct task_struct *t, unsigned long flags)
 			raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 		}
 
-		/* Unboost if we were boosted. */
-		if (IS_ENABLED(CONFIG_RCU_BOOST) && drop_boost_mutex)
-			rt_mutex_futex_unlock(&rnp->boost_mtx.rtmutex);
-
 		/*
 		 * If this was the last task on the expedited lists,
 		 * then we need to report up the rcu_node hierarchy.
 		 */
 		if (!empty_exp && empty_exp_now)
 			rcu_report_exp_rnp(rnp, true);
+
+		/* Unboost if we were boosted. */
+		if (IS_ENABLED(CONFIG_RCU_BOOST) && drop_boost_mutex)
+			rt_mutex_futex_unlock(&rnp->boost_mtx.rtmutex);
 	} else {
 		local_irq_restore(flags);
 	}
@@ -773,7 +773,6 @@ dump_blkd_tasks(struct rcu_node *rnp, int ncheck)
 	int cpu;
 	int i;
 	struct list_head *lhp;
-	bool onl;
 	struct rcu_data *rdp;
 	struct rcu_node *rnp1;
 
@@ -797,9 +796,8 @@ dump_blkd_tasks(struct rcu_node *rnp, int ncheck)
 	pr_cont("\n");
 	for (cpu = rnp->grplo; cpu <= rnp->grphi; cpu++) {
 		rdp = per_cpu_ptr(&rcu_data, cpu);
-		onl = !!(rdp->grpmask & rcu_rnp_online_cpus(rnp));
 		pr_info("\t%d: %c online: %ld(%d) offline: %ld(%d)\n",
-			cpu, ".o"[onl],
+			cpu, ".o"[rcu_rdp_cpu_online(rdp)],
 			(long)rdp->rcu_onl_gp_seq, rdp->rcu_onl_gp_flags,
 			(long)rdp->rcu_ofl_gp_seq, rdp->rcu_ofl_gp_flags);
 	}
@@ -996,12 +994,15 @@ dump_blkd_tasks(struct rcu_node *rnp, int ncheck)
  */
 static void rcu_cpu_kthread_setup(unsigned int cpu)
 {
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 #ifdef CONFIG_RCU_BOOST
 	struct sched_param sp;
 
 	sp.sched_priority = kthread_prio;
 	sched_setscheduler_nocheck(current, SCHED_FIFO, &sp);
 #endif /* #ifdef CONFIG_RCU_BOOST */
+
+	WRITE_ONCE(rdp->rcuc_activity, jiffies);
 }
 
 #ifdef CONFIG_RCU_BOOST
@@ -1172,15 +1173,14 @@ static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
 	struct sched_param sp;
 	struct task_struct *t;
 
+	mutex_lock(&rnp->boost_kthread_mutex);
 	if (rnp->boost_kthread_task || !rcu_scheduler_fully_active)
-		return;
-
-	rcu_state.boost = 1;
+		goto out;
 
 	t = kthread_create(rcu_boost_kthread, (void *)rnp,
 			   "rcub/%d", rnp_index);
 	if (WARN_ON_ONCE(IS_ERR(t)))
-		return;
+		goto out;
 
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
 	rnp->boost_kthread_task = t;
@@ -1188,6 +1188,9 @@ static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
 	sp.sched_priority = kthread_prio;
 	sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
 	wake_up_process(t); /* get to TASK_INTERRUPTIBLE quickly. */
+
+ out:
+	mutex_unlock(&rnp->boost_kthread_mutex);
 }
 
 /*
@@ -1210,14 +1213,16 @@ static void rcu_boost_kthread_setaffinity(struct rcu_node *rnp, int outgoingcpu)
 		return;
 	if (!zalloc_cpumask_var(&cm, GFP_KERNEL))
 		return;
+	mutex_lock(&rnp->boost_kthread_mutex);
 	for_each_leaf_node_possible_cpu(rnp, cpu)
 		if ((mask & leaf_node_cpu_bit(rnp, cpu)) &&
 		    cpu != outgoingcpu)
 			cpumask_set_cpu(cpu, cm);
 	cpumask_and(cm, cm, housekeeping_cpumask(HK_FLAG_RCU));
-	if (cpumask_weight(cm) == 0)
+	if (cpumask_empty(cm))
 		cpumask_copy(cm, housekeeping_cpumask(HK_FLAG_RCU));
 	set_cpus_allowed_ptr(t, cm);
+	mutex_unlock(&rnp->boost_kthread_mutex);
 	free_cpumask_var(cm);
 }
 
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index 21bebf7..0c5d851 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -379,6 +379,15 @@ static bool rcu_is_gp_kthread_starving(unsigned long *jp)
 	return j > 2 * HZ;
 }
 
+static bool rcu_is_rcuc_kthread_starving(struct rcu_data *rdp, unsigned long *jp)
+{
+	unsigned long j = jiffies - READ_ONCE(rdp->rcuc_activity);
+
+	if (jp)
+		*jp = j;
+	return j > 2 * HZ;
+}
+
 /*
  * Print out diagnostic information for the specified stalled CPU.
  *
@@ -430,6 +439,29 @@ static void print_cpu_stall_info(int cpu)
 	       falsepositive ? " (false positive?)" : "");
 }
 
+static void rcuc_kthread_dump(struct rcu_data *rdp)
+{
+	int cpu;
+	unsigned long j;
+	struct task_struct *rcuc;
+
+	rcuc = rdp->rcu_cpu_kthread_task;
+	if (!rcuc)
+		return;
+
+	cpu = task_cpu(rcuc);
+	if (cpu_is_offline(cpu) || idle_cpu(cpu))
+		return;
+
+	if (!rcu_is_rcuc_kthread_starving(rdp, &j))
+		return;
+
+	pr_err("%s kthread starved for %ld jiffies\n", rcuc->comm, j);
+	sched_show_task(rcuc);
+	if (!trigger_single_cpu_backtrace(cpu))
+		dump_cpu_task(cpu);
+}
+
 /* Complain about starvation of grace-period kthread.  */
 static void rcu_check_gp_kthread_starvation(void)
 {
@@ -601,6 +633,9 @@ static void print_cpu_stall(unsigned long gps)
 	rcu_check_gp_kthread_expired_fqs_timer();
 	rcu_check_gp_kthread_starvation();
 
+	if (!use_softirq)
+		rcuc_kthread_dump(rdp);
+
 	rcu_dump_cpu_stacks();
 
 	raw_spin_lock_irqsave_rcu_node(rnp, flags);
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index 156892c..180ff9c 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -407,6 +407,13 @@ void __wait_rcu_gp(bool checktiny, int n, call_rcu_func_t *crcu_array,
 }
 EXPORT_SYMBOL_GPL(__wait_rcu_gp);
 
+void finish_rcuwait(struct rcuwait *w)
+{
+	rcu_assign_pointer(w->task, NULL);
+	__set_current_state(TASK_RUNNING);
+}
+EXPORT_SYMBOL_GPL(finish_rcuwait);
+
 #ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
 void init_rcu_head(struct rcu_head *head)
 {
diff --git a/kernel/torture.c b/kernel/torture.c
index ef27a6c..789aeb0 100644
--- a/kernel/torture.c
+++ b/kernel/torture.c
@@ -911,7 +911,7 @@ void torture_kthread_stopping(char *title)
 {
 	char buf[128];
 
-	snprintf(buf, sizeof(buf), "Stopping %s", title);
+	snprintf(buf, sizeof(buf), "%s is stopping", title);
 	VERBOSE_TOROUT_STRING(buf);
 	while (!kthread_should_stop()) {
 		torture_shutdown_absorb(title);
@@ -931,12 +931,14 @@ int _torture_create_kthread(int (*fn)(void *arg), void *arg, char *s, char *m,
 	int ret = 0;
 
 	VERBOSE_TOROUT_STRING(m);
-	*tp = kthread_run(fn, arg, "%s", s);
+	*tp = kthread_create(fn, arg, "%s", s);
 	if (IS_ERR(*tp)) {
 		ret = PTR_ERR(*tp);
 		TOROUT_ERRSTRING(f);
 		*tp = NULL;
+		return ret;
 	}
+	wake_up_process(*tp);  // Process is sleeping, so ordering provided.
 	torture_shuffle_task_register(*tp);
 	return ret;
 }