blob: cebd879a30cbd10b75945239d4eff05bd84538d5 [file] [log] [blame]
// SPDX-License-Identifier: GPL-2.0
/*
* Generic ring buffer
*
* Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
*/
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h> /* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>
#include <asm/local64.h>
#include <asm/local.h>
/*
* The "absolute" timestamp in the buffer is only 59 bits.
* If a clock has the 5 MSBs set, it needs to be saved and
* reinserted.
*
* TS_MSB selects those 5 most significant bits (bits 59-63) of a
* 64-bit time stamp, and ABS_TS_MASK selects the remaining low 59 bits.
*/
#define TS_MSB (0xf8ULL << 56)
#define ABS_TS_MASK (~TS_MSB)
/*
* Forward declaration of a work_struct handler.
* NOTE(review): presumably the handler for
* ring_buffer_per_cpu::update_pages_work - confirm at the definition.
*/
static void update_pages_handler(struct work_struct *work);
/*
 * ring_buffer_print_entry_header - describe the compressed event header
 * @s: trace_seq to print into
 *
 * The ring buffer header is special; this description has to be kept
 * up to date by hand.
 *
 * Returns 1 if everything fit into @s, 0 if @s has overflowed.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	/* Fixed part of the description: the bit layout of the header */
	static const char * const layout[] = {
		"# compressed entry header\n",
		"\ttype_len : 5 bits\n",
		"\ttime_delta : 27 bits\n",
		"\tarray : 32 bits\n",
	};
	unsigned int i;

	for (i = 0; i < sizeof(layout) / sizeof(layout[0]); i++)
		trace_seq_puts(s, layout[i]);
	trace_seq_putc(s, '\n');

	/* Values of type_len that have a special meaning */
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	if (trace_seq_has_overflowed(s))
		return 0;
	return 1;
}
/*
* The ring buffer is made up of a list of pages. A separate list of pages is
* allocated for each CPU. A writer may only write to a buffer that is
* associated with the CPU it is currently executing on. A reader may read
* from any per cpu buffer.
*
* The reader is special. For each per cpu buffer, the reader has its own
* reader page. When a reader has read the entire reader page, this reader
* page is swapped with another page in the ring buffer.
*
* Now, as long as the writer is off the reader page, the reader can do what
* ever it wants with that page. The writer will never write to that page
* again (as long as it is out of the ring buffer).
*
* Here's some silly ASCII art.
*
* +------+
* |reader| RING BUFFER
* |page |
* +------+ +---+ +---+ +---+
* | |-->| |-->| |
* +---+ +---+ +---+
* ^ |
* | |
* +---------------+
*
*
* +------+
* |reader| RING BUFFER
* |page |------------------v
* +------+ +---+ +---+ +---+
* | |-->| |-->| |
* +---+ +---+ +---+
* ^ |
* | |
* +---------------+
*
*
* +------+
* |reader| RING BUFFER
* |page |------------------v
* +------+ +---+ +---+ +---+
* ^ | |-->| |-->| |
* | +---+ +---+ +---+
* | |
* | |
* +------------------------------+
*
*
* +------+
* |buffer| RING BUFFER
* |page |------------------v
* +------+ +---+ +---+ +---+
* ^ | | | |-->| |
* | New +---+ +---+ +---+
* | Reader------^ |
* | page |
* +------------------------------+
*
*
* After we make this swap, the reader can hand this page off to the splice
* code and be done with it. It can even allocate a new page if it needs to
* and swap that into the ring buffer.
*
* We will be using cmpxchg soon to make all this lockless.
*
*/
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF (1 << 20)
/* Size of the buffer_data_page header, i.e. the offset of its data member */
#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
/* Size of the ring_buffer_event header, i.e. the offset of its array member */
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
/* Unit in which type_len expresses a data length (see rb_event_data_length()) */
#define RB_ALIGNMENT 4U
/* Largest data length that can be expressed through type_len alone */
#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
/*
* RB_ARCH_ALIGNMENT is 8 when CONFIG_HAVE_64BIT_ALIGNED_ACCESS is set,
* otherwise it is RB_ALIGNMENT. RB_FORCE_8BYTE_ALIGNMENT records which
* of the two cases was selected.
*/
#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT 0
# define RB_ARCH_ALIGNMENT RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT 1
# define RB_ARCH_ALIGNMENT 8U
#endif
/* Alignment attribute applied to the data member of struct buffer_data_page */
#define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
/*
* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:'
* It expands to a case range, so it can only be used as a case label.
*/
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
/* Sizes, in bytes, that rb_event_length() reports for the two time event types */
enum {
RB_LEN_TIME_EXTEND = 8,
RB_LEN_TIME_STAMP = 8,
};
/*
 * skip_time_extend - step over a time extend
 * @event: pointer to a struct ring_buffer_event
 *
 * Evaluates to a pointer to the event that starts RB_LEN_TIME_EXTEND
 * bytes after @event. The argument is parenthesized so that pointer
 * expressions such as "ev + 1" are handled correctly.
 */
#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)(event) + RB_LEN_TIME_EXTEND))

/*
 * extended_time - true when the type_len of @event is
 * RINGBUF_TYPE_TIME_EXTEND or greater.
 */
#define extended_time(event) \
	((event)->type_len >= RINGBUF_TYPE_TIME_EXTEND)
/* A null event is a padding event whose time_delta is zero */
static inline bool rb_null_event(struct ring_buffer_event *event)
{
	bool is_padding = event->type_len == RINGBUF_TYPE_PADDING;

	return is_padding && event->time_delta == 0;
}
/*
* rb_event_set_padding - turn @event into a null padding event
*
* Sets the type to RINGBUF_TYPE_PADDING and clears time_delta, which is
* exactly the combination that rb_null_event() tests for.
*/
static void rb_event_set_padding(struct ring_buffer_event *event)
{
/* padding has a NULL time_delta */
event->type_len = RINGBUF_TYPE_PADDING;
event->time_delta = 0;
}
/*
 * rb_event_data_length - size of a data event, header included
 *
 * A non-zero type_len holds the data length in units of RB_ALIGNMENT;
 * otherwise the data length is stored in array[0].
 */
static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned payload = event->type_len ? event->type_len * RB_ALIGNMENT
					   : event->array[0];

	return payload + RB_EVNT_HDR_SIZE;
}
/*
 * rb_event_length - size of a single event, header included
 *
 * Time extend and time stamp events have a fixed size; for a time
 * extend only its own size is reported, not that of the event that
 * follows it. A null padding event has no defined length and yields
 * -1 (converted to unsigned).
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			len = -1;	/* undefined */
		else
			len = event->array[0] + RB_EVNT_HDR_SIZE;
		break;
	case RINGBUF_TYPE_TIME_EXTEND:
		len = RB_LEN_TIME_EXTEND;
		break;
	case RINGBUF_TYPE_TIME_STAMP:
		len = RB_LEN_TIME_STAMP;
		break;
	case RINGBUF_TYPE_DATA:
		len = rb_event_data_length(event);
		break;
	default:
		/* not hit */
		WARN_ON_ONCE(1);
		break;
	}

	return len;
}
/*
 * rb_event_ts_length - length of an event together with its time extend
 *
 * For an extended time event the result is RB_LEN_TIME_EXTEND plus the
 * length of the event that follows it; for every other event it is
 * simply the event length.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	if (!extended_time(event))
		return rb_event_length(event);

	/* time extends include the data event after it */
	return RB_LEN_TIME_EXTEND + rb_event_length(skip_time_extend(event));
}
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * For a data event the size of its data load is returned. For any
 * other kind of event the size of the event itself is returned. The
 * one exception is a TIME EXTEND: it is skipped and the size of the
 * data load of the data event following it is returned.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	struct ring_buffer_event *ev = event;
	unsigned total;
	unsigned payload;

	if (extended_time(ev))
		ev = skip_time_extend(ev);

	total = rb_event_length(ev);

	/* Anything that is not a data event reports its full size */
	if (ev->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return total;

	payload = total - RB_EVNT_HDR_SIZE;

	/* Large events keep their length in array[0]; do not count that word */
	if (payload > RB_MAX_SMALL_DATA + sizeof(ev->array[0]))
		payload -= sizeof(ev->array[0]);

	return payload;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);
/*
 * rb_event_data - locate the data load of an event
 *
 * Always inlined, as it is used in ring buffer fast paths. An extended
 * time event is skipped first. Warns once if the resulting event is
 * not a data event.
 */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	/*
	 * With the length in the type_len field the data starts at
	 * array[0]; otherwise array[0] holds the length and the data
	 * starts at array[1].
	 */
	return (void *)&event->array[event->type_len ? 0 : 1];
}
/**
* ring_buffer_event_data - return the data of the event
* @event: the event to get the data from
*
* Exported wrapper around rb_event_data(). If extended_time(@event) is
* true, the data of the event that follows @event is returned.
*/
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
/*
 * for_each_buffer_cpu - iterate over every CPU in the cpumask of @buffer
 * @buffer: pointer to the trace_buffer (may be any pointer expression)
 * @cpu: iterator variable
 */
#define for_each_buffer_cpu(buffer, cpu) \
	for_each_cpu(cpu, (buffer)->cpumask)

/*
 * for_each_online_buffer_cpu - like for_each_buffer_cpu(), but restricted
 * to the CPUs that are also set in cpu_online_mask.
 */
#define for_each_online_buffer_cpu(buffer, cpu) \
	for_each_cpu_and(cpu, (buffer)->cpumask, cpu_online_mask)
/* Number of bits available for the time delta stored in an event header */
#define TS_SHIFT 27
/* Mask of the low TS_SHIFT bits */
#define TS_MASK ((1ULL << TS_SHIFT) - 1)
/* Bits that are only set when a delta does not fit into TS_SHIFT bits */
#define TS_DELTA_TEST (~TS_MASK)
/*
 * rb_event_time_stamp - rebuild the time value stored in a time event
 *
 * array[0] carries the bits above TS_SHIFT and time_delta the low
 * TS_SHIFT bits.
 */
