Merge tag 'printk-for-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/printk/linux

Pull printk updates from Petr Mladek:
 "Improve the behavior during panic. The issues were found when testing
  the ongoing changes introducing atomic consoles and printk kthreads:

   - pr_flush() has to wait for the last reserved record instead of the
     last finalized one. Note that records are finalized in random order
     when generated by more CPUs in parallel.

   - Ignore non-finalized records during panic(). Messages printed on
     panic-CPU are always finalized. Messages printed by other CPUs
     might never be finalized when the CPUs get stopped.

   - Block new printk() calls on non-panic CPUs completely. Backtraces
     are printed before entering the panic mode. Later messages would
     just mess information printed by the panic CPU.

   - Do not take console_lock in console_flush_on_panic() at all. The
     original code did try_lock()/console_unlock(). The unlock part
     might cause a deadlock when panic() happened in a scheduler code.

   - Fix conversion of 64-bit sequence number for 32-bit atomic
     operations"

* tag 'printk-for-6.9' of git://git.kernel.org/pub/scm/linux/kernel/git/printk/linux:
  dump_stack: Do not get cpu_sync for panic CPU
  panic: Flush kernel log buffer at the end
  printk: Avoid non-panic CPUs writing to ringbuffer
  printk: Disable passing console lock owner completely during panic()
  printk: ringbuffer: Skip non-finalized records in panic
  printk: Wait for all reserved records with pr_flush()
  printk: ringbuffer: Cleanup reader terminology
  printk: Add this_cpu_in_panic()
  printk: For @suppress_panic_printk check for other CPU in panic
  printk: ringbuffer: Clarify special lpos values
  printk: ringbuffer: Do not skip non-finalized records with prb_next_seq()
  printk: Use prb_first_seq() as base for 32bit seq macros
  printk: Adjust mapping for 32bit seq macros
  printk: nbcon: Relocate 32bit seq macros
diff --git a/include/linux/printk.h b/include/linux/printk.h
index 8ef499a..955e318 100644
--- a/include/linux/printk.h
+++ b/include/linux/printk.h
@@ -273,6 +273,8 @@
 }
 #endif
 
+bool this_cpu_in_panic(void);
+
 #ifdef CONFIG_SMP
 extern int __printk_cpu_sync_try_get(void);
 extern void __printk_cpu_sync_wait(void);
diff --git a/kernel/panic.c b/kernel/panic.c
index 2807639..f22d8f3 100644
--- a/kernel/panic.c
+++ b/kernel/panic.c
@@ -446,6 +446,14 @@
 
 	/* Do not scroll important messages printed above */
 	suppress_printk = 1;
+
+	/*
+	 * The final messages may not have been printed if in a context that
+	 * defers printing (such as NMI) and irq_work is not available.
+	 * Explicitly flush the kernel log buffer one last time.
+	 */
+	console_flush_on_panic(CONSOLE_FLUSH_PENDING);
+
 	local_irq_enable();
 	for (i = 0; ; i += PANIC_TIMER_STEP) {
 		touch_softlockup_watchdog();
diff --git a/kernel/printk/nbcon.c b/kernel/printk/nbcon.c
index b960771..c8093bc 100644
--- a/kernel/printk/nbcon.c
+++ b/kernel/printk/nbcon.c
@@ -140,39 +140,6 @@
 	return atomic_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_state), &cur->atom, new->atom);
 }
 
