blob: 37863879e987ded686c869ff2d959b193ed05446 [file] [log] [blame]
Jens Axboe771b53d02019-10-22 10:25:58 -06001// SPDX-License-Identifier: GPL-2.0
2/*
3 * Basic worker thread pool for io_uring
4 *
5 * Copyright (C) 2019 Jens Axboe
6 *
7 */
8#include <linux/kernel.h>
9#include <linux/init.h>
10#include <linux/errno.h>
11#include <linux/sched/signal.h>
12#include <linux/mm.h>
13#include <linux/mmu_context.h>
14#include <linux/sched/mm.h>
15#include <linux/percpu.h>
16#include <linux/slab.h>
17#include <linux/kthread.h>
18#include <linux/rculist_nulls.h>
19
20#include "io-wq.h"
21
22#define WORKER_IDLE_TIMEOUT (5 * HZ)
23
24enum {
25 IO_WORKER_F_UP = 1, /* up and active */
26 IO_WORKER_F_RUNNING = 2, /* account as running */
27 IO_WORKER_F_FREE = 4, /* worker on free list */
28 IO_WORKER_F_EXITING = 8, /* worker exiting */
29 IO_WORKER_F_FIXED = 16, /* static idle worker */
30};
31
32enum {
33 IO_WQ_BIT_EXIT = 0, /* wq exiting */
34 IO_WQ_BIT_CANCEL = 1, /* cancel work on list */
35};
36
37enum {
38 IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
39};
40
41/*
42 * One for each thread in a wqe pool
43 */
44struct io_worker {
45 refcount_t ref;
46 unsigned flags;
47 struct hlist_nulls_node nulls_node;
48 struct task_struct *task;
49 wait_queue_head_t wait;
50 struct io_wqe *wqe;
51 struct io_wq_work *cur_work;
52
53 struct rcu_head rcu;
54 struct mm_struct *mm;
55};
56
57struct io_wq_nulls_list {
58 struct hlist_nulls_head head;
59 unsigned long nulls;
60};
61
62#if BITS_PER_LONG == 64
63#define IO_WQ_HASH_ORDER 6
64#else
65#define IO_WQ_HASH_ORDER 5
66#endif
67
68/*
69 * Per-node worker thread pool
70 */
71struct io_wqe {
72 struct {
73 spinlock_t lock;
74 struct list_head work_list;
75 unsigned long hash_map;
76 unsigned flags;
77 } ____cacheline_aligned_in_smp;
78
79 int node;
80 unsigned nr_workers;
81 unsigned max_workers;
82 atomic_t nr_running;
83
84 struct io_wq_nulls_list free_list;
85 struct io_wq_nulls_list busy_list;
86
87 struct io_wq *wq;
88};
89
90/*
91 * Per io_wq state
92 */
93struct io_wq {
94 struct io_wqe **wqes;
95 unsigned long state;
96 unsigned nr_wqes;
97
98 struct task_struct *manager;
99 struct mm_struct *mm;
100 refcount_t refs;
101 struct completion done;
102};
103
104static void io_wq_free_worker(struct rcu_head *head)
105{
106 struct io_worker *worker = container_of(head, struct io_worker, rcu);
107
108 kfree(worker);
109}
110
111static bool io_worker_get(struct io_worker *worker)
112{
113 return refcount_inc_not_zero(&worker->ref);
114}
115
116static void io_worker_release(struct io_worker *worker)
117{
118 if (refcount_dec_and_test(&worker->ref))
119 wake_up_process(worker->task);
120}
121
122/*
123 * Note: drops the wqe->lock if returning true! The caller must re-acquire
124 * the lock in that case. Some callers need to restart handling if this
125 * happens, so we can't just re-acquire the lock on behalf of the caller.
126 */
127static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
128{
129 /*
130 * If we have an active mm, we need to drop the wq lock before unusing
131 * it. If we do, return true and let the caller retry the idle loop.
132 */
133 if (worker->mm) {
134 __acquire(&wqe->lock);
135 spin_unlock_irq(&wqe->lock);
136 __set_current_state(TASK_RUNNING);
137 set_fs(KERNEL_DS);
138 unuse_mm(worker->mm);
139 mmput(worker->mm);
140 worker->mm = NULL;
141 return true;
142 }
143
144 return false;
145}
146
147static void io_worker_exit(struct io_worker *worker)
148{
149 struct io_wqe *wqe = worker->wqe;
150 bool all_done = false;
151
152 /*
153 * If we're not at zero, someone else is holding a brief reference
154 * to the worker. Wait for that to go away.
155 */
156 set_current_state(TASK_INTERRUPTIBLE);
157 if (!refcount_dec_and_test(&worker->ref))
158 schedule();
159 __set_current_state(TASK_RUNNING);
160
161 preempt_disable();
162 current->flags &= ~PF_IO_WORKER;
163 if (worker->flags & IO_WORKER_F_RUNNING)
164 atomic_dec(&wqe->nr_running);
165 worker->flags = 0;
166 preempt_enable();
167
168 spin_lock_irq(&wqe->lock);
169 hlist_nulls_del_rcu(&worker->nulls_node);
170 if (__io_worker_unuse(wqe, worker)) {
171 __release(&wqe->lock);
172 spin_lock_irq(&wqe->lock);
173 }
174 wqe->nr_workers--;
175 all_done = !wqe->nr_workers;
176 spin_unlock_irq(&wqe->lock);
177
178 /* all workers gone, wq exit can proceed */
179 if (all_done && refcount_dec_and_test(&wqe->wq->refs))
180 complete(&wqe->wq->done);
181
182 call_rcu(&worker->rcu, io_wq_free_worker);
183}
184
185static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
186{
187 allow_kernel_signal(SIGINT);
188
189 current->flags |= PF_IO_WORKER;
190
191 worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
192 atomic_inc(&wqe->nr_running);
193}
194
195/*
196 * Worker will start processing some work. Move it to the busy list, if
197 * it's currently on the freelist
198 */
199static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
200 struct io_wq_work *work)
201 __must_hold(wqe->lock)
202{
203 if (worker->flags & IO_WORKER_F_FREE) {
204 worker->flags &= ~IO_WORKER_F_FREE;
205 hlist_nulls_del_init_rcu(&worker->nulls_node);
206 hlist_nulls_add_head_rcu(&worker->nulls_node,
207 &wqe->busy_list.head);
208 }
209 worker->cur_work = work;
210}
211
212/*
213 * No work, worker going to sleep. Move to freelist, and unuse mm if we
214 * have one attached. Dropping the mm may potentially sleep, so we drop
215 * the lock in that case and return success. Since the caller has to
216 * retry the loop in that case (we changed task state), we don't regrab
217 * the lock if we return success.
218 */
219static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
220 __must_hold(wqe->lock)
221{
222 if (!(worker->flags & IO_WORKER_F_FREE)) {
223 worker->flags |= IO_WORKER_F_FREE;
224 hlist_nulls_del_init_rcu(&worker->nulls_node);
225 hlist_nulls_add_head_rcu(&worker->nulls_node,
226 &wqe->free_list.head);
227 }
228
229 return __io_worker_unuse(wqe, worker);
230}
231
232static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
233 __must_hold(wqe->lock)
234{
235 struct io_wq_work *work;
236
237 list_for_each_entry(work, &wqe->work_list, list) {
238 /* not hashed, can run anytime */
239 if (!(work->flags & IO_WQ_WORK_HASHED)) {
240 list_del(&work->list);
241 return work;
242 }
243
244 /* hashed, can run if not already running */
245 *hash = work->flags >> IO_WQ_HASH_SHIFT;
246 if (!(wqe->hash_map & BIT_ULL(*hash))) {
247 wqe->hash_map |= BIT_ULL(*hash);
248 list_del(&work->list);
249 return work;
250 }
251 }
252
253 return NULL;
254}
255
256static void io_worker_handle_work(struct io_worker *worker)
257 __releases(wqe->lock)
258{
259 struct io_wq_work *work, *old_work;
260 struct io_wqe *wqe = worker->wqe;
261 struct io_wq *wq = wqe->wq;
262
263 do {
264 unsigned hash = -1U;
265
266 /*
267 * Signals are either sent to cancel specific work, or to just
268 * cancel all work items. For the former, ->cur_work must
269 * match. ->cur_work is NULL at this point, since we haven't
270 * assigned any work, so it's safe to flush signals for that
271 * case. For the latter case of cancelling all work, the caller
272 * wil have set IO_WQ_BIT_CANCEL.
273 */
274 if (signal_pending(current))
275 flush_signals(current);
276
277 /*
278 * If we got some work, mark us as busy. If we didn't, but
279 * the list isn't empty, it means we stalled on hashed work.
280 * Mark us stalled so we don't keep looking for work when we
281 * can't make progress, any work completion or insertion will
282 * clear the stalled flag.
283 */
284 work = io_get_next_work(wqe, &hash);
285 if (work)
286 __io_worker_busy(wqe, worker, work);
287 else if (!list_empty(&wqe->work_list))
288 wqe->flags |= IO_WQE_FLAG_STALLED;
289
290 spin_unlock_irq(&wqe->lock);
291 if (!work)
292 break;
293next:
294 if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
295 wq->mm && mmget_not_zero(wq->mm)) {
296 use_mm(wq->mm);
297 set_fs(USER_DS);
298 worker->mm = wq->mm;
299 }
300 if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
301 work->flags |= IO_WQ_WORK_CANCEL;
302 if (worker->mm)
303 work->flags |= IO_WQ_WORK_HAS_MM;
304
305 old_work = work;
306 work->func(&work);
307
308 spin_lock_irq(&wqe->lock);
309 worker->cur_work = NULL;
310 if (hash != -1U) {
311 wqe->hash_map &= ~BIT_ULL(hash);
312 wqe->flags &= ~IO_WQE_FLAG_STALLED;
313 }
314 if (work && work != old_work) {
315 spin_unlock_irq(&wqe->lock);
316 /* dependent work not hashed */
317 hash = -1U;
318 goto next;
319 }
320 } while (1);
321}
322
323static inline bool io_wqe_run_queue(struct io_wqe *wqe)
324 __must_hold(wqe->lock)
325{
326 if (!list_empty_careful(&wqe->work_list) &&
327 !(wqe->flags & IO_WQE_FLAG_STALLED))
328 return true;
329 return false;
330}
331
332static int io_wqe_worker(void *data)
333{
334 struct io_worker *worker = data;
335 struct io_wqe *wqe = worker->wqe;
336 struct io_wq *wq = wqe->wq;
337 DEFINE_WAIT(wait);
338
339 io_worker_start(wqe, worker);
340
341 while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
342 prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);
343
344 spin_lock_irq(&wqe->lock);
345 if (io_wqe_run_queue(wqe)) {
346 __set_current_state(TASK_RUNNING);
347 io_worker_handle_work(worker);
348 continue;
349 }
350 /* drops the lock on success, retry */
351 if (__io_worker_idle(wqe, worker)) {
352 __release(&wqe->lock);
353 continue;
354 }
355 spin_unlock_irq(&wqe->lock);
356 if (signal_pending(current))
357 flush_signals(current);
358 if (schedule_timeout(WORKER_IDLE_TIMEOUT))
359 continue;
360 /* timed out, exit unless we're the fixed worker */
361 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
362 !(worker->flags & IO_WORKER_F_FIXED))
363 break;
364 }
365
366 finish_wait(&worker->wait, &wait);
367
368 if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
369 spin_lock_irq(&wqe->lock);
370 if (!list_empty(&wqe->work_list))
371 io_worker_handle_work(worker);
372 else
373 spin_unlock_irq(&wqe->lock);
374 }
375
376 io_worker_exit(worker);
377 return 0;
378}
379
380/*
381 * Check head of free list for an available worker. If one isn't available,
382 * caller must wake up the wq manager to create one.
383 */
384static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
385 __must_hold(RCU)
386{
387 struct hlist_nulls_node *n;
388 struct io_worker *worker;
389
390 n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head));
391 if (is_a_nulls(n))
392 return false;
393
394 worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
395 if (io_worker_get(worker)) {
396 wake_up(&worker->wait);
397 io_worker_release(worker);
398 return true;
399 }
400
401 return false;
402}
403
404/*
405 * We need a worker. If we find a free one, we're good. If not, and we're
406 * below the max number of workers, wake up the manager to create one.
407 */
408static void io_wqe_wake_worker(struct io_wqe *wqe)
409{
410 bool ret;
411
412 rcu_read_lock();
413 ret = io_wqe_activate_free_worker(wqe);
414 rcu_read_unlock();
415
416 if (!ret && wqe->nr_workers < wqe->max_workers)
417 wake_up_process(wqe->wq->manager);
418}
419
420/*
421 * Called when a worker is scheduled in. Mark us as currently running.
422 */
423void io_wq_worker_running(struct task_struct *tsk)
424{
425 struct io_worker *worker = kthread_data(tsk);
426 struct io_wqe *wqe = worker->wqe;
427
428 if (!(worker->flags & IO_WORKER_F_UP))
429 return;
430 if (worker->flags & IO_WORKER_F_RUNNING)
431 return;
432 worker->flags |= IO_WORKER_F_RUNNING;
433 atomic_inc(&wqe->nr_running);
434}
435
436/*
437 * Called when worker is going to sleep. If there are no workers currently
438 * running and we have work pending, wake up a free one or have the manager
439 * set one up.
440 */
441void io_wq_worker_sleeping(struct task_struct *tsk)
442{
443 struct io_worker *worker = kthread_data(tsk);
444 struct io_wqe *wqe = worker->wqe;
445
446 if (!(worker->flags & IO_WORKER_F_UP))
447 return;
448 if (!(worker->flags & IO_WORKER_F_RUNNING))
449 return;
450
451 worker->flags &= ~IO_WORKER_F_RUNNING;
452
453 spin_lock_irq(&wqe->lock);
454 if (atomic_dec_and_test(&wqe->nr_running) && io_wqe_run_queue(wqe))
455 io_wqe_wake_worker(wqe);
456 spin_unlock_irq(&wqe->lock);
457}
458
459static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
460{
461 struct io_worker *worker;
462
463 worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
464 if (!worker)
465 return;
466
467 refcount_set(&worker->ref, 1);
468 worker->nulls_node.pprev = NULL;
469 init_waitqueue_head(&worker->wait);
470 worker->wqe = wqe;
471
472 worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
473 "io_wqe_worker-%d", wqe->node);
474 if (IS_ERR(worker->task)) {
475 kfree(worker);
476 return;
477 }
478
479 spin_lock_irq(&wqe->lock);
480 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head);
481 worker->flags |= IO_WORKER_F_FREE;
482 if (!wqe->nr_workers)
483 worker->flags |= IO_WORKER_F_FIXED;
484 wqe->nr_workers++;
485 spin_unlock_irq(&wqe->lock);
486
487 wake_up_process(worker->task);
488}
489
490static inline bool io_wqe_need_new_worker(struct io_wqe *wqe)
491 __must_hold(wqe->lock)
492{
493 if (!wqe->nr_workers)
494 return true;
495 if (hlist_nulls_empty(&wqe->free_list.head) &&
496 wqe->nr_workers < wqe->max_workers && io_wqe_run_queue(wqe))
497 return true;
498
499 return false;
500}
501
502/*
503 * Manager thread. Tasked with creating new workers, if we need them.
504 */
505static int io_wq_manager(void *data)
506{
507 struct io_wq *wq = data;
508
509 while (!kthread_should_stop()) {
510 int i;
511
512 for (i = 0; i < wq->nr_wqes; i++) {
513 struct io_wqe *wqe = wq->wqes[i];
514 bool fork_worker = false;
515
516 spin_lock_irq(&wqe->lock);
517 fork_worker = io_wqe_need_new_worker(wqe);
518 spin_unlock_irq(&wqe->lock);
519 if (fork_worker)
520 create_io_worker(wq, wqe);
521 }
522 set_current_state(TASK_INTERRUPTIBLE);
523 schedule_timeout(HZ);
524 }
525
526 return 0;
527}
528
529static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
530{
531 unsigned long flags;
532
533 spin_lock_irqsave(&wqe->lock, flags);
534 list_add_tail(&work->list, &wqe->work_list);
535 wqe->flags &= ~IO_WQE_FLAG_STALLED;
536 spin_unlock_irqrestore(&wqe->lock, flags);
537
538 if (!atomic_read(&wqe->nr_running))
539 io_wqe_wake_worker(wqe);
540}
541
542void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
543{
544 struct io_wqe *wqe = wq->wqes[numa_node_id()];
545
546 io_wqe_enqueue(wqe, work);
547}
548
549/*
550 * Enqueue work, hashed by some key. Work items that hash to the same value
551 * will not be done in parallel. Used to limit concurrent writes, generally
552 * hashed by inode.
553 */
554void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
555{
556 struct io_wqe *wqe = wq->wqes[numa_node_id()];
557 unsigned bit;
558
559
560 bit = hash_ptr(val, IO_WQ_HASH_ORDER);
561 work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
562 io_wqe_enqueue(wqe, work);
563}
564
565static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
566{
567 send_sig(SIGINT, worker->task, 1);
568 return false;
569}
570
571/*
572 * Iterate the passed in list and call the specific function for each
573 * worker that isn't exiting
574 */
575static bool io_wq_for_each_worker(struct io_wqe *wqe,
576 struct io_wq_nulls_list *list,
577 bool (*func)(struct io_worker *, void *),
578 void *data)
579{
580 struct hlist_nulls_node *n;
581 struct io_worker *worker;
582 bool ret = false;
583
584restart:
585 hlist_nulls_for_each_entry_rcu(worker, n, &list->head, nulls_node) {
586 if (io_worker_get(worker)) {
587 ret = func(worker, data);
588 io_worker_release(worker);
589 if (ret)
590 break;
591 }
592 }
593 if (!ret && get_nulls_value(n) != list->nulls)
594 goto restart;
595 return ret;
596}
597
598void io_wq_cancel_all(struct io_wq *wq)
599{
600 int i;
601
602 set_bit(IO_WQ_BIT_CANCEL, &wq->state);
603
604 /*
605 * Browse both lists, as there's a gap between handing work off
606 * to a worker and the worker putting itself on the busy_list
607 */
608 rcu_read_lock();
609 for (i = 0; i < wq->nr_wqes; i++) {
610 struct io_wqe *wqe = wq->wqes[i];
611
612 io_wq_for_each_worker(wqe, &wqe->busy_list,
613 io_wqe_worker_send_sig, NULL);
614 io_wq_for_each_worker(wqe, &wqe->free_list,
615 io_wqe_worker_send_sig, NULL);
616 }
617 rcu_read_unlock();
618}
619
620static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
621{
622 struct io_wq_work *work = data;
623
624 if (worker->cur_work == work) {
625 send_sig(SIGINT, worker->task, 1);
626 return true;
627 }
628
629 return false;
630}
631
632static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
633 struct io_wq_work *cwork)
634{
635 struct io_wq_work *work;
636 bool found = false;
637
638 cwork->flags |= IO_WQ_WORK_CANCEL;
639
640 /*
641 * First check pending list, if we're lucky we can just remove it
642 * from there. CANCEL_OK means that the work is returned as-new,
643 * no completion will be posted for it.
644 */
645 spin_lock_irq(&wqe->lock);
646 list_for_each_entry(work, &wqe->work_list, list) {
647 if (work == cwork) {
648 list_del(&work->list);
649 found = true;
650 break;
651 }
652 }
653 spin_unlock_irq(&wqe->lock);
654
655 if (found) {
656 work->flags |= IO_WQ_WORK_CANCEL;
657 work->func(&work);
658 return IO_WQ_CANCEL_OK;
659 }
660
661 /*
662 * Now check if a free (going busy) or busy worker has the work
663 * currently running. If we find it there, we'll return CANCEL_RUNNING
664 * as an indication that we attempte to signal cancellation. The
665 * completion will run normally in this case.
666 */
667 rcu_read_lock();
668 found = io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_cancel,
669 cwork);
670 if (found)
671 goto done;
672
673 found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_cancel,
674 cwork);
675done:
676 rcu_read_unlock();
677 return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
678}
679
680enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
681{
682 enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
683 int i;
684
685 for (i = 0; i < wq->nr_wqes; i++) {
686 struct io_wqe *wqe = wq->wqes[i];
687
688 ret = io_wqe_cancel_work(wqe, cwork);
689 if (ret != IO_WQ_CANCEL_NOTFOUND)
690 break;
691 }
692
693 return ret;
694}
695
696struct io_wq_flush_data {
697 struct io_wq_work work;
698 struct completion done;
699};
700
701static void io_wq_flush_func(struct io_wq_work **workptr)
702{
703 struct io_wq_work *work = *workptr;
704 struct io_wq_flush_data *data;
705
706 data = container_of(work, struct io_wq_flush_data, work);
707 complete(&data->done);
708}
709
710/*
711 * Doesn't wait for previously queued work to finish. When this completes,
712 * it just means that previously queued work was started.
713 */
714void io_wq_flush(struct io_wq *wq)
715{
716 struct io_wq_flush_data data;
717 int i;
718
719 for (i = 0; i < wq->nr_wqes; i++) {
720 struct io_wqe *wqe = wq->wqes[i];
721
722 init_completion(&data.done);
723 INIT_IO_WORK(&data.work, io_wq_flush_func);
724 io_wqe_enqueue(wqe, &data.work);
725 wait_for_completion(&data.done);
726 }
727}
728
729struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
730{
731 int ret = -ENOMEM, i, node;
732 struct io_wq *wq;
733
734 wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
735 if (!wq)
736 return ERR_PTR(-ENOMEM);
737
738 wq->nr_wqes = num_online_nodes();
739 wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
740 if (!wq->wqes) {
741 kfree(wq);
742 return ERR_PTR(-ENOMEM);
743 }
744
745 i = 0;
746 refcount_set(&wq->refs, wq->nr_wqes);
747 for_each_online_node(node) {
748 struct io_wqe *wqe;
749
750 wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
751 if (!wqe)
752 break;
753 wq->wqes[i] = wqe;
754 wqe->node = node;
755 wqe->max_workers = concurrency;
756 wqe->node = node;
757 wqe->wq = wq;
758 spin_lock_init(&wqe->lock);
759 INIT_LIST_HEAD(&wqe->work_list);
760 INIT_HLIST_NULLS_HEAD(&wqe->free_list.head, 0);
761 wqe->free_list.nulls = 0;
762 INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1);
763 wqe->busy_list.nulls = 1;
764 atomic_set(&wqe->nr_running, 0);
765
766 i++;
767 }
768
769 init_completion(&wq->done);
770
771 if (i != wq->nr_wqes)
772 goto err;
773
774 /* caller must have already done mmgrab() on this mm */
775 wq->mm = mm;
776
777 wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
778 if (!IS_ERR(wq->manager)) {
779 wake_up_process(wq->manager);
780 return wq;
781 }
782
783 ret = PTR_ERR(wq->manager);
784 wq->manager = NULL;
785err:
786 complete(&wq->done);
787 io_wq_destroy(wq);
788 return ERR_PTR(ret);
789}
790
791static bool io_wq_worker_wake(struct io_worker *worker, void *data)
792{
793 wake_up_process(worker->task);
794 return false;
795}
796
797void io_wq_destroy(struct io_wq *wq)
798{
799 int i;
800
801 if (wq->manager) {
802 set_bit(IO_WQ_BIT_EXIT, &wq->state);
803 kthread_stop(wq->manager);
804 }
805
806 rcu_read_lock();
807 for (i = 0; i < wq->nr_wqes; i++) {
808 struct io_wqe *wqe = wq->wqes[i];
809
810 if (!wqe)
811 continue;
812 io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_wake,
813 NULL);
814 io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_wake,
815 NULL);
816 }
817 rcu_read_unlock();
818
819 wait_for_completion(&wq->done);
820
821 for (i = 0; i < wq->nr_wqes; i++)
822 kfree(wq->wqes[i]);
823 kfree(wq->wqes);
824 kfree(wq);
825}