static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 upper = event->array[0];

	return (upper << TS_SHIFT) + event->time_delta;
}
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS (1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED (1 << 30)
/* Both of the above flag bits together */
#define RB_MISSED_MASK (3 << 30)
/*
* Layout of one data page: a small header (time_stamp and commit)
* followed by the event data. BUF_PAGE_HDR_SIZE is the size of that
* header.
*/
struct buffer_data_page {
u64 time_stamp; /* page time stamp */
local_t commit; /* write committed index */
unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
};
/*
* A data page together with its allocation order.
* NOTE(review): presumably the handle used for pages handed out to
* readers - confirm against the users of this struct.
*/
struct buffer_data_read_page {
unsigned order; /* order of the page */
struct buffer_data_page *data; /* actual data, stored in this page */
};
/*
* Note, the buffer_page list must be first. The buffer pages
* are allocated in cache lines, which means that each buffer
* page will be at the beginning of a cache line, and thus
* the least significant bits will be zero. We use this to
* add flags in the list struct pointers, to make the ring buffer
* lockless.
*
* This is the bookkeeping structure of a page; the event data itself
* lives in the separate buffer_data_page that @page points to.
*/
struct buffer_page {
struct list_head list; /* list of buffer pages */
local_t write; /* index for next write */
unsigned read; /* index for next read */
local_t entries; /* entries on this page */
unsigned long real_end; /* real end of data */
unsigned order; /* order of the page */
u32 id; /* ID for external mapping */
struct buffer_data_page *page; /* Actual data page */
};
/*
* The buffer page counters, write and entries, must be reset
* atomically when crossing page boundaries. To synchronize this
* update, two counters are inserted into the number. One is
* the actual counter for the write position or count on the page.
*
* The other is a counter of updaters. Before an update happens
* the update partition of the counter is incremented. This will
* allow the updater to update the counter atomically.
*
* The counter is 20 bits, and the state data is 12.
*
* RB_WRITE_MASK extracts the 20 bit counter and RB_WRITE_INTCNT is
* one unit of the updater count stored above it.
*/
#define RB_WRITE_MASK 0xfffff
#define RB_WRITE_INTCNT (1 << 20)
/* Reset the commit index of a data page to zero */
static void rb_init_page(struct buffer_data_page *bpage)
{
local_set(&bpage->commit, 0);
}
/* Return the commit index stored in the data page that @bpage points to */
static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
return local_read(&bpage->page->commit);
}
/*
 * free_buffer_page - release a buffer page and its data page
 *
 * The data page is freed with the order recorded in @bpage, then the
 * buffer_page descriptor itself is freed.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	unsigned long data_addr = (unsigned long)bpage->page;
	unsigned int order = bpage->order;

	free_pages(data_addr, order);
	kfree(bpage);
}
/*
 * The time_stamp delta has to fit into 27 bits.
 * Returns true when @delta has any bit set above those 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return (delta & TS_DELTA_TEST) != 0;
}
/*
* Wake up state; embedded in both struct trace_buffer and
* struct ring_buffer_per_cpu.
*
* @work: the irq_work; rb_wake_up_waiters() gets back to this
* structure from it with container_of()
* @waiters: tasks waiting for any data
* @full_waiters: tasks waiting for a "full" watermark
* @seq: incremented on every wake up, compared by rb_wait_once()
* @waiters_pending: set when a task is about to wait on @waiters
* @full_waiters_pending: set when a task is about to wait on @full_waiters
* @wakeup_full: requests that @full_waiters be woken as well
*/
struct rb_irq_work {
struct irq_work work;
wait_queue_head_t waiters;
wait_queue_head_t full_waiters;
atomic_t seq;
bool waiters_pending;
bool full_waiters_pending;
bool wakeup_full;
};
/*
* Structure to hold event state and handle nested events.
*
* add_timestamp holds RB_ADD_STAMP_* values.
* NOTE(review): the remaining fields are filled in by the reserve path,
* which is not next to this definition - see there for their exact
* meaning.
*/
struct rb_event_info {
u64 ts;
u64 delta;
u64 before;
u64 after;
unsigned long length;
struct buffer_page *tail_page;
int add_timestamp;
};
/*
* Used for the add_timestamp
* NONE
* EXTEND - wants a time extend
* ABSOLUTE - the buffer requests all events to have absolute time stamps
* FORCE - force a full time stamp.
*
* Apart from NONE (zero), each value is a distinct bit.
*/
enum {
RB_ADD_STAMP_NONE = 0,
RB_ADD_STAMP_EXTEND = BIT(1),
RB_ADD_STAMP_ABSOLUTE = BIT(2),
RB_ADD_STAMP_FORCE = BIT(3)
};
/*
* Used for which event context the event is in.
* TRANSITION = 0
* NMI = 1
* IRQ = 2
* SOFTIRQ = 3
* NORMAL = 4
*
* RB_CTX_MAX is the number of contexts listed above.
*
* See trace_recursive_lock() comment below for more details.
*/
enum {
RB_CTX_TRANSITION,
RB_CTX_NMI,
RB_CTX_IRQ,
RB_CTX_SOFTIRQ,
RB_CTX_NORMAL,
RB_CTX_MAX
};
/* A time stamp kept in a local64_t; accessed with rb_time_read() and rb_time_set() */
struct rb_time_struct {
local64_t time;
};
typedef struct rb_time_struct rb_time_t;
/* Number of slots in ring_buffer_per_cpu::event_stamp[], one per nesting level */
#define MAX_NEST 5
/*
* Per CPU state of the ring buffer.
*
* head_page == tail_page && head == tail then buffer is empty.
*/
struct ring_buffer_per_cpu {
int cpu;
atomic_t record_disabled;
atomic_t resize_disabled;
/* the trace_buffer this per CPU buffer belongs to */
struct trace_buffer *buffer;
raw_spinlock_t reader_lock; /* serialize readers */
arch_spinlock_t lock;
struct lock_class_key lock_key;
struct buffer_data_page *free_page;
unsigned long nr_pages;
unsigned int current_context;
struct list_head *pages;
struct buffer_page *head_page; /* read from head */
struct buffer_page *tail_page; /* write to tail */
struct buffer_page *commit_page; /* committed pages */
struct buffer_page *reader_page;
unsigned long lost_events;
unsigned long last_overrun;
unsigned long nest;
local_t entries_bytes;
local_t entries;
local_t overrun;
local_t commit_overrun;
local_t dropped_events;
/* read as the current nesting level by ring_buffer_event_time_stamp() */
local_t committing;
local_t commits;
/* the next three counters feed ring_buffer_nr_dirty_pages() */
local_t pages_touched;
local_t pages_lost;
local_t pages_read;
long last_pages_touch;
/* smallest "full" value being waited for, see rb_watermark_hit() */
size_t shortest_full;
unsigned long read;
unsigned long read_bytes;
rb_time_t write_stamp;
rb_time_t before_stamp;
/* time stamp saved for each nesting level */
u64 event_stamp[MAX_NEST];
u64 read_stamp;
/* pages removed since last reset */
unsigned long pages_removed;
unsigned int mapped;
struct mutex mapping_lock;
unsigned long *subbuf_ids; /* ID to subbuf VA */
struct trace_buffer_meta *meta_page;
/* ring buffer pages to update, > 0 to add, < 0 to remove */
long nr_pages_to_update;
struct list_head new_pages; /* new pages to add */
struct work_struct update_pages_work;
struct completion update_done;
/* wake up state for tasks waiting on this CPU buffer */
struct rb_irq_work irq_work;
};
/*
* The ring buffer as a whole: one ring_buffer_per_cpu for each CPU
* set in @cpumask.
*/
struct trace_buffer {
unsigned flags;
int cpus;
atomic_t record_disabled;
atomic_t resizing;
cpumask_var_t cpumask;
struct lock_class_key *reader_lock_key;
struct mutex mutex;
/* per CPU buffers, indexed by CPU number */
struct ring_buffer_per_cpu **buffers;
struct hlist_node node;
/* time source, called by rb_time_stamp() */
u64 (*clock)(void);
/* wake up state for tasks waiting on RING_BUFFER_ALL_CPUS */
struct rb_irq_work irq_work;
bool time_stamp_abs;
unsigned int subbuf_size;
unsigned int subbuf_order;
unsigned int max_data_size;
};
/*
* State of an iterator over one per CPU buffer.
* NOTE(review): the fields are maintained by the iterator code, which
* is not next to this definition - see there for their exact meaning.
*/
struct ring_buffer_iter {
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long head;
unsigned long next_event;
struct buffer_page *head_page;
struct buffer_page *cache_reader_page;
unsigned long cache_read;
unsigned long cache_pages_removed;
u64 read_stamp;
u64 page_stamp;
struct ring_buffer_event *event;
size_t event_size;
int missed_events;
};
/*
 * ring_buffer_print_page_header - describe the header of a sub-buffer page
 * @buffer: buffer whose subbuf_size is reported as the size of the data field
 * @s: trace_seq to print into
 *
 * Emits one "field:" line each for timestamp, commit, overwrite and data.
 * The overwrite field is reported at the same offset as commit.
 *
 * Returns 1 if everything fit into @s, 0 if @s has overflowed.
 */
int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	/* Only used as an operand of sizeof/typeof; never accessed */
	struct buffer_data_page field;
	unsigned int commit_off = (unsigned int)offsetof(typeof(field), commit);
	unsigned int data_off = (unsigned int)offsetof(typeof(field), data);
	unsigned int long_is_signed = (unsigned int)is_signed_type(long);

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 commit_off,
			 (unsigned int)sizeof(field.commit),
			 long_is_signed);

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 commit_off,
			 1,
			 long_is_signed);

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 data_off,
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	if (trace_seq_has_overflowed(s))
		return 0;
	return 1;
}
/* Read the time stamp held in @t and store it in *@ret */
static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
*ret = local64_read(&t->time);
}
/* Store @val as the time stamp held in @t */
static void rb_time_set(rb_time_t *t, u64 val)
{
local64_set(&t->time, val);
}
/*
* Enable this to make sure that the event passed to
* ring_buffer_event_time_stamp() is not committed and also
* is on the buffer that it passed in.
*/
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
/*
* verify_event - debug check that @event is reserved but not yet committed
*
* Walks the pages from commit_page up to and including tail_page and
* returns silently if the address of @event lies between the commit and
* write offsets of one of them. Otherwise a warning is triggered. The
* walk also gives up, with a warning, after about 100 pages.
*/
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
void *event)
{
struct buffer_page *page = cpu_buffer->commit_page;
struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
struct list_head *next;
long commit, write;
unsigned long addr = (unsigned long)event;
bool done = false;
int stop = 0;
/* Make sure the event exists and is not committed yet */
do {
/* The tail page is the last one to check */
if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
done = true;
commit = local_read(&page->page->commit);
write = local_read(&page->write);
if (addr >= (unsigned long)&page->page->data[commit] &&
addr < (unsigned long)&page->page->data[write])
return;
/* Strip the flag bits from the next pointer before following it */
next = rb_list_head(page->list.next);
page = list_entry(next, struct buffer_page, list);
} while (!done);
WARN_ON_ONCE(1);
}
#else
/* Nothing to verify when RB_VERIFY_EVENT is not defined */
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
void *event)
{
}
#endif
/*
 * An absolute time stamp is stored without its 5 MSBs, yet some clocks
 * need them. rb_fix_abs_ts() copies the 5 MSBs of a previously saved
 * full time stamp (@save_ts) into @abs. Should the result be smaller
 * than @save_ts, the latest time stamp must have incremented those MSBs
 * (an unlikely event), so the value is bumped by 1 << 59.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	u64 msb = save_ts & TS_MSB;

	/* Nothing to restore when the saved stamp has no MSBs set */
	if (!msb)
		return abs;

	abs |= msb;
	/* Check for overflow */
	if (unlikely(abs < save_ts))
		abs += 1ULL << 59;

	return abs;
}
/* Forward declaration; rb_time_stamp() is defined further down in this file */
static inline u64 rb_time_stamp(struct trace_buffer *buffer);
/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event carries an absolute time stamp, then that is used as
 * the time stamp to return.
 * If no commit is in progress, or in the highly unlikely case that the
 * event was nested more than the max nesting, a warning is issued and
 * the write_stamp of the buffer is returned; neither of these two
 * cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		/* The stored value lacks the 5 MSBs; take them from the page stamp */
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does (nest is unsigned, hence %u) */
	WARN_ONCE(1, "nest (%u) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}
/**
* ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
* @buffer: The ring_buffer to get the number of pages from
* @cpu: The cpu of the ring_buffer to get the number of pages from
*
* Returns the number of pages that have content in the ring buffer.
*/
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
size_t read;
size_t lost;
size_t cnt;
read = local_read(&buffer->buffers[cpu]->pages_read);
lost = local_read(&buffer->buffers[cpu]->pages_lost);
cnt = local_read(&buffer->buffers[cpu]->pages_touched);
if (WARN_ON_ONCE(cnt < lost))
return 0;
cnt -= lost;
/* The reader can read an empty page, but not more than that */
if (cnt < read) {
WARN_ON_ONCE(read > cnt + 1);
return 0;
}
return cnt - read;
}
/*
 * full_hit - test whether the "full" watermark of a CPU buffer is reached
 * @full: watermark, as a percentage of the pages of the buffer
 *
 * Always true when the buffer has no pages or @full is zero.
 */
static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	size_t total = buffer->buffers[cpu]->nr_pages;
	size_t used;

	if (!total || !full)
		return true;

	/*
	 * The sub-buffer the writer is on is not counted as dirty, so the
	 * dirty count can never equal the number of pages. Add one to
	 * make a "buffer_percent" of 100 reachable.
	 */
	used = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return used * 100 >= full * total;
}
/*
* rb_wake_up_waiters - wake up tasks waiting for ring buffer input
*
* irq_work callback. Wakes up every task blocked on the ring buffer
* waiters queue and, if a "full" wake up is pending or was requested,
* every task blocked on the full_waiters queue as well.
*/
static void rb_wake_up_waiters(struct irq_work *work)
{
struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
/* For waiters waiting for the first wake up */
(void)atomic_fetch_inc_release(&rbwork->seq);
wake_up_all(&rbwork->waiters);
if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
/* Only cpu_buffer sets the above flags */
struct ring_buffer_per_cpu *cpu_buffer =
container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
/* Called from interrupt context */
raw_spin_lock(&cpu_buffer->reader_lock);
rbwork->wakeup_full = false;
rbwork->full_waiters_pending = false;
/* Waking up all waiters, they will reset the shortest full */
cpu_buffer->shortest_full = 0;
raw_spin_unlock(&cpu_buffer->reader_lock);
wake_up_all(&rbwork->full_waiters);
}
}
/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is being closed, it is
 * prudent to wake up any tasks that are still waiting on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		/* This can be called in any context */
		irq_work_queue(&buffer->irq_work.work);
		return;
	}

	if (WARN_ON_ONCE(!buffer->buffers))
		return;
	if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
		return;

	cpu_buffer = buffer->buffers[cpu];
	/* The CPU buffer may not have been initialized yet */
	if (!cpu_buffer)
		return;

	/* This can be called in any context */
	irq_work_queue(&cpu_buffer->irq_work.work);
}
/*
 * rb_watermark_hit - test whether a waiter's wake up condition is met
 *
 * For RING_BUFFER_ALL_CPUS any data in the buffer is enough. For a
 * single CPU the buffer must not be empty and, when @full is non-zero,
 * the reader page must not be the commit page and the @full watermark
 * must be reached. If it is not reached, @full is recorded in
 * shortest_full when it is smaller than what is stored there.
 */