-#ifdef CONFIG_64BIT
-
-#define __seq_to_nbcon_seq(seq) (seq)
-#define __nbcon_seq_to_seq(seq) (seq)
-
-#else /* CONFIG_64BIT */
-
-#define __seq_to_nbcon_seq(seq) ((u32)seq)
-
-static inline u64 __nbcon_seq_to_seq(u32 nbcon_seq)
-{
-	u64 seq;
-	u64 rb_next_seq;
-
-	/*
-	 * The provided sequence is only the lower 32 bits of the ringbuffer
-	 * sequence. It needs to be expanded to 64bit. Get the next sequence
-	 * number from the ringbuffer and fold it.
-	 *
-	 * Having a 32bit representation in the console is sufficient.
-	 * If a console ever gets more than 2^31 records behind
-	 * the ringbuffer then this is the least of the problems.
-	 *
-	 * Also the access to the ring buffer is always safe.
-	 */
-	rb_next_seq = prb_next_seq(prb);
-	seq = rb_next_seq - ((u32)rb_next_seq - nbcon_seq);
-
-	return seq;
-}
-
-#endif /* CONFIG_64BIT */
-
 /**
  * nbcon_seq_read - Read the current console sequence
  * @con:	Console to read the sequence of
@@ -183,7 +150,7 @@
 {
 	unsigned long nbcon_seq = atomic_long_read(&ACCESS_PRIVATE(con, nbcon_seq));
 
-	return __nbcon_seq_to_seq(nbcon_seq);
+	return __ulseq_to_u64seq(prb, nbcon_seq);
 }
 
 /**
@@ -204,7 +171,7 @@
 	 */
 	u64 valid_seq = max_t(u64, seq, prb_first_valid_seq(prb));
 
-	atomic_long_set(&ACCESS_PRIVATE(con, nbcon_seq), __seq_to_nbcon_seq(valid_seq));
+	atomic_long_set(&ACCESS_PRIVATE(con, nbcon_seq), __u64seq_to_ulseq(valid_seq));
 
 	/* Clear con->seq since nbcon consoles use con->nbcon_seq instead. */
 	con->seq = 0;
@@ -223,11 +190,11 @@
  */
 static void nbcon_seq_try_update(struct nbcon_context *ctxt, u64 new_seq)
 {
-	unsigned long nbcon_seq = __seq_to_nbcon_seq(ctxt->seq);
+	unsigned long nbcon_seq = __u64seq_to_ulseq(ctxt->seq);
 	struct console *con = ctxt->console;
 
 	if (atomic_long_try_cmpxchg(&ACCESS_PRIVATE(con, nbcon_seq), &nbcon_seq,
-				    __seq_to_nbcon_seq(new_seq))) {
+				    __u64seq_to_ulseq(new_seq))) {
 		ctxt->seq = new_seq;
 	} else {
 		ctxt->seq = nbcon_seq_read(con);
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 1c6e7df..b06f63e 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -347,6 +347,29 @@
 	return unlikely(atomic_read(&panic_cpu) != PANIC_CPU_INVALID);
 }
 
+/* Return true if a panic is in progress on the current CPU. */
+bool this_cpu_in_panic(void)
+{
+	/*
+	 * We can use raw_smp_processor_id() here because it is impossible for
+	 * the task to be migrated to the panic_cpu, or away from it. If
+	 * panic_cpu has already been set, and we're not currently executing on
+	 * that CPU, then we never will be.
+	 */
+	return unlikely(atomic_read(&panic_cpu) == raw_smp_processor_id());
+}
+
+/*
+ * Return true if a panic is in progress on a remote CPU.
+ *
+ * On true, the local CPU should immediately release any printing resources
+ * that may be needed by the panic CPU.
+ */
+bool other_cpu_in_panic(void)
+{
+	return (panic_in_progress() && !this_cpu_in_panic());
+}
+
 /*
  * This is used for debugging the mess that is the VT code by
  * keeping track if we have the console semaphore held. It's
@@ -439,12 +462,6 @@
 static DEFINE_MUTEX(syslog_lock);
 
 #ifdef CONFIG_PRINTK
-/*
- * During panic, heavy printk by other CPUs can delay the
- * panic and risk deadlock on console resources.
- */
-static int __read_mostly suppress_panic_printk;
-
 DECLARE_WAIT_QUEUE_HEAD(log_wait);
 /* All 3 protected by @syslog_lock. */
 /* the next printk record to read by syslog(READ) or /proc/kmsg */
