| // SPDX-License-Identifier: GPL-2.0-only |
| /* |
| * Copyright 2023 Red Hat |
| */ |
| |
| #include "funnel-workqueue.h" |
| |
| #include <linux/atomic.h> |
| #include <linux/cache.h> |
| #include <linux/completion.h> |
| #include <linux/err.h> |
| #include <linux/kthread.h> |
| #include <linux/percpu.h> |
| |
| #include "funnel-queue.h" |
| #include "logger.h" |
| #include "memory-alloc.h" |
| #include "numeric.h" |
| #include "permassert.h" |
| #include "string-utils.h" |
| |
| #include "completion.h" |
| #include "status-codes.h" |
| |
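| /* |
| * Per-CPU rotor used by vdo_enqueue_work_queue() to spread completions across the service |
| * queues of a round-robin work queue. |
| */ |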
| static DEFINE_PER_CPU(unsigned int, service_queue_rotor); |
| |
| /** |
| * DOC: Work queue definition. |
| * |
| * There are two types of work queues: simple, with one worker thread, and round-robin, which uses |
| * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly). |
| * Externally, both are represented via the same common sub-structure, though there's actually not |
| * a great deal of overlap between the two types internally. |
| */ |
| struct vdo_work_queue { |
| /* Name of just the work queue (e.g., "cpuQ12") */ |
| char *name; |
| bool round_robin_mode; |
| struct vdo_thread *owner; |
| /* Life cycle functions, etc */ |
| const struct vdo_work_queue_type *type; |
| }; |
| |
| struct simple_work_queue { |
| struct vdo_work_queue common; |
| struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1]; |
| void *private; |
| |
| /* |
| * The fields above are unchanged after setup but often read, and are good candidates for |
| * caching -- and if the max priority is 2, they just fit in one x86-64 cache line if aligned. |
| * The fields below are often modified as we sleep and wake, so we want a separate cache |
| * line for performance. |
| */ |
| |
| /* Any (0 or 1) worker threads waiting for new work to do */ |
| wait_queue_head_t waiting_worker_threads ____cacheline_aligned; |
| /* Hack to reduce wakeup calls if the worker thread is running */ |
| atomic_t idle; |
| |
| /* These are infrequently used so in terms of performance we don't care where they land. */ |
| struct task_struct *thread; |
| /* Notify creator once worker has initialized */ |
| struct completion *started; |
| }; |
| |
| struct round_robin_work_queue { |
| struct vdo_work_queue common; |
| struct simple_work_queue **service_queues; |
| unsigned int num_service_queues; |
| }; |
| |
| static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue) |
| { |
| return ((queue == NULL) ? |
| NULL : container_of(queue, struct simple_work_queue, common)); |
| } |
| |
| static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue) |
| { |
| return ((queue == NULL) ? |
| NULL : |
| container_of(queue, struct round_robin_work_queue, common)); |
| } |
| |
| /* Processing normal completions. */ |
| |
| /* |
| * Dequeue and return the next waiting completion, if any. |
| * |
| * We scan the funnel queues from highest priority to lowest, once; there is therefore a race |
| * condition where a high-priority completion can be enqueued followed by a lower-priority one, and |
| * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict |
| * enforcement of priorities becomes necessary, this function will need fixing. |
| */ |
| static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue) |
| { |
| int i; |
| |
| for (i = queue->common.type->max_priority; i >= 0; i--) { |
| struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]); |
| |
| if (link != NULL) |
| return container_of(link, struct vdo_completion, work_queue_entry_link); |
| } |
| |
| return NULL; |
| } |
| |
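| /* |
| * Add a completion to the funnel queue for its priority, then wake the worker thread if it |
| * appears to be idle. |
| */ |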
| static void enqueue_work_queue_completion(struct simple_work_queue *queue, |
| struct vdo_completion *completion) |
| { |
| VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL, |
| "completion %px (fn %px) to enqueue (%px) is not already queued (%px)", |
| completion, completion->callback, queue, completion->my_queue); |
| if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY) |
| completion->priority = queue->common.type->default_priority; |
| |
| if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority, |
| "priority is in range for queue") != VDO_SUCCESS) |
| completion->priority = 0; |
| |
| completion->my_queue = &queue->common; |
| |
| /* Funnel queue handles the synchronization for the put. */ |
| vdo_funnel_queue_put(queue->priority_lists[completion->priority], |
| &completion->work_queue_entry_link); |
| |
| /* |
| * Due to how funnel queue synchronization is handled (just atomic operations), the |
| * simplest safe implementation here would be to wake up any waiting threads after |
| * enqueueing each item. Even if the funnel queue is not empty at the time of adding an |
| * item to the queue, the consumer thread may not see this since it is not guaranteed to |
| * have the same view of the queue as a producer thread. |
| * |
| * However, the above is wasteful so instead we attempt to minimize the number of thread |
| * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be |
| * able to determine when the worker thread might be asleep or going to sleep. We use |
| * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for |
| * waking the worker thread, so multiple wakeups aren't tried at once. |
| * |
| * This was tuned for some x86 boxes that were handy; it's untested whether doing the read |
| * first is any better or worse for other platforms, even other x86 configurations. |
| */ |
| smp_mb(); |
| if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1)) |
| return; |
| |
| /* There's a maximum of one thread in this list. */ |
| wake_up(&queue->waiting_worker_threads); |
| } |
| |
| static void run_start_hook(struct simple_work_queue *queue) |
| { |
| if (queue->common.type->start != NULL) |
| queue->common.type->start(queue->private); |
| } |
| |
| static void run_finish_hook(struct simple_work_queue *queue) |
| { |
| if (queue->common.type->finish != NULL) |
| queue->common.type->finish(queue->private); |
| } |
| |
| /* |
| * Wait for the next completion to process, or until kthread_should_stop indicates that it's time |
| * for us to shut down. |
| * |
| * If kthread_should_stop says it's time to stop but we have pending completions, return a |
| * completion. |
| */ |
| static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue) |
| { |
| struct vdo_completion *completion; |
| DEFINE_WAIT(wait); |
| |
| while (true) { |
| prepare_to_wait(&queue->waiting_worker_threads, &wait, |
| TASK_INTERRUPTIBLE); |
| /* |
| * Don't set the idle flag until a wakeup will not be lost. |
| * |
| * Force synchronization between setting the idle flag and checking the funnel |
| * queue; the producer side will do them in the reverse order. (There's still a |
| * race condition we've chosen to allow, because we've got a timeout below that |
| * unwedges us if we hit it, but this may narrow the window a little.) |
| */ |
| atomic_set(&queue->idle, 1); |
| smp_mb(); /* store-load barrier between "idle" and funnel queue */ |
| |
| completion = poll_for_completion(queue); |
| if (completion != NULL) |
| break; |
| |
| /* |
| * We need to check for a pending kthread_stop() after setting the TASK_INTERRUPTIBLE |
| * state above. Otherwise, schedule() will put the thread to sleep and it might miss the |
| * wakeup from the kthread_stop() call in vdo_finish_work_queue(). |
| */ |
| if (kthread_should_stop()) |
| break; |
| |
| schedule(); |
| |
| /* |
| * Most of the time when we wake, it should be because there's work to do. If it |
| * was a spurious wakeup, continue looping. |
| */ |
| completion = poll_for_completion(queue); |
| if (completion != NULL) |
| break; |
| } |
| |
| finish_wait(&queue->waiting_worker_threads, &wait); |
| atomic_set(&queue->idle, 0); |
| |
| return completion; |
| } |
| |
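| /* Clear the completion's queue association (which should point here) and then run it. */ |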
| static void process_completion(struct simple_work_queue *queue, |
| struct vdo_completion *completion) |
| { |
| if (VDO_ASSERT(completion->my_queue == &queue->common, |
| "completion %px from queue %px marked as being in this queue (%px)", |
| completion, queue, completion->my_queue) == VDO_SUCCESS) |
| completion->my_queue = NULL; |
| |
| vdo_run_completion(completion); |
| } |
| |
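| /* |
| * The worker thread's main loop: run the start hook, process completions until |
| * kthread_should_stop() says to quit, then run the finish hook. |
| */ |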
| static void service_work_queue(struct simple_work_queue *queue) |
| { |
| run_start_hook(queue); |
| |
| while (true) { |
| struct vdo_completion *completion = poll_for_completion(queue); |
| |
| if (completion == NULL) |
| completion = wait_for_next_completion(queue); |
| |
| if (completion == NULL) { |
| /* No completions but kthread_should_stop() was triggered. */ |
| break; |
| } |
| |
| process_completion(queue, completion); |
| |
| /* |
| * Be friendly to a CPU that has other work to do, if the kernel has told us to. |
| * This speeds up some performance tests; that "other work" might include other VDO |
| * threads. |
| */ |
| if (need_resched()) |
| cond_resched(); |
| } |
| |
| run_finish_hook(queue); |
| } |
| |
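| /* |
| * The kthread entry point for a simple work queue's worker thread: let the creator know the |
| * thread has started, then service the queue until told to stop. |
| */ |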
| static int work_queue_runner(void *ptr) |
| { |
| struct simple_work_queue *queue = ptr; |
| |
| complete(queue->started); |
| service_work_queue(queue); |
| return 0; |
| } |
| |
| /* Creation & teardown */ |
| |
| static void free_simple_work_queue(struct simple_work_queue *queue) |
| { |
| unsigned int i; |
| |
| for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++) |
| vdo_free_funnel_queue(queue->priority_lists[i]); |
| vdo_free(queue->common.name); |
| vdo_free(queue); |
| } |
| |
| static void free_round_robin_work_queue(struct round_robin_work_queue *queue) |
| { |
| struct simple_work_queue **queue_table = queue->service_queues; |
| unsigned int count = queue->num_service_queues; |
| unsigned int i; |
| |
| queue->service_queues = NULL; |
| |
| for (i = 0; i < count; i++) |
| free_simple_work_queue(queue_table[i]); |
| vdo_free(queue_table); |
| vdo_free(queue->common.name); |
| vdo_free(queue); |
| } |
| |
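| /* Stop the worker thread(s), if any, and free the queue. A NULL queue is ignored. */ |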
| void vdo_free_work_queue(struct vdo_work_queue *queue) |
| { |
| if (queue == NULL) |
| return; |
| |
| vdo_finish_work_queue(queue); |
| |
| if (queue->round_robin_mode) |
| free_round_robin_work_queue(as_round_robin_work_queue(queue)); |
| else |
| free_simple_work_queue(as_simple_work_queue(queue)); |
| } |
| |
| static int make_simple_work_queue(const char *thread_name_prefix, const char *name, |
| struct vdo_thread *owner, void *private, |
| const struct vdo_work_queue_type *type, |
| struct simple_work_queue **queue_ptr) |
| { |
| DECLARE_COMPLETION_ONSTACK(started); |
| struct simple_work_queue *queue; |
| int i; |
| struct task_struct *thread = NULL; |
| int result; |
| |
| VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY), |
| "queue priority count %u within limit %u", type->max_priority, |
| VDO_WORK_Q_MAX_PRIORITY); |
| |
| result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue); |
| if (result != VDO_SUCCESS) |
| return result; |
| |
| queue->private = private; |
| queue->started = &started; |
| queue->common.type = type; |
| queue->common.owner = owner; |
| init_waitqueue_head(&queue->waiting_worker_threads); |
| |
| result = vdo_duplicate_string(name, "queue name", &queue->common.name); |
| if (result != VDO_SUCCESS) { |
| vdo_free(queue); |
| return -ENOMEM; |
| } |
| |
| for (i = 0; i <= type->max_priority; i++) { |
| result = vdo_make_funnel_queue(&queue->priority_lists[i]); |
| if (result != VDO_SUCCESS) { |
| free_simple_work_queue(queue); |
| return result; |
| } |
| } |
| |
| thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix, |
| queue->common.name); |
| if (IS_ERR(thread)) { |
| free_simple_work_queue(queue); |
| return (int) PTR_ERR(thread); |
| } |
| |
| queue->thread = thread; |
| |
| /* |
| * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to |
| * errors elsewhere) could cause it to never get as far as running VDO, skipping the |
| * cleanup code. |
| * |
| * Eventually we should just make that path safe too, and then we won't need this |
| * synchronization. |
| */ |
| wait_for_completion(&started); |
| |
| *queue_ptr = queue; |
| return VDO_SUCCESS; |
| } |
| |
| /** |
| * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will |
| * be distributed to them in round-robin fashion. |
| * |
| * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless |
| * of the actual number of queues and threads allocated here, code outside of the queue |
| * implementation will treat this as a single zone. |
| */ |
| int vdo_make_work_queue(const char *thread_name_prefix, const char *name, |
| struct vdo_thread *owner, const struct vdo_work_queue_type *type, |
| unsigned int thread_count, void *thread_privates[], |
| struct vdo_work_queue **queue_ptr) |
| { |
| struct round_robin_work_queue *queue; |
| int result; |
| char thread_name[TASK_COMM_LEN]; |
| unsigned int i; |
| |
| if (thread_count == 1) { |
| struct simple_work_queue *simple_queue; |
| void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL); |
| |
| result = make_simple_work_queue(thread_name_prefix, name, owner, context, |
| type, &simple_queue); |
| if (result == VDO_SUCCESS) |
| *queue_ptr = &simple_queue->common; |
| return result; |
| } |
| |
| result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue", |
| &queue); |
| if (result != VDO_SUCCESS) |
| return result; |
| |
| result = vdo_allocate(thread_count, struct simple_work_queue *, |
| "subordinate work queues", &queue->service_queues); |
| if (result != VDO_SUCCESS) { |
| vdo_free(queue); |
| return result; |
| } |
| |
| queue->num_service_queues = thread_count; |
| queue->common.round_robin_mode = true; |
| queue->common.owner = owner; |
| |
| result = vdo_duplicate_string(name, "queue name", &queue->common.name); |
| if (result != VDO_SUCCESS) { |
| vdo_free(queue->service_queues); |
| vdo_free(queue); |
| return -ENOMEM; |
| } |
| |
| *queue_ptr = &queue->common; |
| |
| for (i = 0; i < thread_count; i++) { |
| void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL); |
| |
| snprintf(thread_name, sizeof(thread_name), "%s%u", name, i); |
| result = make_simple_work_queue(thread_name_prefix, thread_name, owner, |
| context, type, &queue->service_queues[i]); |
| if (result != VDO_SUCCESS) { |
| queue->num_service_queues = i; |
| /* Destroy previously created subordinates. */ |
| vdo_free_work_queue(vdo_forget(*queue_ptr)); |
| return result; |
| } |
| } |
| |
| return VDO_SUCCESS; |
| } |
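| |
| /* |
| * Usage sketch (hypothetical caller, for illustration only; the owner thread and queue type |
| * shown here are assumptions, not definitions from this file): |
| * |
| * struct vdo_work_queue *cpu_queue; |
| * int result = vdo_make_work_queue("vdo0", "cpuQ", owner, &example_cpu_queue_type, 4, |
| * NULL, &cpu_queue); |
| * if (result == VDO_SUCCESS) |
| * vdo_enqueue_work_queue(cpu_queue, completion); |
| */ |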
| |
| static void finish_simple_work_queue(struct simple_work_queue *queue) |
| { |
| if (queue->thread == NULL) |
| return; |
| |
| /* Tells the worker thread to shut down and waits for it to exit. */ |
| kthread_stop(queue->thread); |
| queue->thread = NULL; |
| } |
| |
| static void finish_round_robin_work_queue(struct round_robin_work_queue *queue) |
| { |
| struct simple_work_queue **queue_table = queue->service_queues; |
| unsigned int count = queue->num_service_queues; |
| unsigned int i; |
| |
| for (i = 0; i < count; i++) |
| finish_simple_work_queue(queue_table[i]); |
| } |
| |
| /* No enqueueing of completions should be done once this function is called. */ |
| void vdo_finish_work_queue(struct vdo_work_queue *queue) |
| { |
| if (queue == NULL) |
| return; |
| |
| if (queue->round_robin_mode) |
| finish_round_robin_work_queue(as_round_robin_work_queue(queue)); |
| else |
| finish_simple_work_queue(as_simple_work_queue(queue)); |
| } |
| |
| /* Debugging dumps */ |
| |
| static void dump_simple_work_queue(struct simple_work_queue *queue) |
| { |
| const char *thread_status = "no threads"; |
| char task_state_report = '-'; |
| |
| if (queue->thread != NULL) { |
| task_state_report = task_state_to_char(queue->thread); |
| thread_status = atomic_read(&queue->idle) ? "idle" : "running"; |
| } |
| |
| vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name, |
| thread_status, task_state_report); |
| |
| /* ->waiting_worker_threads wait queue status? anyone waiting? */ |
| } |
| |
| /* Dump diagnostic info about a work queue and its worker thread(s), for logging. */ |
| void vdo_dump_work_queue(struct vdo_work_queue *queue) |
| { |
| if (queue->round_robin_mode) { |
| struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue); |
| unsigned int i; |
| |
| for (i = 0; i < round_robin->num_service_queues; i++) |
| dump_simple_work_queue(round_robin->service_queues[i]); |
| } else { |
| dump_simple_work_queue(as_simple_work_queue(queue)); |
| } |
| } |
| |
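| /* |
| * Write the name of the function 'pointer' refers to into the buffer ("-" if pointer is NULL), |
| * truncating at the first space in the formatted symbol. |
| */ |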
| static void get_function_name(void *pointer, char *buffer, size_t buffer_length) |
| { |
| if (pointer == NULL) { |
| /* |
| * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We |
| * sometimes use this when logging lots of data; don't be so verbose. |
| */ |
| strscpy(buffer, "-", buffer_length); |
| } else { |
| /* |
| * Use a pragma to defeat gcc's format checking, which doesn't understand that |
| * "%ps" actually does support a precision spec in Linux kernel code. |
| */ |
| char *space; |
| |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wformat" |
| snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer); |
| #pragma GCC diagnostic pop |
| |
| space = strchr(buffer, ' '); |
| if (space != NULL) |
| *space = '\0'; |
| } |
| } |
| |
| /* |
| * Write to the buffer some info about the completion, for logging. Since the common use case is |
| * dumping info about a lot of completions to syslog all at once, the format favors brevity over |
| * readability. |
| */ |
| void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, |
| size_t length) |
| { |
| size_t current_length = |
| scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN, |
| (completion->my_queue == NULL ? "-" : completion->my_queue->name)); |
| |
| if (current_length < length - 1) { |
| get_function_name((void *) completion->callback, buffer + current_length, |
| length - current_length); |
| } |
| } |
| |
| /* Completion submission */ |
| /* |
| * If the completion has a timeout that has already passed, the timeout handler function may be |
| * invoked by this function. |
| */ |
| void vdo_enqueue_work_queue(struct vdo_work_queue *queue, |
| struct vdo_completion *completion) |
| { |
| /* |
| * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue |
| * on. |
| */ |
| struct simple_work_queue *simple_queue = NULL; |
| |
| if (!queue->round_robin_mode) { |
| simple_queue = as_simple_work_queue(queue); |
| } else { |
| struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue); |
| |
| /* |
| * It shouldn't be a big deal if the same rotor gets used for multiple work queues. |
| * Any patterns that might develop are likely to be disrupted by random ordering of |
| * multiple completions and migration between cores, unless the load is so light as |
| * to be regular in ordering of tasks and the threads are confined to individual |
| * cores; with a load that light we won't care. |
| */ |
| unsigned int rotor = this_cpu_inc_return(service_queue_rotor); |
| unsigned int index = rotor % round_robin->num_service_queues; |
| |
| simple_queue = round_robin->service_queues[index]; |
| } |
| |
| enqueue_work_queue_completion(simple_queue, completion); |
| } |
| |
| /* Misc */ |
| |
| /* |
| * Return the simple work queue whose worker thread is the current thread, if any. The queue |
| * pointer is the kthread data that was passed to kthread_run() when the worker was created. |
| */ |
| static struct simple_work_queue *get_current_thread_work_queue(void) |
| { |
| /* |
| * In interrupt context, if a vdo thread is what got interrupted, the calls below will find |
| * the queue for the thread which was interrupted. However, the interrupted thread may have |
| * been processing a completion, in which case starting to process another would violate |
| * our concurrency assumptions. |
| */ |
| if (in_interrupt()) |
| return NULL; |
| |
| if (kthread_func(current) != work_queue_runner) |
| /* Not a VDO work queue thread. */ |
| return NULL; |
| |
| return kthread_data(current); |
| } |
| |
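| /* |
| * Return the current thread's work queue, or NULL if the current thread is not a work queue |
| * worker thread or is running in interrupt context. |
| */ |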
| struct vdo_work_queue *vdo_get_current_work_queue(void) |
| { |
| struct simple_work_queue *queue = get_current_thread_work_queue(); |
| |
| return (queue == NULL) ? NULL : &queue->common; |
| } |
| |
| struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue) |
| { |
| return queue->owner; |
| } |
| |
| /** |
| * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work |
| * queue, or NULL if none or if the current thread is not a |
| * work queue thread. |
| */ |
| void *vdo_get_work_queue_private_data(void) |
| { |
| struct simple_work_queue *queue = get_current_thread_work_queue(); |
| |
| return (queue != NULL) ? queue->private : NULL; |
| } |
| |
| bool vdo_work_queue_type_is(struct vdo_work_queue *queue, |
| const struct vdo_work_queue_type *type) |
| { |
| return (queue->type == type); |
| } |