static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	bool hit;

	/* Reads of all CPUs always waits for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (ring_buffer_empty_cpu(buffer, cpu))
		return false;

	if (!full)
		return true;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	hit = cpu_buffer->reader_page != cpu_buffer->commit_page &&
	      full_hit(buffer, cpu, full);

	if (!hit && (!cpu_buffer->shortest_full ||
		     cpu_buffer->shortest_full > full))
		cpu_buffer->shortest_full = full;

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return hit;
}
/*
 * rb_wait_cond - wait condition evaluated by ring_buffer_wait()
 *
 * True once the watermark is hit or the caller's @cond says so.
 * Otherwise the matching "pending" flag is set, so that the next
 * event triggers a wake up, and false is returned.
 */
static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full) || cond(data))
		return true;

	/*
	 * Events can happen in critical sections where checking a work
	 * queue can cause deadlocks. Once a task has been added to the
	 * queue, this flag is set only to tell events to try to wake
	 * up the queue using irq_work.
	 *
	 * The flag is not cleared even if the buffer is no longer
	 * empty. It only causes the next event to run irq_work to do
	 * the work queue wake up. The worst that can happen if we race
	 * with !trace_empty() is that an event will cause an irq_work
	 * to try to wake up an empty queue.
	 *
	 * The flag needs no protection either, as the work queue and
	 * irq_work logic do the necessary synchronization for the wake
	 * ups. All that is required is that the wake up happens after
	 * a task has been queued. Spurious wake ups are OK.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}
/*
* Data for rb_wait_once(): the wake up state being waited on and the
* value its seq counter had when the wait started.
*/
struct rb_wait_data {
struct rb_irq_work *irq_work;
int seq;
};
/*
 * The default wait condition for ring_buffer_wait(): leave the wait
 * loop the first time the task is woken up, which is detected by the
 * wake up sequence counter having changed since the wait started.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *wait = data;
	int current_seq = atomic_read_acquire(&wait->irq_work->seq);

	return current_seq != wait->seq;
}
/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns -ENODEV if @cpu is not part of @buffer, otherwise the result
 * of wait_event_interruptible().
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;

	/*
	 * Pick the wake up state that matches what the caller waits
	 * for: data in any cpu buffer, or data in one specific buffer.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		rbwork = &buffer->buffers[cpu]->irq_work;
	}

	waitq = full ? &rbwork->full_waiters : &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		cond = rb_wait_once;
		data = &rdata;
	}

	return wait_event_interruptible((*waitq),
			rb_wait_cond(rbwork, buffer, cpu, full, cond, data));
}
/**
* ring_buffer_poll_wait - poll on buffer input
* @buffer: buffer to wait on
* @cpu: the cpu buffer to wait on
* @filp: the file descriptor
* @poll_table: The poll descriptor
* @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
*
* If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
* as data is added to any of the @buffer's cpu buffers. Otherwise
* it will wait for data to be added to a specific cpu buffer.
*
* Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
* EPOLLERR if @cpu is not part of the @buffer's cpumask,
* zero otherwise.
*/
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
struct file *filp, poll_table *poll_table, int full)
{
struct ring_buffer_per_cpu *cpu_buffer;
struct rb_irq_work *rbwork;
if (cpu == RING_BUFFER_ALL_CPUS) {
rbwork = &buffer->irq_work;
/* @full is ignored when polling all CPUs */
full = 0;
} else {
if (!cpumask_test_cpu(cpu, buffer->cpumask))
return EPOLLERR;
cpu_buffer = buffer->buffers[cpu];
rbwork = &cpu_buffer->irq_work;
}
if (full) {
poll_wait(filp, &rbwork->full_waiters, poll_table);
if (rb_watermark_hit(buffer, cpu, full))
return EPOLLIN | EPOLLRDNORM;
/*
* Only allow full_waiters_pending update to be seen after
* the shortest_full is set (in rb_watermark_hit). If the
* writer sees the full_waiters_pending flag set, it will
* compare the amount in the ring buffer to shortest_full.
* If the amount in the ring buffer is greater than the
* shortest_full percent, it will call the irq_work handler
* to wake up this list. The irq_handler will reset shortest_full
* back to zero. That's done under the reader_lock, but
* the below smp_mb() makes sure that the update to
* full_waiters_pending doesn't leak up into the above.
*/
smp_mb();
rbwork->full_waiters_pending = true;
return 0;
}
poll_wait(filp, &rbwork->waiters, poll_table);
rbwork->waiters_pending = true;
/*
* There's a tight race between setting the waiters_pending and
* checking if the ring buffer is empty. Once the waiters_pending bit
* is set, the next event will wake the task up, but we can get stuck
* if there's only a single event in.
*
* FIXME: Ideally, we need a memory barrier on the writer side as well,
* but adding a memory barrier to all events will cause too much of a
* performance hit in the fast path. We only need a memory barrier when
* the buffer goes from empty to having content. But as this race is
* extremely small, and it's not a problem if another event comes in, we
* will fix it later.
*/
smp_mb();
if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
(cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
return EPOLLIN | EPOLLRDNORM;
return 0;
}
/*
 * RB_WARN_ON - warn and disable recording when @cond is true
 * @b: pointer to either a struct trace_buffer or a struct ring_buffer_per_cpu
 * @cond: condition to test
 *
 * When @cond is true, the record_disabled counter of the trace_buffer is
 * incremented (reached through ->buffer when @b is a per CPU buffer) and
 * WARN_ON(1) fires. Evaluates to the truth value of @cond, so it can be
 * used directly in an if () statement. @b is parenthesized wherever it
 * is expanded so that any pointer expression may be passed.
 */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)(b);			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&(b)->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})
/*
* Raise this to test the TIME_EXTEND events and the normalization:
* rb_time_stamp() shifts every time stamp left by this amount and
* ring_buffer_normalize_time_stamp() shifts it back.
*/
#define DEBUG_SHIFT 0
/*
 * rb_time_stamp - read the clock of @buffer
 *
 * With CONFIG_MITIGATION_RETPOLINE enabled, the common case of the
 * clock being trace_clock_local() is called directly rather than
 * through the function pointer. The value is shifted left by
 * DEBUG_SHIFT to allow testing of normalization and time extends.
 */
static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	/* Skip retpolines :-( */
	u64 now = (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) &&
		   likely(buffer->clock == trace_clock_local)) ?
			trace_clock_local() : buffer->clock();

	return now << DEBUG_SHIFT;
}
/*
 * ring_buffer_time_stamp - read the clock of @buffer
 *
 * Exported variant of rb_time_stamp() that samples the clock with
 * preemption disabled.
 */
u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 now;

	preempt_disable_notrace();
	now = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return now;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
/*
* ring_buffer_normalize_time_stamp - undo the DEBUG_SHIFT applied by rb_time_stamp()
* @buffer: not used
* @cpu: not used
* @ts: time stamp to normalize in place
*/
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
int cpu, u64 *ts)
{
/* Just stupid testing the normalize function and deltas */
*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
/*
* Making the ring buffer lockless makes things tricky.
* Although writes only happen on the CPU that they are on,
* and they only need to worry about interrupts. Reads can
* happen on any CPU.
*
* The reader page is always off the ring buffer, but when the
* reader finishes with a page, it needs to swap its page with
* a new one from the buffer. The reader needs to take from
* the head (writes go to the tail). But if a writer is in overwrite
* mode and wraps, it must push the head page forward.
*
* Here lies the problem.
*
* The reader must be careful to replace only the head page, and
* not another one. As described at the top of the file in the
* ASCII art, the reader sets its old page to point to the next
* page after head. It then sets the page after head to point to
* the old reader page. But if the writer moves the head page
* during this operation, the reader could end up with the tail.
*
* We use cmpxchg to help prevent this race. We also do something
* special with the page before head. We set the LSB to 1.
*
* When the writer must push the page forward, it will clear the
* bit that points to the head page, move the head, and then set
* the bit that points to the new head page.
*
* We also don't want an interrupt coming in and moving the head
* page on another writer. Thus we use the second LSB to catch
* that too. Thus:
*
* head->list->prev->next bit 1 bit 0
* ------- -------
* Normal page 0 0
* Points to head page 0 1
* New head page 1 0
*
* Note we can not trust the prev pointer of the head page, because:
*
* +----+ +-----+ +-----+
* | |------>| T |---X--->| N |
* | |<------| | | |
* +----+ +-----+ +-----+
* ^ ^ |
* | +-----+ | |
* +----------| R |----------+ |
* | |<-----------+
* +-----+
*
* Key: ---X--> HEAD flag set in pointer
* T Tail page
* R Reader page
* N Next page
*
* (see __rb_reserve_next() to see where this happens)
*
* What the above shows is that the reader just swapped out
* the reader page with a page in the buffer, but before it
* could make the new header point back to the new page added
* it was preempted by a writer. The writer moved forward onto
* the new page added by the reader and is about to move forward
* again.
*
* You can see, it is legitimate for the previous pointer of
* the head (or any page) not to point back to itself. But only
* temporarily.
*/
/*
* Flags kept in the two least significant bits of a list pointer.
* RB_FLAG_MASK covers both of them.
*/
#define RB_PAGE_NORMAL 0UL
#define RB_PAGE_HEAD 1UL
#define RB_PAGE_UPDATE 2UL
#define RB_FLAG_MASK 3UL
/*
* PAGE_MOVED is not part of the mask. It is only used as a return
* value, when the pointer no longer refers to the expected page.
*/
#define RB_PAGE_MOVED 4UL
/*
 * rb_list_head - return @list with the flag bits (RB_FLAG_MASK) cleared
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	return (struct list_head *)((unsigned long)list & ~RB_FLAG_MASK);
}
/*
 * rb_is_head_page - test if the given page is the head page
 *
 * The reader may move the head_page pointer, so what it points to
 * cannot be trusted (it may be the reader page). Instead, the flag
 * bits of the next pointer of @list are examined: they are non zero
 * if the page it points to is a head page.
 *
 * Returns RB_PAGE_MOVED if @list no longer points to @page, otherwise
 * the flag bits found in the pointer.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	/* Read the pointer once, it can change under us */
	unsigned long next = (unsigned long)list->next;
	unsigned long target = next & ~RB_FLAG_MASK;

	if (target == (unsigned long)&page->list)
		return next & RB_FLAG_MASK;

	return RB_PAGE_MOVED;
}
/*
 * rb_is_reader_page
 *
 * What makes the reader page unique is that, if the writer is ever
 * on it, the page its prev pointer refers to never points back to it.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *back = rb_list_head(page->list.prev->next);

	return back != &page->list;
}
/*
 * rb_set_list_to_head - flag a list entry's next pointer as "points to head"
 * @list: the list entry whose ->next pointer gets flagged
 *
 * Sets RB_PAGE_HEAD and clears RB_PAGE_UPDATE in @list->next using plain
 * (non-atomic) read-modify-write accesses.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *next = (unsigned long *)&list->next;

	*next |= RB_PAGE_HEAD;
	*next &= ~RB_PAGE_UPDATE;
}
/*
 * rb_head_page_activate - sets up head page
 * @cpu_buffer: the per CPU buffer whose head page is to be flagged
 *
 * Flags the next pointer of the page preceding cpu_buffer->head_page
 * with HEAD. Does nothing when there is no head page.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head = cpu_buffer->head_page;

	/* The HEAD flag lives in the previous entry's next pointer. */
	if (head)
		rb_set_list_to_head(head->list.prev);
}
/*
 * rb_list_head_clear - clear all flag bits stored in a list entry
 * @list: the list entry whose ->next pointer is cleaned
 */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *next = (unsigned long *)&list->next;

	*next = *next & ~RB_FLAG_MASK;
}
/*
* rb_head_page_deactivate - clears head page ptr (for free list)
*/
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
struct list_head *hd;
/* Go through the whole list and clear any pointers found. */
rb_list_head_clear(cpu_buffer->pages);
list_for_each(hd, cpu_buffer->pages)
rb_list_head_clear(hd);
}
/*
 * rb_head_page_set - atomically change the flag on the pointer to a page
 * @cpu_buffer: the per CPU buffer (not used here)
 * @head: the page that @prev's next pointer is expected to point to
 * @prev: the page whose list next pointer carries the flag
 * @old_flag: the flag the pointer must currently have for the swap to happen
 * @new_flag: the flag to install
 *
 * Issues a cmpxchg on @prev->list.next, from (@head | @old_flag) to
 * (@head | @new_flag).
 *
 * Returns RB_PAGE_MOVED if the value found in @prev->list.next did not
 * point at @head, otherwise the flag bits that were found there.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	unsigned long *slot = (unsigned long *)&prev->list.next;
	unsigned long target = (unsigned long)&head->list & ~RB_FLAG_MASK;
	unsigned long found;

	found = cmpxchg(slot, target | old_flag, target | new_flag);

	/* check if the reader took the page */
	if ((found & ~RB_FLAG_MASK) != target)
		return RB_PAGE_MOVED;

	return found & RB_FLAG_MASK;
}
/*
 * rb_head_page_set_update - try to switch the pointer to @head to UPDATE
 *
 * Wrapper around rb_head_page_set() with RB_PAGE_UPDATE as the new flag;
 * the return value is passed through unchanged.
 */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	const int new_flag = RB_PAGE_UPDATE;

	return rb_head_page_set(cpu_buffer, head, prev, old_flag, new_flag);
}
/*
 * rb_head_page_set_head - try to switch the pointer to @head to HEAD
 *
 * Wrapper around rb_head_page_set() with RB_PAGE_HEAD as the new flag;
 * the return value is passed through unchanged.
 */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	const int new_flag = RB_PAGE_HEAD;

	return rb_head_page_set(cpu_buffer, head, prev, old_flag, new_flag);
}
/*
 * rb_head_page_set_normal - try to switch the pointer to @head to NORMAL
 *
 * Wrapper around rb_head_page_set() with RB_PAGE_NORMAL as the new flag;
 * the return value is passed through unchanged.
 */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	const int new_flag = RB_PAGE_NORMAL;

	return rb_head_page_set(cpu_buffer, head, prev, old_flag, new_flag);
}
/*
 * rb_inc_page - advance a buffer page pointer to the next page in the list
 * @bpage: in/out pointer to the current page
 *
 * The flag bits of the next pointer are stripped before it is followed.
 */