@@ -1835,10 +1852,23 @@
  */
 static void console_lock_spinning_enable(void)
 {
+	/*
+	 * Do not use spinning in panic(). The panic CPU wants to keep the lock.
+	 * Non-panic CPUs abandon the flush anyway.
+	 *
+	 * Just keep the lockdep annotation. The panic-CPU should avoid
+	 * taking console_owner_lock because it might cause a deadlock.
+	 * This looks like the easiest way how to prevent false lockdep
+	 * reports without handling races a lockless way.
+	 */
+	if (panic_in_progress())
+		goto lockdep;
+
 	raw_spin_lock(&console_owner_lock);
 	console_owner = current;
 	raw_spin_unlock(&console_owner_lock);
 
+lockdep:
 	/* The waiter may spin on us after setting console_owner */
 	spin_acquire(&console_owner_dep_map, 0, 0, _THIS_IP_);
 }
@@ -1863,6 +1893,22 @@
 {
 	int waiter;
 
+	/*
+	 * Ignore spinning waiters during panic() because they might get stopped
+	 * or blocked at any time,
+	 *
+	 * It is safe because nobody is allowed to start spinning during panic
+	 * in the first place. If there has been a waiter then non panic CPUs
+	 * might stay spinning. They would get stopped anyway. The panic context
+	 * will never start spinning and an interrupted spin on panic CPU will
+	 * never continue.
+	 */
+	if (panic_in_progress()) {
+		/* Keep lockdep happy. */
+		spin_release(&console_owner_dep_map, _THIS_IP_);
+		return 0;
+	}
+
 	raw_spin_lock(&console_owner_lock);
 	waiter = READ_ONCE(console_waiter);
 	console_owner = NULL;
@@ -2259,8 +2305,12 @@
 	if (unlikely(suppress_printk))
 		return 0;
 
-	if (unlikely(suppress_panic_printk) &&
-	    atomic_read(&panic_cpu) != raw_smp_processor_id())
+	/*
+	 * The messages on the panic CPU are the most important. If
+	 * non-panic CPUs are generating any messages, they will be
+	 * silently dropped.
+	 */
+	if (other_cpu_in_panic())
 		return 0;
 
 	if (level == LOGLEVEL_SCHED) {
@@ -2590,26 +2640,6 @@
 	return 0;
 }
 
-/*
- * Return true if a panic is in progress on a remote CPU.
- *
- * On true, the local CPU should immediately release any printing resources
- * that may be needed by the panic CPU.
- */
-bool other_cpu_in_panic(void)
-{
-	if (!panic_in_progress())
-		return false;
-
-	/*
-	 * We can use raw_smp_processor_id() here because it is impossible for
-	 * the task to be migrated to the panic_cpu, or away from it. If
-	 * panic_cpu has already been set, and we're not currently executing on
-	 * that CPU, then we never will be.
-	 */
-	return atomic_read(&panic_cpu) != raw_smp_processor_id();
-}
-
 /**
  * console_lock - block the console subsystem from printing
  *
@@ -2765,8 +2795,6 @@
 bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
 			     bool is_extended, bool may_suppress)
 {
-	static int panic_console_dropped;
-
 	struct printk_buffers *pbufs = pmsg->pbufs;
 	const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf);
 	const size_t outbuf_sz = sizeof(pbufs->outbuf);
@@ -2794,17 +2822,6 @@
 	pmsg->seq = r.info->seq;
 	pmsg->dropped = r.info->seq - seq;
 
-	/*
-	 * Check for dropped messages in panic here so that printk
-	 * suppression can occur as early as possible if necessary.
-	 */
-	if (pmsg->dropped &&
-	    panic_in_progress() &&
-	    panic_console_dropped++ > 10) {
-		suppress_panic_printk = 1;
-		pr_warn_once("Too many dropped messages. Suppress messages on non-panic CPUs to prevent livelock.\n");
-	}
-
 	/* Skip record that has level above the console loglevel. */
 	if (may_suppress && suppress_message_printing(r.info->level))
 		goto out;
@@ -3750,7 +3767,7 @@
 
 	might_sleep();
 
-	seq = prb_next_seq(prb);
+	seq = prb_next_reserve_seq(prb);
 
 	/* Flush the consoles so that records up to @seq are printed. */
 	console_lock();
diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index fde3386..88e8f3a 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -6,6 +6,7 @@
 #include <linux/errno.h>
 #include <linux/bug.h>
 #include "printk_ringbuffer.h"
+#include "internal.h"
 
 /**
  * DOC: printk_ringbuffer overview
@@ -303,6 +304,9 @@
  *
  *   desc_push_tail:B / desc_reserve:D
  *     set descriptor reusable (state), then push descriptor tail (id)
+ *
+ *   desc_update_last_finalized:A / desc_last_finalized_seq:A
+ *     store finalized record, then set new highest finalized sequence number
  */
 
 #define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
@@ -1030,9 +1034,13 @@
 	unsigned long next_lpos;
 
 	if (size == 0) {
-		/* Specify a data-less block. */
-		blk_lpos->begin = NO_LPOS;
-		blk_lpos->next = NO_LPOS;
+		/*
+		 * Data blocks are not created for empty lines. Instead, the
+		 * reader will recognize these special lpos values and handle
+		 * it appropriately.
+		 */
+		blk_lpos->begin = EMPTY_LINE_LPOS;
+		blk_lpos->next = EMPTY_LINE_LPOS;
 		return NULL;
 	}
 
@@ -1210,10 +1218,18 @@
 
 	/* Data-less data block description. */
 	if (BLK_DATALESS(blk_lpos)) {
-		if (blk_lpos->begin == NO_LPOS && blk_lpos->next == NO_LPOS) {
+		/*
+		 * Records that are just empty lines are also valid, even
+		 * though they do not have a data block. For such records
+		 * explicitly return empty string data to signify success.
+		 */
+		if (blk_lpos->begin == EMPTY_LINE_LPOS &&
+		    blk_lpos->next == EMPTY_LINE_LPOS) {
 			*data_size = 0;
 			return "";
 		}
+
+		/* Data lost, invalid, or otherwise unavailable. */
 		return NULL;
 	}
 
@@ -1442,19 +1458,117 @@
 }
 
 /*
+ * @last_finalized_seq value guarantees that all records up to and including
+ * this sequence number are finalized and can be read. The only exception are
+ * too old records which have already been overwritten.
+ *
+ * It is also guaranteed that @last_finalized_seq only increases.
+ *
+ * Be aware that finalized records following non-finalized records are not
+ * reported because they are not yet available to the reader. For example,
+ * a new record stored via printk() will not be available to a printer if
+ * it follows a record that has not been finalized yet. However, once that
+ * non-finalized record becomes finalized, @last_finalized_seq will be
+ * appropriately updated and the full set of finalized records will be
+ * available to the printer. And since each printk() caller will either
+ * directly print or trigger deferred printing of all available unprinted
+ * records, all printk() messages will get printed.
+ */
+static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	unsigned long ulseq;
+
+	/*
+	 * Guarantee the sequence number is loaded before loading the
+	 * associated record in order to guarantee that the record can be
+	 * seen by this CPU. This pairs with desc_update_last_finalized:A.
+	 */
+	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
+					); /* LMM(desc_last_finalized_seq:A) */
+
+	return __ulseq_to_u64seq(rb, ulseq);
+}
+
+static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
+			    struct printk_record *r, unsigned int *line_count);
+
+/*
+ * Check if there are records directly following @last_finalized_seq that are
+ * finalized. If so, update @last_finalized_seq to the latest of these
+ * records. It is not allowed to skip over records that are not yet finalized.
+ */
+static void desc_update_last_finalized(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	u64 old_seq = desc_last_finalized_seq(rb);
+	unsigned long oldval;
+	unsigned long newval;
+	u64 finalized_seq;
+	u64 try_seq;
+
+try_again:
+	finalized_seq = old_seq;
+	try_seq = finalized_seq + 1;
+
+	/* Try to find later finalized records. */
+	while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
+		finalized_seq = try_seq;
+		try_seq++;
+	}
+
+	/* No update needed if no later finalized record was found. */
+	if (finalized_seq == old_seq)
+		return;
+
+	oldval = __u64seq_to_ulseq(old_seq);
+	newval = __u64seq_to_ulseq(finalized_seq);
+
+	/*
+	 * Set the sequence number of a later finalized record that has been
+	 * seen.
+	 *
+	 * Guarantee the record data is visible to other CPUs before storing
+	 * its sequence number. This pairs with desc_last_finalized_seq:A.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_last_finalized_seq:A reads from
+	 * desc_update_last_finalized:A, then desc_read:A reads from
+	 * _prb_commit:B.
+	 *
+	 * Relies on:
+	 *
+	 * RELEASE from _prb_commit:B to desc_update_last_finalized:A
+	 *    matching
+	 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
+	 *
+	 * Note: _prb_commit:B and desc_update_last_finalized:A can be
+	 *       different CPUs. However, the desc_update_last_finalized:A
+	 *       CPU (which performs the release) must have previously seen
+	 *       _prb_commit:B.
+	 */
+	if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
+				&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
+		old_seq = __ulseq_to_u64seq(rb, oldval);
+		goto try_again;
+	}
+}
+
+/*
  * Attempt to finalize a specified descriptor. If this fails, the descriptor
  * is either already final or it will finalize itself when the writer commits.
  */
-static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
+static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
 {
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
 	unsigned long prev_state_val = DESC_SV(id, desc_committed);
 	struct prb_desc *d = to_desc(desc_ring, id);
 
-	atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
-			DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
-
-	/* Best effort to remember the last finalized @id. */
-	atomic_long_set(&desc_ring->last_finalized_id, id);
+	if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
+			DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
+		desc_update_last_finalized(rb);
+	}
 }
 
 /**
@@ -1550,7 +1664,7 @@
 	 * readers. (For seq==0 there is no previous descriptor.)
 	 */
 	if (info->seq > 0)
-		desc_make_final(desc_ring, DESC_ID(id - 1));
+		desc_make_final(rb, DESC_ID(id - 1));
 
 	r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
 	/* If text data allocation fails, a data-less record is committed. */
@@ -1643,7 +1757,7 @@
 	 */
 	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
 	if (head_id != e->id)
-		desc_make_final(desc_ring, e->id);
+		desc_make_final(e->rb, e->id);
 }
 
 /**
@@ -1663,12 +1777,9 @@
  */
 void prb_final_commit(struct prb_reserved_entry *e)
 {
-	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
-
 	_prb_commit(e, desc_finalized);
 
-	/* Best effort to remember the last finalized @id. */
-	atomic_long_set(&desc_ring->last_finalized_id, e->id);
+	desc_update_last_finalized(e->rb);
 }
 
 /*
@@ -1832,7 +1943,7 @@
 }
 
 /* Get the sequence number of the tail descriptor. */