static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct buffer_page *cur = *bpage;

	*bpage = list_entry(rb_list_head(cur->list.next),
			    struct buffer_page, list);
}
/*
 * rb_set_head_page - locate the head page and cache it in the cpu buffer
 * @cpu_buffer: the per CPU buffer to search
 *
 * Starting at cpu_buffer->head_page, walk the page list until a page is
 * found for which rb_is_head_page() returns non zero, then store it in
 * cpu_buffer->head_page.
 *
 * Returns the page found, or NULL (after a warning) if there is no head
 * page set, the list fails a sanity check, or nothing is found after
 * three full walks.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *start;
	struct buffer_page *cur;
	struct list_head *pages;
	int pass;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	pages = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(pages->prev->next) != pages))
		return NULL;

	start = cpu_buffer->head_page;
	cur = start;

	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (pass = 0; pass < 3; pass++) {
		do {
			if (rb_is_head_page(cur, cur->list.prev)) {
				cpu_buffer->head_page = cur;
				return cur;
			}
			rb_inc_page(&cur);
		} while (cur != start);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}
/*
 * rb_head_page_replace - swap the head page for another page
 * @old: the page currently expected to be pointed to with the HEAD flag
 * @new: the page to be pointed to instead
 *
 * Tries to change @old->list.prev->next from (current pointer with only
 * the HEAD flag set) to a plain pointer to @new.
 *
 * Returns true if the try_cmpxchg succeeded.
 */
static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *slot = (unsigned long *)&old->list.prev->next;
	unsigned long expected = (*slot & ~RB_FLAG_MASK) | RB_PAGE_HEAD;

	return try_cmpxchg(slot, &expected, (unsigned long)&new->list);
}
/*
* rb_tail_page_update - move the tail page forward
* @cpu_buffer: the per CPU buffer whose tail page is advanced
* @tail_page: the page the caller believes is the current tail page
* @next_page: the page that is to become the new tail page
*
* Bumps the interrupt counters in @next_page's write and entries fields
* and, if @tail_page is still cpu_buffer->tail_page, tries to reset the
* index bits of those fields, zeroes the page's commit and attempts to
* install @next_page as the tail page. pages_touched is incremented only
* when this context is the one that installed it.
*/
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *tail_page,
struct buffer_page *next_page)
{
unsigned long old_entries;
unsigned long old_write;
/*
* The tail page now needs to be moved forward.
*
* We need to reset the tail page, but without messing
* with possible erasing of data brought in by interrupts
* that have moved the tail page and are currently on it.
*
* We add a counter to the write field to denote this.
*/
old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
/*
* Just make sure we have seen our old_write and synchronize
* with any interrupts that come in.
*/
barrier();
/*
* If the tail page is still the same as what we think
* it is, then it is up to us to update the tail
* pointer.
*/
if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
/* Zero the write counter (the bits covered by RB_WRITE_MASK) */
unsigned long val = old_write & ~RB_WRITE_MASK;
unsigned long eval = old_entries & ~RB_WRITE_MASK;
/*
* This will only succeed if an interrupt did
* not come in and change it. In which case, we
* do not want to modify it.
*
* We add (void) to let the compiler know that we do not care
* about the return value of these functions. We use the
* cmpxchg to only update if an interrupt did not already
* do it for us. If the cmpxchg fails, we don't care.
*/
(void)local_cmpxchg(&next_page->write, old_write, val);
(void)local_cmpxchg(&next_page->entries, old_entries, eval);
/*
* No need to worry about races with clearing out the commit.
* It can only increment when a commit takes place. But that
* only happens in the outermost nested commit.
*/
local_set(&next_page->page->commit, 0);
/* Either we update tail_page or an interrupt does */
if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
local_inc(&cpu_buffer->pages_touched);
}
}
/*
 * rb_check_bpage - warn if a buffer page address collides with flag bits
 * @cpu_buffer: the per CPU buffer handed to RB_WARN_ON()
 * @bpage: the buffer page whose address is checked
 *
 * The low bits of list pointers are used as flags, so a buffer_page
 * address must not have any RB_FLAG_MASK bit set.
 */
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long addr_flags = (unsigned long)bpage & RB_FLAG_MASK;

	RB_WARN_ON(cpu_buffer, addr_flags);
}
/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure, walk the whole page list once and verify for
 * every entry that its neighbours point back at it (ignoring the flag
 * bits stored in the pointers). The walk stops with a warning at the
 * first inconsistency.
 *
 * Callers of this function need to guarantee that the list of pages doesn't get
 * modified during the check. In particular, if it's possible that the function
 * is invoked with concurrent readers which can swap in a new reader page then
 * the caller should take cpu_buffer->reader_lock.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *cur = head;

	do {
		/* cur->next must link back to cur ... */
		if (RB_WARN_ON(cpu_buffer,
			       rb_list_head(rb_list_head(cur->next)->prev) != cur))
			return;

		/* ... and so must cur->prev. */
		if (RB_WARN_ON(cpu_buffer,
			       rb_list_head(rb_list_head(cur->prev)->next) != cur))
			return;

		cur = rb_list_head(cur->next);
	} while (cur != head);
}
static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
long nr_pages, struct list_head *pages)
{
struct buffer_page *bpage, *tmp;
bool user_thread = current->mm != NULL;
gfp_t mflags;
long i;
/*
* Check if the available memory is there first.
* Note, si_mem_available() only gives us a rough estimate of available
* memory. It may not be accurate. But we don't care, we just want
* to prevent doing any allocation when it is obvious that it is
* not going to succeed.
*/
i = si_mem_available();
if (i < nr_pages)
return -ENOMEM;
/*
* __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
* gracefully without invoking oom-killer and the system is not
* destabilized.
*/
mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
/*
* If a user thread allocates too much, and si_mem_available()
* reports there's enough memory, even though there is not.
* Make sure the OOM killer kills this thread. This can happen
* even with RETRY_MAYFAIL because another task may be doing
* an allocation after this task has taken all memory.
* This is the task the OOM killer needs to take out during this
* loop, even if it was triggered by an allocation somewhere else.
*/
if (user_thread)
set_current_oom_origin();
for (i = 0; i < nr_pages; i++) {
struct page *page;
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
mflags, cpu_to_node(cpu_buffer->cpu));
if (!bpage)
goto free_pages;
rb_check_bpage(cpu_buffer, bpage);
list_add(&bpage->list, pages);
page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
mflags | __GFP_COMP | __GFP_ZERO,
cpu_buffer->buffer->subbuf_order);
if (!page)
goto free_pages;
bpage->page = page_address(page);
bpage->order = cpu_buffer->buffer->subbuf_order;
rb_init_page(bpage->page);
if (user_thread && fatal_signal_pending(current))
goto free_pages;
}
if (user_thread)
clear_current_oom_origin();
return 0;
free_pages:
list_for_each_entry_safe(bpage, tmp, pages, list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
if (user_thread)
clear_current_oom_origin();
return -ENOMEM;
}
/*
 * rb_allocate_pages - allocate the initial page ring of a per CPU buffer
 * @cpu_buffer: the per CPU buffer to populate
 * @nr_pages: number of pages the ring should have
 *
 * Allocates @nr_pages buffer pages, turns them into a circular list
 * without a list head, stores it in cpu_buffer->pages and runs the
 * integrity check on it.
 *
 * Returns 0 on success, -ENOMEM if the pages could not be allocated.
 */
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);
	int ret;

	WARN_ON(!nr_pages);

	ret = __rb_allocate_pages(cpu_buffer, nr_pages, &pages);
	if (ret)
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}
/*
 * rb_allocate_cpu_buffer - allocate and initialize a per CPU buffer
 * @buffer: the ring buffer the per CPU buffer belongs to
 * @nr_pages: number of pages to put in the page ring
 * @cpu: the CPU the buffer is for (also selects the NUMA node)
 *
 * Allocates the per CPU descriptor, its reader page and the ring of
 * @nr_pages buffer pages, then makes the first ring page the head, tail
 * and commit page and activates the head flag.
 *
 * Returns the new per CPU buffer, or NULL if any allocation failed (in
 * which case everything allocated here has been freed again).
 */
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
	mutex_init(&cpu_buffer->mapping_lock);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;

	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
				cpu_buffer->buffer->subbuf_order);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	/*
	 * Record the allocation order on the reader page as well, exactly
	 * as __rb_allocate_pages() does for the ring pages, so that the
	 * page's metadata matches how it was allocated when the sub-buffer
	 * order is not zero.
	 */
	bpage->order = cpu_buffer->buffer->subbuf_order;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}
/*
 * rb_free_cpu_buffer - release a per CPU buffer and all of its pages
 * @cpu_buffer: the per CPU buffer to free
 *
 * Waits for pending irq work, then frees the reader page, every page of
 * the ring (if there is one), the cached free_page and finally the
 * descriptor itself.
 */
static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *first = cpu_buffer->pages;
	struct buffer_page *cur, *next;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (first) {
		/* Strip the flag bits so the list helpers can be used. */
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(cur, next, first, list) {
			list_del_init(&cur->list);
			free_buffer_page(cur);
		}

		/* The entry the walk started from is a page too. */
		cur = list_entry(first, struct buffer_page, list);
		free_buffer_page(cur);
	}

	free_page((unsigned long)cpu_buffer->free_page);

	kfree(cpu_buffer);
}
/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 *
 * Only the per CPU buffer of the CPU this runs on is allocated here.
 *
 * Returns the new buffer, or NULL on failure.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	/* Default buffer page size - one system page */
	buffer->subbuf_order = 0;
	buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;

	/* Max payload is buffer page size - header (8bytes) */
	buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);

	/* need at least two pages */
	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
	if (nr_pages < 2)
		nr_pages = 2;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
/**
* ring_buffer_free - free a ring buffer.
* @buffer: the buffer to free.
*/
void
ring_buffer_free(struct trace_buffer *buffer)
{
int cpu;
cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
irq_work_sync(&buffer->irq_work.work);
for_each_buffer_cpu(buffer, cpu)
rb_free_cpu_buffer(buffer->buffers[cpu]);
kfree(buffer->buffers);
free_cpumask_var(buffer->cpumask);
kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);
/*
 * ring_buffer_set_clock - set the clock callback stored in the buffer
 * @buffer: the ring buffer to update
 * @clock: function returning a u64 time value
 */
void ring_buffer_set_clock(struct trace_buffer *buffer, u64 (*clock)(void))
{
	buffer->clock = clock;
}
/*
 * ring_buffer_set_time_stamp_abs - set the buffer's time_stamp_abs setting
 * @buffer: the ring buffer to update
 * @abs: the new value of buffer->time_stamp_abs
 */
void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}
/*
 * ring_buffer_time_stamp_abs - read the buffer's time_stamp_abs setting
 * @buffer: the ring buffer to query
 *
 * Returns the current value of buffer->time_stamp_abs.
 */
bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}
/*
 * rb_page_entries - read a page's entries field
 * @bpage: the buffer page to read
 *
 * Returns the entries value limited to the RB_WRITE_MASK bits.
 */
static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	unsigned long raw = local_read(&bpage->entries);

	return raw & RB_WRITE_MASK;
}
/*
 * rb_page_write - read a page's write field
 * @bpage: the buffer page to read
 *
 * Returns the write value limited to the RB_WRITE_MASK bits.
 */