-static u64 prb_first_seq(struct printk_ringbuffer *rb)
+u64 prb_first_seq(struct printk_ringbuffer *rb)
 {
 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
 	enum desc_state d_state;
@@ -1875,12 +1986,123 @@
 	return seq;
 }
 
-/*
- * Non-blocking read of a record. Updates @seq to the last finalized record
- * (which may have no data available).
+/**
+ * prb_next_reserve_seq() - Get the sequence number after the most recently
+ *                  reserved record.
  *
- * See the description of prb_read_valid() and prb_read_valid_info()
- * for details.
+ * @rb:  The ringbuffer to get the sequence number from.
+ *
+ * This is the public function available to readers to see what sequence
+ * number will be assigned to the next reserved record.
+ *
+ * Note that depending on the situation, this value can be equal to or
+ * higher than the sequence number returned by prb_next_seq().
+ *
+ * Context: Any context.
+ * Return: The sequence number that will be assigned to the next record
+ *         reserved.
+ */
+u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	unsigned long last_finalized_id;
+	atomic_long_t *state_var;
+	u64 last_finalized_seq;
+	unsigned long head_id;
+	struct prb_desc desc;
+	unsigned long diff;
+	struct prb_desc *d;
+	int err;
+
+	/*
+	 * It may not be possible to read a sequence number for @head_id.
+	 * So the ID of @last_finailzed_seq is used to calculate what the
+	 * sequence number of @head_id will be.
+	 */
+
+try_again:
+	last_finalized_seq = desc_last_finalized_seq(rb);
+
+	/*
+	 * @head_id is loaded after @last_finalized_seq to ensure that
+	 * it points to the record with @last_finalized_seq or newer.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_last_finalized_seq:A reads from
+	 * desc_update_last_finalized:A, then
+	 * prb_next_reserve_seq:A reads from desc_reserve:D.
+	 *
+	 * Relies on:
+	 *
+	 * RELEASE from desc_reserve:D to desc_update_last_finalized:A
+	 *    matching
+	 * ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
+	 *
+	 * Note: desc_reserve:D and desc_update_last_finalized:A can be
+	 *       different CPUs. However, the desc_update_last_finalized:A CPU
+	 *       (which performs the release) must have previously seen
+	 *       desc_read:C, which implies desc_reserve:D can be seen.
+	 */
+	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */
+
+	d = to_desc(desc_ring, last_finalized_seq);
+	state_var = &d->state_var;
+
+	/* Extract the ID, used to specify the descriptor to read. */
+	last_finalized_id = DESC_ID(atomic_long_read(state_var));
+
+	/* Ensure @last_finalized_id is correct. */
+	err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);
+
+	if (err == -EINVAL) {
+		if (last_finalized_seq == 0) {
+			/*
+			 * No record has been finalized or even reserved yet.
+			 *
+			 * The @head_id is initialized such that the first
+			 * increment will yield the first record (seq=0).
+			 * Handle it separately to avoid a negative @diff
+			 * below.
+			 */
+			if (head_id == DESC0_ID(desc_ring->count_bits))
+				return 0;
+
+			/*
+			 * One or more descriptors are already reserved. Use
+			 * the descriptor ID of the first one (@seq=0) for
+			 * the @diff below.
+			 */
+			last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
+		} else {
+			/* Record must have been overwritten. Try again. */
+			goto try_again;
+		}
+	}
+
+	/* Diff of known descriptor IDs to compute related sequence numbers. */
+	diff = head_id - last_finalized_id;
+
+	/*
+	 * @head_id points to the most recently reserved record, but this
+	 * function returns the sequence number that will be assigned to the
+	 * next (not yet reserved) record. Thus +1 is needed.
+	 */
+	return (last_finalized_seq + diff + 1);
+}
+
+/*
+ * Non-blocking read of a record.
+ *
+ * On success @seq is updated to the record that was read and (if provided)
+ * @r and @line_count will contain the read/calculated data.
+ *
+ * On failure @seq is updated to a record that is not yet available to the
+ * reader, but it will be the next record available to the reader.
+ *
+ * Note: When the current CPU is in panic, this function will skip over any
+ *       non-existent/non-finalized records in order to allow the panic CPU
+ *       to print any and all records that have been finalized.
  */
 static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
 			    struct printk_record *r, unsigned int *line_count)
@@ -1899,12 +2121,32 @@
 			*seq = tail_seq;
 
 		} else if (err == -ENOENT) {
-			/* Record exists, but no data available. Skip. */
+			/* Record exists, but the data was lost. Skip. */
 			(*seq)++;
 
 		} else {
-			/* Non-existent/non-finalized record. Must stop. */
-			return false;
+			/*
+			 * Non-existent/non-finalized record. Must stop.
+			 *
+			 * For panic situations it cannot be expected that
+			 * non-finalized records will become finalized. But
+			 * there may be other finalized records beyond that
+			 * need to be printed for a panic situation. If this
+			 * is the panic CPU, skip this
+			 * non-existent/non-finalized record unless it is
+			 * at or beyond the head, in which case it is not
+			 * possible to continue.
+			 *
+			 * Note that new messages printed on panic CPU are
+			 * finalized when we are here. The only exception
+			 * might be the last message without trailing newline.
+			 * But it would have the sequence number returned
+			 * by "prb_next_reserve_seq() - 1".
+			 */
+			if (this_cpu_in_panic() && ((*seq + 1) < prb_next_reserve_seq(rb)))
+				(*seq)++;
+			else
+				return false;
 		}
 	}
 
@@ -1932,7 +2174,7 @@
  * On success, the reader must check r->info.seq to see which record was
  * actually read. This allows the reader to detect dropped records.
  *