static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	unsigned long raw = local_read(&bpage->write);

	return raw & RB_WRITE_MASK;
}
/*
* rb_remove_pages - unlink and free pages from a per CPU buffer
* @cpu_buffer: the per CPU buffer to shrink
* @nr_pages: number of pages to remove
*
* The pages following the tail page are unlinked while holding the reader
* lock with recording disabled; they are freed after recording has been
* re-enabled and the lock dropped. Entries found on removed pages are
* accounted as overrun.
*
* Returns true if exactly @nr_pages pages were freed.
*/
static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
struct list_head *tail_page, *to_remove, *next_page;
struct buffer_page *to_remove_page, *tmp_iter_page;
struct buffer_page *last_page, *first_page;
unsigned long nr_removed;
unsigned long head_bit;
int page_entries;
head_bit = 0;
raw_spin_lock_irq(&cpu_buffer->reader_lock);
atomic_inc(&cpu_buffer->record_disabled);
/*
* We don't race with the readers since we have acquired the reader
* lock. We also don't race with writers after disabling recording.
* This makes it easy to figure out the first and the last page to be
* removed from the list. We unlink all the pages in between including
* the first and last pages. This is done in a busy loop so that we
* lose the least number of traces.
* The pages are freed after we restart recording and unlock readers.
*/
tail_page = &cpu_buffer->tail_page->list;
/*
* tail page might be on reader page, we remove the next page
* from the ring buffer
*/
if (cpu_buffer->tail_page == cpu_buffer->reader_page)
tail_page = rb_list_head(tail_page->next);
to_remove = tail_page;
/* start of pages to remove */
first_page = list_entry(rb_list_head(to_remove->next),
struct buffer_page, list);
/*
* Step over the pages to remove, remembering whether any of the
* traversed next pointers carried the HEAD flag.
*/
for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
to_remove = rb_list_head(to_remove)->next;
head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
}
/* Read iterators need to reset themselves when some pages removed */
cpu_buffer->pages_removed += nr_removed;
next_page = rb_list_head(to_remove)->next;
/*
* Now we remove all pages between tail_page and next_page.
* Make sure that we have head_bit value preserved for the
* next page
*/
tail_page->next = (struct list_head *)((unsigned long)next_page |
head_bit);
next_page = rb_list_head(next_page);
next_page->prev = tail_page;
/* make sure pages points to a valid page in the ring buffer */
cpu_buffer->pages = next_page;
/* update head page */
if (head_bit)
cpu_buffer->head_page = list_entry(next_page,
struct buffer_page, list);
/* pages are removed, resume tracing and then free the pages */
atomic_dec(&cpu_buffer->record_disabled);
raw_spin_unlock_irq(&cpu_buffer->reader_lock);
RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
/* last buffer page to remove */
last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
list);
tmp_iter_page = first_page;
/* Walk the now unlinked chain from first_page to last_page, freeing each page */
do {
cond_resched();
to_remove_page = tmp_iter_page;
/* Advance before the current page is freed */
rb_inc_page(&tmp_iter_page);
/* update the counters */
page_entries = rb_page_entries(to_remove_page);
if (page_entries) {
/*
* If something was added to this page, it was full
* since it is not the tail page. So we deduct the
* bytes consumed in ring buffer from here.
* Increment overrun to account for the lost events.
*/
local_add(page_entries, &cpu_buffer->overrun);
local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
local_inc(&cpu_buffer->pages_lost);
}
/*
* We have already removed references to this list item, just
* free up the buffer_page and its page
*/
free_buffer_page(to_remove_page);
nr_removed--;
} while (to_remove_page != last_page);
/* Every page counted while unlinking must have been freed */
RB_WARN_ON(cpu_buffer, nr_removed);
return nr_removed == 0;
}
/*
* rb_insert_pages - splice cpu_buffer->new_pages into the page ring
* @cpu_buffer: the per CPU buffer to grow
*
* Tries up to 10 times to link the pages queued on cpu_buffer->new_pages
* in front of the head page. On success the new_pages list is
* re-initialized as empty; on failure a warning is issued and the queued
* pages are freed.
*
* Returns true if the pages were inserted.
*/
static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
struct list_head *pages = &cpu_buffer->new_pages;
unsigned long flags;
bool success;
int retries;
/* Can be called at early boot up, where interrupts must not be enabled */
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
/*
* We are holding the reader lock, so the reader page won't be swapped
* in the ring buffer. Now we are racing with the writer trying to
* move head page and the tail page.
* We are going to adapt the reader page update process where:
* 1. We first splice the start and end of list of new pages between
* the head page and its previous page.
* 2. We cmpxchg the prev_page->next to point from head page to the
* start of new pages list.
* 3. Finally, we update the head->prev to the end of new list.
*
* We will try this process 10 times, to make sure that we don't keep
* spinning.
*/
retries = 10;
success = false;
while (retries--) {
struct list_head *head_page, *prev_page;
struct list_head *last_page, *first_page;
struct list_head *head_page_with_bit;
struct buffer_page *hpage = rb_set_head_page(cpu_buffer);
/* No head page could be found: give up */
if (!hpage)
break;
head_page = &hpage->list;
prev_page = head_page->prev;
first_page = pages->next;
last_page = pages->prev;
head_page_with_bit = (struct list_head *)
((unsigned long)head_page | RB_PAGE_HEAD);
/* Step 1: point both ends of the new list into the ring */
last_page->next = head_page_with_bit;
first_page->prev = prev_page;
/* caution: head_page_with_bit gets updated on cmpxchg failure */
if (try_cmpxchg(&prev_page->next,
&head_page_with_bit, first_page)) {
/*
* yay, we replaced the page pointer to our new list,
* now, we just have to update to head page's prev
* pointer to point to end of list
*/
head_page->prev = last_page;
success = true;
break;
}
}
/* The pages now belong to the ring; leave new_pages empty */
if (success)
INIT_LIST_HEAD(pages);
/*
* If we weren't successful in adding in new pages, warn and stop
* tracing
*/
RB_WARN_ON(cpu_buffer, !success);
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
/* free pages if they weren't inserted */
if (!success) {
struct buffer_page *bpage, *tmp;
list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
}
return success;
}
/*
 * rb_update_pages - apply the pending page count change of a cpu buffer
 * @cpu_buffer: the per CPU buffer to update
 *
 * A positive nr_pages_to_update inserts the pages queued on new_pages,
 * anything else removes that many pages. nr_pages is only adjusted when
 * the operation reports success.
 */
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	bool success;

	success = cpu_buffer->nr_pages_to_update > 0 ?
		rb_insert_pages(cpu_buffer) :
		rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);

	if (!success)
		return;

	cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}
/*
 * update_pages_handler - work callback that applies a pending resize
 * @work: the update_pages_work member of a ring_buffer_per_cpu
 *
 * Runs rb_update_pages() on the owning per CPU buffer and then signals
 * its update_done completion.
 */
static void update_pages_handler(struct work_struct *work)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = container_of(work, struct ring_buffer_per_cpu,
				  update_pages_work);

	rb_update_pages(cpu_buffer);
	complete(&cpu_buffer->update_done);
}
/**
* ring_buffer_resize - resize the ring buffer
* @buffer: the buffer to resize.
* @size: the new size.
* @cpu_id: the cpu buffer to resize, or RING_BUFFER_ALL_CPUS for all of them
*
* Minimum size is 2 * buffer->subbuf_size.
*
* New pages are allocated up front; the actual list update is run on the
* CPU owning each per CPU buffer (directly, or through a work item), or
* directly from here when that CPU is offline.
*
* Returns 0 on success (including when @buffer is NULL or @cpu_id is not
* part of the buffer), -EBUSY if resizing is currently disabled on an
* affected per CPU buffer and -ENOMEM if new pages could not be allocated.
*/
int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
int cpu_id)
{
struct ring_buffer_per_cpu *cpu_buffer;
unsigned long nr_pages;
int cpu, err;
/*
* Always succeed at resizing a non-existent buffer:
*/
if (!buffer)
return 0;
/* Make sure the requested buffer exists */
if (cpu_id != RING_BUFFER_ALL_CPUS &&
!cpumask_test_cpu(cpu_id, buffer->cpumask))
return 0;
nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
/* we need a minimum of two pages */
if (nr_pages < 2)
nr_pages = 2;
/* prevent another thread from changing buffer sizes */
mutex_lock(&buffer->mutex);
atomic_inc(&buffer->resizing);
if (cpu_id == RING_BUFFER_ALL_CPUS) {
/*
* Don't succeed if resizing is disabled, as a reader might be
* manipulating the ring buffer and is expecting a sane state while
* this is true.
*/
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
if (atomic_read(&cpu_buffer->resize_disabled)) {
err = -EBUSY;
goto out_err_unlock;
}
}
/* calculate the pages to update */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
cpu_buffer->nr_pages_to_update = nr_pages -
cpu_buffer->nr_pages;
/*
* nothing more to do for removing pages or no update
*/
if (cpu_buffer->nr_pages_to_update <= 0)
continue;
/*
* to add pages, make sure all new pages can be
* allocated without receiving ENOMEM
*/
INIT_LIST_HEAD(&cpu_buffer->new_pages);
if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
&cpu_buffer->new_pages)) {
/* not enough memory for new pages */
err = -ENOMEM;
goto out_err;
}
cond_resched();
}
cpus_read_lock();
/*
* Fire off all the required work handlers
* We can't schedule on offline CPUs, but it's not necessary
* since we can change their buffer sizes without any race.
*/
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
if (!cpu_buffer->nr_pages_to_update)
continue;
/* Can't run something on an offline CPU. */
if (!cpu_online(cpu)) {
rb_update_pages(cpu_buffer);
cpu_buffer->nr_pages_to_update = 0;
} else {
/* Run directly if possible. */
migrate_disable();
if (cpu != smp_processor_id()) {
migrate_enable();
schedule_work_on(cpu,
&cpu_buffer->update_pages_work);
} else {
/*
* Calling the handler directly also completes
* update_done, which the wait loop below consumes.
*/
update_pages_handler(&cpu_buffer->update_pages_work);
migrate_enable();
}
}
}
/* wait for all the updates to complete */
for_each_buffer_cpu(buffer, cpu) {
cpu_buffer = buffer->buffers[cpu];
if (!cpu_buffer->nr_pages_to_update)
continue;
if (cpu_online(cpu))
wait_for_completion(&cpu_buffer->update_done);
cpu_buffer->nr_pages_to_update = 0;
}
cpus_read_unlock();
} else {
cpu_buffer = buffer->buffers[cpu_id];
/* Already at the requested size: only the checks at "out" remain */
if (nr_pages == cpu_buffer->nr_pages)
goto out;
/*
* Don't succeed if resizing is disabled, as a reader might be
* manipulating the ring buffer and is expecting a sane state while
* this is true.
*/
if (atomic_read(&cpu_buffer->resize_disabled)) {
err = -EBUSY;
goto out_err_unlock;
}
cpu_buffer->nr_pages_to_update = nr_pages -
cpu_buffer->nr_pages;
INIT_LIST_HEAD(&cpu_buffer->new_pages);
/* Only growing the buffer needs pages allocated up front */
if (cpu_buffer->nr_pages_to_update > 0 &&
__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
&cpu_buffer->new_pages)) {
err = -ENOMEM;
goto out_err;
}
cpus_read_lock();
/* Can't run something on an offline CPU. */
if (!cpu_online(cpu_id))
rb_update_pages(cpu_buffer);
else {
/* Run directly if possible. */
migrate_disable();
if (cpu_id == smp_processor_id()) {
rb_update_pages(cpu_buffer);
migrate_enable();
} else {
migrate_enable();
schedule_work_on(cpu_id,
&cpu_buffer->update_pages_work);
wait_for_completion(&cpu_buffer->update_done);
}
}
cpu_buffer->nr_pages_to_update = 0;
cpus_read_unlock();
}
out:
/*
* The ring buffer resize can happen with the ring buffer
* enabled, so that the update disturbs the tracing as little
* as possible. But if the buffer is disabled, we do not need
* to worry about that, and we can take the time to verify
* that the buffer is not corrupt.
*/
if (atomic_read(&buffer->record_disabled)) {
atomic_inc(&buffer->record_disabled);
/*
* Even though the buffer was disabled, we must make sure
* that it is truly disabled before calling rb_check_pages.
* There could have been a race between checking
* record_disable and incrementing it.
*/
synchronize_rcu();
for_each_buffer_cpu(buffer, cpu) {
unsigned long flags;
cpu_buffer = buffer->buffers[cpu];
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
rb_check_pages(cpu_buffer);
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
atomic_dec(&buffer->record_disabled);
}
atomic_dec(&buffer->resizing);
mutex_unlock(&buffer->mutex);
return 0;
out_err:
/* Drop every pending update and free any pages still queued on new_pages */
for_each_buffer_cpu(buffer, cpu) {
struct buffer_page *bpage, *tmp;
cpu_buffer = buffer->buffers[cpu];
cpu_buffer->nr_pages_to_update = 0;
if (list_empty(&cpu_buffer->new_pages))
continue;
list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
list) {
list_del_init(&bpage->list);
free_buffer_page(bpage);
}
}
out_err_unlock:
atomic_dec(&buffer->resizing);
mutex_unlock(&buffer->mutex);
return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
/*
 * ring_buffer_change_overwrite - set or clear the RB_FL_OVERWRITE flag
 * @buffer: the ring buffer to update
 * @val: non zero to set the flag, zero to clear it
 *
 * The flag is changed while holding buffer->mutex.
 */