- * Failure means @seq refers to a not yet written record.
+ * Failure means @seq refers to a record not yet available to the reader.
  */
 bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
 		    struct printk_record *r)
@@ -1962,7 +2204,7 @@
  * On success, the reader must check info->seq to see which record meta data
  * was actually read. This allows the reader to detect dropped records.
  *
- * Failure means @seq refers to a not yet written record.
+ * Failure means @seq refers to a record not yet available to the reader.
  */
 bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
 			 struct printk_info *info, unsigned int *line_count)
@@ -2008,7 +2250,9 @@
  * newest sequence number available to readers will be.
  *
  * This provides readers a sequence number to jump to if all currently
- * available records should be skipped.
+ * available records should be skipped. It is guaranteed that all records
+ * previous to the returned value have been finalized and are (or were)
+ * available to the reader.
  *
  * Context: Any context.
  * Return: The sequence number of the next newest (not yet available) record
@@ -2016,34 +2260,19 @@
  */
 u64 prb_next_seq(struct printk_ringbuffer *rb)
 {
-	struct prb_desc_ring *desc_ring = &rb->desc_ring;
-	enum desc_state d_state;
-	unsigned long id;
 	u64 seq;
 
-	/* Check if the cached @id still points to a valid @seq. */
-	id = atomic_long_read(&desc_ring->last_finalized_id);
-	d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
+	seq = desc_last_finalized_seq(rb);
 
-	if (d_state == desc_finalized || d_state == desc_reusable) {
-		/*
-		 * Begin searching after the last finalized record.
-		 *
-		 * On 0, the search must begin at 0 because of hack#2
-		 * of the bootstrapping phase it is not known if a
-		 * record at index 0 exists.
-		 */
-		if (seq != 0)
-			seq++;
-	} else {
-		/*
-		 * The information about the last finalized sequence number
-		 * has gone. It should happen only when there is a flood of
-		 * new messages and the ringbuffer is rapidly recycled.
-		 * Give up and start from the beginning.
-		 */
-		seq = 0;
-	}
+	/*
+	 * Begin searching after the last finalized record.
+	 *
+	 * On 0, the search must begin at 0 because of hack#2
+	 * of the bootstrapping phase it is not known if a
+	 * record at index 0 exists.
+	 */
+	if (seq != 0)
+		seq++;
 
 	/*
 	 * The information about the last finalized @seq might be inaccurate.
@@ -2085,7 +2314,7 @@
 	rb->desc_ring.infos = infos;
 	atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
 	atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
-	atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits));
+	atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
 
 	rb->text_data_ring.size_bits = textbits;
 	rb->text_data_ring.data = text_buf;
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index 18cd25e..52626d0 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -75,7 +75,7 @@
 	struct printk_info	*infos;
 	atomic_long_t		head_id;
 	atomic_long_t		tail_id;
-	atomic_long_t		last_finalized_id;
+	atomic_long_t		last_finalized_seq;
 };
 
 /*
@@ -127,8 +127,22 @@
 #define DESC_SV(id, state)	(((unsigned long)state << DESC_FLAGS_SHIFT) | id)
 #define DESC_ID_MASK		(~DESC_FLAGS_MASK)
 #define DESC_ID(sv)		((sv) & DESC_ID_MASK)
+
+/*
+ * Special data block logical position values (for fields of
+ * @prb_desc.text_blk_lpos).
+ *
+ * - Bit0 is used to identify if the record has no data block. (Implemented in
+ *   the LPOS_DATALESS() macro.)
+ *
+ * - Bit1 specifies the reason for not having a data block.
+ *
+ * These special values could never be real lpos values because of the
+ * meta data and alignment padding of data blocks. (See to_blk_size() for
+ * details.)
+ */
 #define FAILED_LPOS		0x1
-#define NO_LPOS			0x3
+#define EMPTY_LINE_LPOS		0x3
 
 #define FAILED_BLK_LPOS	\
 {				\
@@ -259,7 +273,7 @@
 		.infos		= &_##name##_infos[0],						\
 		.head_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
 		.tail_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
-		.last_finalized_id = ATOMIC_INIT(DESC0_ID(descbits)),				\
+		.last_finalized_seq = ATOMIC_INIT(0),						\
 	},											\
 	.text_data_ring = {									\
 		.size_bits	= (avgtextbits) + (descbits),					\
@@ -378,7 +392,41 @@
 bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
 			 struct printk_info *info, unsigned int *line_count);
 
+u64 prb_first_seq(struct printk_ringbuffer *rb);
 u64 prb_first_valid_seq(struct printk_ringbuffer *rb);
 u64 prb_next_seq(struct printk_ringbuffer *rb);
+u64 prb_next_reserve_seq(struct printk_ringbuffer *rb);
+
+#ifdef CONFIG_64BIT
+
+#define __u64seq_to_ulseq(u64seq) (u64seq)
+#define __ulseq_to_u64seq(rb, ulseq) (ulseq)
+
+#else /* CONFIG_64BIT */
+
+#define __u64seq_to_ulseq(u64seq) ((u32)u64seq)
+
+static inline u64 __ulseq_to_u64seq(struct printk_ringbuffer *rb, u32 ulseq)
+{
+	u64 rb_first_seq = prb_first_seq(rb);
+	u64 seq;
+
+	/*
+	 * The provided sequence is only the lower 32 bits of the ringbuffer
+	 * sequence. It needs to be expanded to 64bit. Get the first sequence
+	 * number from the ringbuffer and fold it.
+	 *
+	 * Having a 32bit representation in the console is sufficient.
+	 * If a console ever gets more than 2^31 records behind
+	 * the ringbuffer then this is the least of the problems.
+	 *
+	 * Also the access to the ring buffer is always safe.
+	 */
+	seq = rb_first_seq - (s32)((u32)rb_first_seq - ulseq);
+
+	return seq;
+}
+
+#endif /* CONFIG_64BIT */
 
 #endif /* _KERNEL_PRINTK_RINGBUFFER_H */
diff --git a/lib/dump_stack.c b/lib/dump_stack.c
index 83471e8..222c6d6 100644
--- a/lib/dump_stack.c
+++ b/lib/dump_stack.c
@@ -96,15 +96,25 @@
  */
 asmlinkage __visible void dump_stack_lvl(const char *log_lvl)
 {
+	bool in_panic = this_cpu_in_panic();
 	unsigned long flags;
 
 	/*
 	 * Permit this cpu to perform nested stack dumps while serialising
-	 * against other CPUs
+	 * against other CPUs, unless this CPU is in panic.
+	 *
+	 * When in panic, non-panic CPUs are not permitted to store new
+	 * printk messages so there is no need to synchronize the output.
+	 * This avoids potential deadlock in panic() if another CPU is
+	 * holding and unable to release the printk_cpu_sync.
 	 */
-	printk_cpu_sync_get_irqsave(flags);
+	if (!in_panic)
+		printk_cpu_sync_get_irqsave(flags);
+
 	__dump_stack(log_lvl);
-	printk_cpu_sync_put_irqrestore(flags);
+
+	if (!in_panic)
+		printk_cpu_sync_put_irqrestore(flags);
 }
 EXPORT_SYMBOL(dump_stack_lvl);