void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
{
	mutex_lock(&buffer->mutex);

	if (!val)
		buffer->flags &= ~RB_FL_OVERWRITE;
	else
		buffer->flags |= RB_FL_OVERWRITE;

	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
/*
 * __rb_page_index - address of a byte offset inside a page's data area
 * @bpage: the buffer page
 * @index: offset into bpage->page->data
 */
static __always_inline void *
__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}
static __always_inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
return __rb_page_index(cpu_buffer->reader_page,
cpu_buffer->reader_page->read);
}
/*
 * rb_iter_head_event - copy the event at the iterator head into iter->event
 * @iter: the ring buffer iterator
 *
 * If iter->head and iter->next_event differ, the copy in iter->event is
 * returned as is. Otherwise the event at iter->head is validated against
 * the page's commit and iter->event_size, copied into iter->event and
 * then re-validated against the page time stamp and commit to detect a
 * writer that entered the page in the meantime.
 *
 * Returns iter->event on success. On any validation failure the iterator
 * is reset to the start of its page, missed_events is set and NULL is
 * returned.
 */
static struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	struct ring_buffer_event *event;
	struct buffer_page *iter_head_page = iter->head_page;
	unsigned long commit;
	unsigned length;

	if (iter->head != iter->next_event)
		return iter->event;

	/*
	 * When the writer goes across pages, it issues a cmpxchg which
	 * is a mb(), which will synchronize with the rmb here.
	 * (see rb_tail_page_update() and __rb_reserve_next())
	 */
	commit = rb_page_commit(iter_head_page);
	smp_rmb();

	/*
	 * An event needs to be at least 8 bytes in size.
	 *
	 * Compare using an addition: "commit - 8" is unsigned and wraps
	 * around when commit is smaller than 8, which would let a page
	 * with (almost) nothing committed slip past this check.
	 */
	if (iter->head + 8 > commit)
		goto reset;

	event = __rb_page_index(iter_head_page, iter->head);
	length = rb_event_length(event);

	/*
	 * READ_ONCE() doesn't work on functions and we don't want the
	 * compiler doing any crazy optimizations with length.
	 */
	barrier();

	if ((iter->head + length) > commit || length > iter->event_size)
		/* Writer corrupted the read? */
		goto reset;

	memcpy(iter->event, event, length);
	/*
	 * If the page stamp is still the same after this rmb() then the
	 * event was safely copied without the writer entering the page.
	 */
	smp_rmb();

	/* Make sure the page didn't change since we read this */
	if (iter->page_stamp != iter_head_page->page->time_stamp ||
	    commit > rb_page_commit(iter_head_page))
		goto reset;

	iter->next_event = iter->head + length;
	return iter->event;
reset:
	/* Reset to the beginning */
	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
	iter->next_event = 0;
	iter->missed_events = 1;
	return NULL;
}
/*
 * rb_page_size - amount of committed data on a page
 * @bpage: the buffer page
 *
 * Size is determined by what has been committed, with the
 * RB_MISSED_MASK bits removed from the commit value.
 */
static __always_inline unsigned
rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage) & ~RB_MISSED_MASK;
}
/*
 * rb_commit_index - commit value of the cpu buffer's commit page
 * @cpu_buffer: the per CPU buffer
 */
static __always_inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *commit_page = cpu_buffer->commit_page;

	return rb_page_commit(commit_page);
}
/*
 * rb_event_index - offset of an event within its sub-buffer's data area
 * @cpu_buffer: the per CPU buffer the event belongs to
 * @event: the event
 *
 * Takes the event's address modulo the sub-buffer size
 * (PAGE_SIZE << subbuf_order) and subtracts BUF_PAGE_HDR_SIZE.
 */
static __always_inline unsigned
rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event)
{
	unsigned long offset_mask;
	unsigned long offset;

	offset_mask = (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1;
	offset = (unsigned long)event & offset_mask;

	return offset - BUF_PAGE_HDR_SIZE;
}
/*
 * rb_inc_iter - move an iterator to the start of its next page
 * @iter: the ring buffer iterator
 *
 * Advances iter->head_page, reloads the iterator time stamps from the
 * new page and resets the offsets within the page to zero.
 */
static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page != cpu_buffer->reader_page)
		rb_inc_page(&iter->head_page);
	else
		iter->head_page = rb_set_head_page(cpu_buffer);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->page_stamp = iter->read_stamp;
	iter->head = 0;
	iter->next_event = 0;
}
/*
* rb_handle_head_page - writer hit the head page
*
* Returns: +1 to retry page
* 0 to continue
* -1 on error
*/
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
struct buffer_page *tail_page,
struct buffer_page *next_page)
{
struct buffer_page *new_head;
int entries;
int type;
int ret;
entries = rb_page_entries(next_page);
/*
* The hard part is here. We need to move the head
* forward, and protect against both readers on
* other CPUs and writers coming in via interrupts.
*/
type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
RB_PAGE_HEAD);
/*
* type can be one of four:
* NORMAL - an interrupt already moved it for us
* HEAD - we are the first to get here.
* UPDATE - we are the interrupt interrupting
* a current move.
* MOVED - a reader on another CPU moved the next
* pointer to its reader page. Give up
* and try again.
*/
switch (type) {
case RB_PAGE_HEAD:
/*
* We changed the head to UPDATE, thus
* it is our responsibility to update
* the counters.
*/
local_add(entries, &cpu_buffer->overrun);
local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
local_inc(&cpu_buffer->pages_lost);
/*
* The entries will be zeroed out when we move the
* tail page.
*/
/* still more to do */
break;
case RB_PAGE_UPDATE:
/*
* This is an interrupt that interrupt the
* previous update. Still more to do.
*/
break;
case RB_PAGE_NORMAL:
/*
* An interrupt came in before the update
* and processed this for us.
* Nothing left to do.
*/
return 1;
case RB_PAGE_MOVED:
/*
* The reader is on another CPU and just did
* a swap with our next_page.
* Try again.
*/
return 1;
default:
RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
return -1;
}
/*
* Now that we are here, the old head pointer is
* set to UPDATE. This will keep the reader from
* swapping the head page with the reader page.
* The reader (on another CPU) will spin till
* we are finished.
*
* We just need to protect against interrupts
* doing the job. We will set the next pointer
* to HEAD. After that, we set the old pointer
* to NORMAL, but only if it was HEAD before.
* otherwise we are an interrupt, and only
* want the outer most commit to reset it.
*/
new_head = next_page;
rb_inc_page(&new_head);
ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
RB_PAGE_NORMAL);
/*
* Valid returns are:
* HEAD - an interrupt came in and already set it.
* NORMAL - One of two things:
* 1) We really set it.
* 2) A bunch of interrupts came in and moved
* the page forward again.
*/
switch (ret) {
case RB_PAGE_HEAD:
case RB_PAGE_NORMAL:
/* OK */
break;
default:
RB_WARN_ON(cpu_buffer, 1);
return -1;
}
/*
* It is possible that an interrupt came in,
* set the head up, then more interrupts came in
* and moved it again. When we get back here,
* the page would have been set to NORMAL but we
* just set it back to HEAD.
*
* How do you detect this? Well, if that happened
* the tail page would have moved.
*/
if (ret == RB_PAGE_NORMAL) {
struct buffer_page *buffer_tail_page;
buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
/*
* If the tail had moved past next, then we need
* to reset the pointer.
*/
if (buffer_tail_page != tail_page &&
buffer_tail_page != next_page)
rb_head_page_set_normal(cpu_buffer, new_head,
next_page,
RB_PAGE_HEAD);
}
/*
* If this was the outer most commit (the one that
* changed the original pointer from HEAD to UPDATE),
* then it is up to us to reset it to NORMAL.
*/
if (type == RB_PAGE_HEAD) {
ret = rb_head_page_set_normal(cpu_buffer, next_page,
tail_page,
RB_PAGE_UPDATE);
if (RB_WARN_ON(cpu_buffer,
ret != RB_PAGE_UPDATE))
return -1;
}
return 0;
}
/*
* rb_reset_tail - fix up tail_page->write after a reservation ran off the page
* @cpu_buffer: per CPU buffer; its buffer's subbuf_size and its entries_bytes
*              counter are used
* @tail: offset on info->tail_page at which this reservation started
* @info: reservation info; only tail_page and length are read
*
* Three cases are handled, depending on where @tail lies:
*  - at or beyond the sub-buffer size: only subtract the length back off
*    (another event is the one that crossed the boundary);
*  - too close to the end for even a minimum sized event: mark the rest
*    as padding and subtract the length back off;
*  - otherwise: write a discarded (padding) event covering the rest of
*    the page, account its bytes in entries_bytes, and leave write at
*    the end of the sub-buffer.
*/
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
unsigned long tail, struct rb_event_info *info)
{
unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
struct buffer_page *tail_page = info->tail_page;
struct ring_buffer_event *event;
unsigned long length = info->length;
/*
* Only the event that crossed the page boundary
* must fill the old tail_page with padding.
*/
if (tail >= bsize) {
/*
* If the page was filled, then we still need
* to update the real_end. Reset it to zero
* and the reader will ignore it.
*/
if (tail == bsize)
tail_page->real_end = 0;
/* Give back everything this reservation added to write */
local_sub(length, &tail_page->write);
return;
}
/* First unused byte on the page: this is where the padding goes */
event = __rb_page_index(tail_page, tail);
/*
* Save the original length to the meta data.
* This will be used by the reader to add lost event
* counter.
*/
tail_page->real_end = tail;
/*
* If this event is bigger than the minimum size, then
* we need to be careful that we don't subtract the
* write counter enough to allow another writer to slip
* in on this page.
* We put in a discarded commit instead, to make sure
* that this space is not used again, and this space will
* not be accounted into 'entries_bytes'.
*
* If we are less than the minimum size, we don't need to
* worry about it.
*/
if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
/* No room for any events */
/* Mark the rest of the page with padding */
rb_event_set_padding(event);
/* Make sure the padding is visible before the write update */
smp_wmb();
/* Set the write back to the previous setting */
local_sub(length, &tail_page->write);
return;
}
/* Put in a discarded event */
/* array[0] is the size of the padding that follows the event header */
event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
event->type_len = RINGBUF_TYPE_PADDING;
/* time delta must be non zero */
event->time_delta = 1;
/* account for padding bytes */
local_add(bsize - tail, &cpu_buffer->entries_bytes);
/* Make sure the padding is visible before the tail_page->write update */
smp_wmb();
/* Set write to end of buffer */
/* Only the part of the reservation that went past the end is removed */
length = (tail + length) - bsize;
local_sub(length, &tail_page->write);
}
/* Defined further down; rb_move_tail() needs it to commit before retrying */
static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
/*
* rb_move_tail - handle a reservation that did not fit on the tail page
* @cpu_buffer: per CPU buffer being written to
* @tail: offset on info->tail_page at which the reservation started
* @info: reservation info; tail_page is read here, the rest is used by
*        rb_reset_tail()
*
* This is the slow path, force gcc not to inline it.
*
* Returns ERR_PTR(-EAGAIN) once the tail has been dealt with, so that the
* caller retries the reservation, or NULL when the event has to be dropped
* (commit overrun, buffer full while not in overwrite mode, or
* rb_handle_head_page() reporting an error).
*/
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
unsigned long tail, struct rb_event_info *info)
{
struct buffer_page *tail_page = info->tail_page;
struct buffer_page *commit_page = cpu_buffer->commit_page;
struct trace_buffer *buffer = cpu_buffer->buffer;
struct buffer_page *next_page;
int ret;
/* next_page is the page following the current tail page */
next_page = tail_page;
rb_inc_page(&next_page);
/*
* If for some reason, we had an interrupt storm that made
* it all the way around the buffer, bail, and warn
* about it.
*/
if (unlikely(next_page == commit_page)) {
local_inc(&cpu_buffer->commit_overrun);
goto out_reset;
}
/*
* This is where the fun begins!
*
* We are fighting against races between a reader that
* could be on another CPU trying to swap its reader
* page with the buffer head.
*
* We are also fighting against interrupts coming in and
* moving the head or tail on us as well.
*
* If the next page is the head page then we have filled
* the buffer, unless the commit page is still on the
* reader page.
*/
if (rb_is_head_page(next_page, &tail_page->list)) {
/*
* If the commit is not on the reader page, then
* move the header page.
*/
if (!rb_is_reader_page(cpu_buffer->commit_page)) {
/*
* If we are not in overwrite mode,
* this is easy, just stop here.
*/
if (!(buffer->flags & RB_FL_OVERWRITE)) {
local_inc(&cpu_buffer->dropped_events);
goto out_reset;
}
ret = rb_handle_head_page(cpu_buffer,
tail_page,
next_page);
/* negative: error, drop the event; positive: skip the tail update */
if (ret < 0)
goto out_reset;
if (ret)
goto out_again;
} else {
/*
* We need to be careful here too. The
* commit page could still be on the reader
* page. We could have a small buffer, and
* have filled up the buffer with events
* from interrupts and such, and wrapped.
*
* Note, if the tail page is also on the
* reader_page, we let it move out.
*/
if (unlikely((cpu_buffer->commit_page !=
cpu_buffer->tail_page) &&
(cpu_buffer->commit_page ==
cpu_buffer->reader_page))) {
local_inc(&cpu_buffer->commit_overrun);
goto out_reset;
}
}
}
rb_tail_page_update(cpu_buffer, tail_page, next_page);
out_again:
rb_reset_tail(cpu_buffer, tail, info);
/* Commit what we have for now. */
rb_end_commit(cpu_buffer);
/* rb_end_commit() decs committing */
local_inc(&cpu_buffer->committing);
/* fail and let the caller try again */
return ERR_PTR(-EAGAIN);
out_reset:
/* reset write */
rb_reset_tail(cpu_buffer, tail, info);
return NULL;
}
/*
 * Slow path: turn @event into a time stamp event and return the event
 * that follows it.
 *
 * @abs selects an absolute TIME_STAMP over a relative TIME_EXTEND.
 * The value stored is @delta, except for a relative extend sitting at
 * index zero of its page, which is stored as zero.
 */
static struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event, u64 delta, bool abs)
{
	u64 stamp = 0;

	event->type_len = abs ? RINGBUF_TYPE_TIME_STAMP
			      : RINGBUF_TYPE_TIME_EXTEND;

	/* Only an absolute stamp, or an event past index zero, keeps @delta */
	if (abs || rb_event_index(cpu_buffer, event))
		stamp = delta;

	/* Low bits go into time_delta, the remainder into array[0] */
	event->time_delta = stamp & TS_MASK;
	event->array[0] = stamp >> TS_SHIFT;

	return skip_time_extend(event);
}
#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
/*
* Fallback used when CONFIG_HAVE_UNSTABLE_SCHED_CLOCK is not set:
* the clock is always reported as stable.
*/
static inline bool sched_clock_stable(void)
{
return true;
}
#endif
/*
* rb_check_timestamp - warn (once) about a time delta that is too big
* @cpu_buffer: per CPU buffer whose write_stamp is included in the report
* @info: event info; delta, ts, before and after are printed
*
* The write stamp is read inside the WARN_ONCE() arguments through a
* statement expression. When sched_clock_stable() is false, a hint on
* switching to the global trace clock is appended to the message.
*/
static void
rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
struct rb_event_info *info)
{
u64 write_stamp;
WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
(unsigned long long)info->delta,
(unsigned long long)info->ts,
(unsigned long long)info->before,
(unsigned long long)info->after,
(unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}),
sched_clock_stable() ? "" :
"If you just came from a suspend/resume,\n"
"please switch to the trace global clock:\n"
" echo global > /sys/kernel/tracing/trace_clock\n"
"or add trace_clock=global to the kernel command line\n");
}
/*
 * rb_add_timestamp - place a time stamp event in front of a reserved event
 * @cpu_buffer: per CPU buffer the event lives in
 * @event: in: start of the reserved space; out: event following the stamp
 * @info: event info; add_timestamp, delta, ts, before and after are used,
 *        and delta may be modified
 * @delta: set to zero on return
 * @length: reduced by RB_LEN_TIME_EXTEND on return
 *
 * A delta larger than 2^59 gets special treatment first: it is masked
 * for absolute stamps whose clock uses the top bits, reported once when
 * the clock went backwards without interruption, or handed to
 * rb_check_timestamp() otherwise. Non absolute stamps then use a zero
 * delta.
 */
static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
			     struct ring_buffer_event **event,
			     struct rb_event_info *info,
			     u64 *delta,
			     unsigned int *length)
{
	const bool absolute = info->add_timestamp &
		(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);

	if (unlikely(info->delta > (1ULL << 59))) {
		/*
		 * Some timers can use more than 59 bits, and when a timestamp
		 * is added to the buffer, it will lose those bits.
		 */
		const bool uses_msb = absolute && (info->ts & TS_MSB);
		/* Not interrupted, yet the new time is older than the last */
		const bool went_backwards = info->before == info->after &&
					    info->before > info->ts;

		if (uses_msb) {
			info->delta &= ABS_TS_MASK;
		} else if (went_backwards) {
			static int reported;

			/*
			 * This is possible with a recalibrating of the TSC.
			 * Do not produce a call stack, but just report it.
			 */
			if (!reported) {
				reported++;
				pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
					info->before, info->ts);
			}
		} else {
			rb_check_timestamp(cpu_buffer, info);
		}

		if (!absolute)
			info->delta = 0;
	}

	*event = rb_add_time_stamp(cpu_buffer, *event, info->delta, absolute);
	*length -= RB_LEN_TIME_EXTEND;
	*delta = 0;
}
/**
 * rb_update_event - update event type and data
 * @cpu_buffer: The per cpu buffer of the @event
 * @event: the event to update
 * @info: The info to update the @event with (contains length and delta)
 *
 * Fill in the header of @event. The length in @info is the size that
 * was actually reserved in the ring buffer; from it we work out whether
 * the size fits into type_len or has to be stored in array[0]. When
 * @info asks for a timestamp, it is written first and the real event
 * header follows it.
 */
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event,
		struct rb_event_info *info)
{
	unsigned int len = info->length;
	u64 delta = info->delta;
	unsigned int nest = local_read(&cpu_buffer->committing) - 1;

	/* Remember the time stamp of the event at this nesting level */
	if (!WARN_ON_ONCE(nest >= MAX_NEST))
		cpu_buffer->event_stamp[nest] = info->ts;

	/*
	 * A requested timestamp goes at the start of the reserved space;
	 * event, delta and len are adjusted to describe what comes after.
	 */
	if (unlikely(info->add_timestamp))
		rb_add_timestamp(cpu_buffer, &event, info, &delta, &len);

	event->time_delta = delta;
	len -= RB_EVNT_HDR_SIZE;

	if (len <= RB_MAX_SMALL_DATA && !RB_FORCE_8BYTE_ALIGNMENT) {
		/* Small event: size is encoded in type_len */
		event->type_len = DIV_ROUND_UP(len, RB_ALIGNMENT);
		return;
	}

	/* Large event: type_len of zero, size is kept in array[0] */
	event->type_len = 0;
	event->array[0] = len;
}
static unsigned rb_calculate_event_length(unsigned length)
{
struct ring_buffer_event event; /* Used only for sizeof array */
/* zero length can cause confusions */
if (!length)
length++;
if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
length += sizeof(event.array[0]);
length += RB_EVNT_HDR_SIZE;
length = ALIGN(length, RB_ARCH_ALIGNMENT);
/*
* In case the time delta is larger than the 27 bits for it
* in the header, we need to add a timestamp. If another
* event comes in when trying to discard this one to increase
* the length, then the timestamp will be added in the allocated
* space of this event. If length is bigger than the size needed
* for the TIME_EXTEND, then padding has to be used. The events
* length must be either RB_LEN_TIME_EXTEND, or greater than or equal
* to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
* As length is a multiple of 4, we only need to worry if it
* is 12 (RB_LEN_TIME_EXTEND + 4).
*/
if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
length += RB_ALIGNMENT;
return length;
}
/*
* rb_try_to_discard - try to give back the space of a reserved event
* @cpu_buffer: per CPU buffer the @event was reserved in
* @event: the event to remove
*
* This only succeeds when @event is the last thing written on the current
* tail page. In that case the page's write index is moved back to the
* start of the event with a cmpxchg and the event's length is removed
* from entries_bytes.
*
* Returns true if the space was reclaimed, false otherwise.
*/
static inline bool
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
struct ring_buffer_event *event)
{
unsigned long new_index, old_index;
struct buffer_page *bpage;
unsigned long addr;
/* new_index: where the event starts; old_index: where it ends */
new_index = rb_event_index(cpu_buffer, event);
old_index = new_index + rb_event_ts_length(event);
/* Round the event address down to the start of its sub-buffer */
addr = (unsigned long)event;
addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
bpage = READ_ONCE(cpu_buffer->tail_page);
/*
* Make sure the tail_page is still the same and
* the next write location is the end of this event
*/
if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
/* Bits of write above RB_WRITE_MASK, carried into both cmpxchg values */
unsigned long write_mask =
local_read(&bpage->write) & ~RB_WRITE_MASK;
unsigned long event_length = rb_event_length(event);
/*
* Force the before_stamp to be different than the write_stamp
* to make sure that the next event adds an absolute
* value and does not rely on the saved write stamp, which
* is now going to be bogus.
*
* By setting the before_stamp to zero, the next event
* is not going to use the write_stamp and will instead
* create an absolute timestamp. This means there's no
* reason to update the write_stamp!
*/
rb_time_set(&cpu_buffer->before_stamp, 0);
/*
* If an event were to come in now, it would see that the
* write_stamp and the before_stamp are different, and assume
* that this event just added itself before updating
* the write stamp. The interrupting event will fix the
* write stamp for us, and use an absolute timestamp.
*/
/*
* This is on the tail page. It is possible that
* a write could come in and move the tail page
* and write to the next page. That is fine
* because we just shorten what is on this page.
*/
old_index += write_mask;
new_index += write_mask;
/* caution: old_index gets updated on cmpxchg failure */
if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
/* update counters */
local_sub(event_length, &cpu_buffer->entries_bytes);
return true;
}
}
/* could not discard */
return false;
}
/*
* Mark the start of a commit on @cpu_buffer: bump both the committing
* counter (decremented again by rb_end_commit()) and the commits counter
* (which rb_end_commit() compares against to detect commits that started
* in the meantime).
*/
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
local_inc(&cpu_buffer->committing);
local_inc(&cpu_buffer->commits);
}
/*
* rb_set_commit_to_write - advance the commit up to what has been written
* @cpu_buffer: per CPU buffer to update
*
* Moves commit_page forward until it matches tail_page, setting each
* page's commit to its write index on the way, then brings the commit of
* the final page up to its write index. The whole thing is repeated if
* the tail page moved in the meantime.
*/
static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long max_count;
/*
* We only race with interrupts and NMIs on this CPU.
* If we own the commit event, then we can commit
* all others that interrupted us, since the interruptions
* are in stack format (they finish before they come
* back to us). This allows us to do a simple loop to
* assign the commit to the tail.
*/
again:
/* Upper bound on loop iterations; hitting it triggers RB_WARN_ON() */
max_count = cpu_buffer->nr_pages * 100;
while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
if (RB_WARN_ON(cpu_buffer, !(--max_count)))
return;
if (RB_WARN_ON(cpu_buffer,
rb_is_reader_page(cpu_buffer->tail_page)))
return;
/*
* No need for a memory barrier here, as the update
* of the tail_page did it for this page.
*/
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
rb_inc_page(&cpu_buffer->commit_page);
/* add barrier to keep gcc from optimizing too much */
barrier();
}
/* commit_page is now the tail page: catch its commit up with its write */
while (rb_commit_index(cpu_buffer) !=
rb_page_write(cpu_buffer->commit_page)) {
/* Make sure the readers see the content of what is committed. */
smp_wmb();
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
/* The stored commit must not have bits outside of RB_WRITE_MASK */
RB_WARN_ON(cpu_buffer,
local_read(&cpu_buffer->commit_page->page->commit) &
~RB_WRITE_MASK);
barrier();
}
/* again, keep gcc from optimizing */
barrier();
/*
* If an interrupt came in just after the first while loop
* and pushed the tail page forward, we will be left with
* a dangling commit that will never go forward.
*/
if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
goto again;
}
/*
* rb_end_commit - finish one level of commit on @cpu_buffer
* @cpu_buffer: per CPU buffer being committed to
*
* If this is the outermost commit (committing == 1), the commit is first
* pushed forward with rb_set_commit_to_write(). The committing counter is
* then decremented. If the commits counter changed while doing this and
* nothing is committing anymore, the counter is taken again and the
* sequence is repeated.
*
* Warns and returns early if committing is already zero.
*/
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long commits;
if (RB_WARN_ON(cpu_buffer,
!local_read(&cpu_buffer->committing)))
return;
again:
/* Snapshot, compared below to detect commits started in between */
commits = local_read(&cpu_buffer->commits);
/* synchronize with interrupts */
barrier();
if (local_read(&cpu_buffer->committing) == 1)
rb_set_commit_to_write(cpu_buffer);
local_dec(&cpu_buffer->committing);
/* synchronize with interrupts */
barrier();
/*
* Need to account for interrupts coming in between the
* updating of the commit page and the clearing of the
* committing counter.
*/
if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
!local_read(&cpu_buffer->committing)) {
local_inc(&cpu_buffer->committing);
goto again;
}
}
/*
 * Turn @event into padding so that it is no longer treated as data.
 * If the event starts with a time extend, the extend is left alone and
 * the event after it is the one that gets converted.
 */
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	struct ring_buffer_event *victim;

	victim = extended_time(event) ? skip_time_extend(event) : event;

	/*
	 * array[0] holds the actual length for the discarded event.
	 * It has to be computed before the type is overwritten below.
	 */
	victim->array[0] = rb_event_data_length(victim) - RB_EVNT_HDR_SIZE;
	victim->type_len = RINGBUF_TYPE_PADDING;

	/* time delta must be non zero */
	if (victim->time_delta == 0)
		victim->time_delta = 1;
}
/*
* Commit one event on @cpu_buffer: count it in the entries counter and
* finish the commit with rb_end_commit().
*/
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
local_inc(&cpu_buffer->entries);
rb_end_commit(cpu_buffer);
}
/*
 * rb_wakeups - queue irq work for anything waiting on the buffer
 * @buffer: the ring buffer; its irq_work serves buffer wide waiters
 * @cpu_buffer: the per CPU buffer; its irq_work serves per CPU waiters
 *
 * Pending waiters on either level are always woken. Waiters for a
 * "full" buffer are only woken when pages were touched since the last
 * check, the commit has left the reader page, someone is actually
 * waiting, and full_hit() agrees with the shortest_full watermark.
 */
static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
	if (buffer->irq_work.waiters_pending) {
		buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&buffer->irq_work.work);
	}

	if (cpu_buffer->irq_work.waiters_pending) {
		cpu_buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&cpu_buffer->irq_work.work);
	}

	/* Nothing new, nothing committed past the reader, or nobody waiting */
	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
	    !cpu_buffer->irq_work.full_waiters_pending)
		return;

	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

	if (full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) {
		cpu_buffer->irq_work.wakeup_full = true;
		cpu_buffer->irq_work.full_waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&cpu_buffer->irq_work.work);
	}
}
/*
* With CONFIG_RING_BUFFER_RECORD_RECURSION set, a detected recursion is
* handed to do_ftrace_record_recursion() together with _THIS_IP_ and
* _RET_IP_. Without it, the macro expands to an empty statement.
*/
#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
# define do_ring_buffer_record_recursion() \
do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
#else
# define do_ring_buffer_record_recursion() do { } while (0)
#endif
/*
* The lock and unlock are done within a preempt disable section.
* The current_context per_cpu variable can only be modified
* by the current task between lock and unlock. But it can
* be modified more than once via an interrupt. To pass this
* information from the lock to the unlock without having to
* access the 'in_interrupt()' functions again (which do show
* a bit of overhead in something as critical as function tracing),
* we use a bitmask trick.
*
* bit 1 = NMI context
* bit 2 = IRQ context
* bit 3 = SoftIRQ context
* bit 4 = normal context.
*
* This works because this is the order of contexts that can
* preempt other contexts. A SoftIRQ never preempts an IRQ
* context.
*
* When the context is determined, the corresponding bit is
* checked and set (if it was set, then a recursion of that context
* happened).
*
* On unlock, we need to clear this bit. To do so, just subtract
* 1 from the current_context and AND it to itself.
*
* (binary)
* 101 - 1 = 100
* 101 & 100 = 100 (clearing bit zero)
*
* 1010 - 1 = 1001
* 1010 & 1001 = 1000 (clearing bit 1)
*
* The least significant bit can be cleared this way, and it
* just so happens that it is the same bit corresponding to
* the current context.
*
* Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
* is set when a recursion is detected at the current context, and if
* the TRANSITION bit is already set, it will fail the recursion.
* This is needed because there's a lag between the changing of
* interrupt context and updating the preempt count. In this case,
* a false positive will be found. To handle this, one extra recursion
* is allowed, and this is done by the TRANSITION bit. If the TRANSITION
* bit is already set, then it is considered a recursion and the function
* ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
*
* On the trace_recursive_unlock(), the TRANSITION bit will be the first
* to be cleared. Even if it wasn't the context that set it. That is,
* if an interrupt comes in while NORMAL bit is set and the ring buffer
* is called before preempt_count() is updated, since the check will
* be on the NORMAL bit, the TRANSITION bit will then be set. If an
* NMI then comes in, it will set the NMI bit, but when the NMI code
* does the trace_recursive_unlock() it will clear the TRANSITION bit
* and leave the NMI bit set. But this is fine, because the interrupt
* code that set the TRANSITION bit will then clear the NMI bit when it
* calls trace_recursive_unlock(). If another NMI comes in, it will
* set the TRANSITION bit and continue.
*
* Note: The TRANSITION bit only handles a single transition between context.
*/
/*
 * trace_recursive_lock - mark the current context as writing to the buffer
 * @cpu_buffer: per CPU buffer whose current_context mask is updated
 *
 * Returns true if a recursion was detected (the caller must not write),
 * false if the context bit (or the TRANSITION bit) was taken.
 */
static __always_inline bool
trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned int ctx = cpu_buffer->current_context;
	int bit = RB_CTX_NORMAL - interrupt_context_level();

	if (unlikely(ctx & (1 << (bit + cpu_buffer->nest)))) {
		/*
		 * The bit of this context is already taken. That may just
		 * be a context switch that preempt_count() does not show
		 * yet, so fall back to the TRANSITION bit. If that one is
		 * taken as well, this really is a recursion.
		 */
		bit = RB_CTX_TRANSITION;
		if (ctx & (1 << (bit + cpu_buffer->nest))) {
			do_ring_buffer_record_recursion();
			return true;
		}
	}

	cpu_buffer->current_context = ctx | (1 << (bit + cpu_buffer->nest));
	return false;
}
/*
 * Release the context bit taken by trace_recursive_lock(): subtracting
 * the lowest bit of the current nest level and ANDing the result with
 * the mask clears the lowest set bit at that level.
 */
static __always_inline void
trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned int ctx = cpu_buffer->current_context;

	cpu_buffer->current_context = ctx & (ctx - (1 << cpu_buffer->nest));
}
/* The recursive locking above uses 5 bits */
#define NESTED_BITS 5
/**
 * ring_buffer_nest_start - Allow to trace while nested
 * @buffer: The ring buffer to modify
 *
 * The ring buffer protects itself against recursion. Sometimes, though,
 * a trace has to be written while another one is still in progress.
 * Calling this function lets the next ring_buffer_lock_reserve() nest
 * inside the one that is currently active.
 *
 * Call this function before calling another ring_buffer_lock_reserve() and
 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
 * Preemption stays disabled until ring_buffer_nest_end() is called.
 */
void ring_buffer_nest_start(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	/* Enabled by ring_buffer_nest_end() */
	preempt_disable_notrace();

	cpu_buffer = buffer->buffers[raw_smp_processor_id()];

	/* Shift the recursion bits used by trace_recursive_lock() */
	cpu_buffer->nest += NESTED_BITS;
}
/**
 * ring_buffer_nest_end - Allow to trace while nested
 * @buffer: The ring buffer to modify
 *
 * Undoes ring_buffer_nest_start(). Must be called after it and after
 * the nested ring_buffer_unlock_commit().
 */
void ring_buffer_nest_end(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	/* Preemption was disabled by ring_buffer_nest_start() */
	cpu_buffer = buffer->buffers[raw_smp_processor_id()];

	/* Shift the recursion bits back to the previous level */
	cpu_buffer->nest -= NESTED_BITS;

	preempt_enable_notrace();
}
/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 *
 * This commits the data to the ring buffer, wakes up waiters where
 * needed, drops the recursion protection and enables preemption again.
 *
 * Must be paired with ring_buffer_lock_reserve.
 *
 * Always returns 0.
 */
int ring_buffer_unlock_commit(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer =
		buffer->buffers[raw_smp_processor_id()];

	rb_commit(cpu_buffer);
	rb_wakeups(buffer, cpu_buffer);

	trace_recursive_unlock(cpu_buffer);
	preempt_enable_notrace();

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
/* Special value to validate all deltas on a page. */
#define CHECK_FULL_PAGE 1L
#ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
/*
 * show_irq_str - map an interrupt context mask to a short string
 * @bits: mask where 1 stands for softirq, 2 for hardirq and 4 for NMI
 *        (this is how the callers in this file build it)
 *
 * Returns a constant string for masks 0 to 7 and "?" for anything
 * outside of that range. The returned string must not be modified.
 */
static const char *show_irq_str(int bits)
{
	/* Built once at compile time instead of on every call */
	static const char * const type[] = {
		".",	// 0
		"s",	// 1
		"h",	// 2
		"Hs",	// 3
		"n",	// 4
		"Ns",	// 5
		"Nh",	// 6
		"NHs",	// 7
	};

	/* Never index outside of the table, whatever the caller passes */
	if (bits < 0 || (unsigned int)bits >= sizeof(type) / sizeof(type[0]))
		return "?";

	return type[bits];
}
/*
 * Assume this is a trace event and describe the interrupt context
 * recorded in its flags. Returns "X" when the event is too small to
 * hold a struct trace_entry.
 */
static const char *show_flags(struct ring_buffer_event *event)
{
	struct trace_entry *entry;
	int bits;

	if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
		return "X";

	entry = ring_buffer_event_data(event);

	/* 1: softirq, 2: hardirq, 4: NMI */
	bits = ((entry->flags & TRACE_FLAG_SOFTIRQ) ? 1 : 0) |
	       ((entry->flags & TRACE_FLAG_HARDIRQ) ? 2 : 0) |
	       ((entry->flags & TRACE_FLAG_NMI) ? 4 : 0);

	return show_irq_str(bits);
}
/*
 * Returns "d" when the event, taken to be a trace event, has
 * TRACE_FLAG_IRQS_OFF set in its flags. Returns "" otherwise, including
 * when the event is too small to hold a struct trace_entry.
 */
static const char *show_irq(struct ring_buffer_event *event)
{
	struct trace_entry *entry;

	if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
		return "";

	entry = ring_buffer_event_data(event);

	return (entry->flags & TRACE_FLAG_IRQS_OFF) ? "d" : "";
}
/*
 * Describe the interrupt context of the caller, derived from
 * preempt_count(), using the same strings as show_flags().
 */
static const char *show_interrupt_level(void)
{
	const unsigned long pc = preempt_count();
	unsigned char level;

	/* 1: softirq, 2: hardirq, 4: NMI */
	level = ((pc & SOFTIRQ_OFFSET) ? 1 : 0) |
		((pc & HARDIRQ_MASK) ? 2 : 0) |
		((pc & NMI_MASK) ? 4 : 0);

	return show_irq_str(level);
}
static void dump_buffer_page(struct buffer_data_page *bpage,
struct rb_event_info *info,
unsigned long tail)
{
struct ring_buffer_event *event;
u64 ts, delta;
int e;
ts = bpage->time_stamp;
pr_warn(" [%lld] PAGE TIME STAMP\n", ts);
for (e = 0; e < tail; e += rb_event_length(event)) {
event = (struct ring_buffer_event *)(bpage->data + e);
switch (event->type_len) {
case RINGBUF_TYPE_TIME_EXTEND:
delta = rb_event_time_stamp(event);
ts += delta;
pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n",
e, ts, delta);
break;
case RINGBUF_TYPE_TIME_STAMP:
delta = rb_event_time_stamp(event);
ts = rb_fix_abs_ts(delta, ts);
pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n",
e, ts, delta);
break;
case RINGBUF_TYPE_PADDING:
ts += event->time_delta;
pr_warn(" 0x%x: [%lld] delta:%d PADDING\n",
e, ts, event->time_delta);
break;
case RINGBUF_TYPE_DATA:
ts += event->time_delta;
pr_warn(" 0x%x: [%lld] delta:%d %s%s\n",
e, ts, event->time_delta,
show_flags(event), show_irq(event));
break;
default:
break;
}
}
pr_warn("expected end:0x%lx last event actually ended at:0x%x\n",