Merge tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block
Pull io_uring updates from Jens Axboe:
"Lots of cleanups in here, hardening the code and/or making it easier
to read and fixing bugs, but a core feature/change too adding support
for real async buffered reads. With the latter in place, we just need
buffered write async support and we're done relying on kthreads for
the fast path. In detail:
- Cleanup how memory accounting is done on ring setup/free (Bijan)
- sq array offset calculation fixup (Dmitry)
- Consistently handle blocking off O_DIRECT submission path (me)
- Support proper async buffered reads, instead of relying on kthread
offload for that. This uses the page waitqueue to drive retries
from task_work, like we handle poll based retry. (me)
- IO completion optimizations (me)
- Fix race with accounting and ring fd install (me)
- Support EPOLLEXCLUSIVE (Jiufei)
- Get rid of the io_kiocb unionizing, made possible by shrinking
other bits (Pavel)
- Completion side cleanups (Pavel)
- Cleanup REQ_F_ flags handling, and kill off many of them (Pavel)
- Request environment grabbing cleanups (Pavel)
- File and socket read/write cleanups (Pavel)
- Improve kiocb_set_rw_flags() (Pavel)
- Tons of fixes and cleanups (Pavel)
- IORING_SQ_NEED_WAKEUP clear fix (Xiaoguang)"
* tag 'for-5.9/io_uring-20200802' of git://git.kernel.dk/linux-block: (127 commits)
io_uring: flip if handling after io_setup_async_rw
fs: optimise kiocb_set_rw_flags()
io_uring: don't touch 'ctx' after installing file descriptor
io_uring: get rid of atomic FAA for cq_timeouts
io_uring: consolidate *_check_overflow accounting
io_uring: fix stalled deferred requests
io_uring: fix racy overflow count reporting
io_uring: deduplicate __io_complete_rw()
io_uring: de-unionise io_kiocb
io-wq: update hash bits
io_uring: fix missing io_queue_linked_timeout()
io_uring: mark ->work uninitialised after cleanup
io_uring: deduplicate io_grab_files() calls
io_uring: don't do opcode prep twice
io_uring: clear IORING_SQ_NEED_WAKEUP after executing task works
io_uring: batch put_task_struct()
tasks: add put_task_struct_many()
io_uring: return locked and pinned page accounting
io_uring: don't miscount pinned memory
io_uring: don't open-code recv kbuf managment
...
diff --git a/block/blk-core.c b/block/blk-core.c
index 93104c7..d9d6326 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -960,9 +960,14 @@ static noinline_for_stack bool submit_bio_checks(struct bio *bio)
{
struct request_queue *q = bio->bi_disk->queue;
blk_status_t status = BLK_STS_IOERR;
+ struct blk_plug *plug;
might_sleep();
+ plug = blk_mq_plug(q, bio);
+ if (plug && plug->nowait)
+ bio->bi_opf |= REQ_NOWAIT;
+
/*
* For a REQ_NOWAIT based request, return -EOPNOTSUPP
* if queue is not a request based queue.
@@ -1802,6 +1807,7 @@ void blk_start_plug(struct blk_plug *plug)
INIT_LIST_HEAD(&plug->cb_list);
plug->rq_count = 0;
plug->multiple_queues = false;
+ plug->nowait = false;
/*
* Store ordering should not be needed here, since a potential
diff --git a/fs/block_dev.c b/fs/block_dev.c
index 3f94a06..8ae833e 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -1734,7 +1734,7 @@ static int blkdev_open(struct inode * inode, struct file * filp)
*/
filp->f_flags |= O_LARGEFILE;
- filp->f_mode |= FMODE_NOWAIT;
+ filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
if (filp->f_flags & O_NDELAY)
filp->f_mode |= FMODE_NDELAY;
diff --git a/fs/btrfs/file.c b/fs/btrfs/file.c
index 841c516..bb824c7 100644
--- a/fs/btrfs/file.c
+++ b/fs/btrfs/file.c
@@ -3537,7 +3537,7 @@ static loff_t btrfs_file_llseek(struct file *file, loff_t offset, int whence)
static int btrfs_file_open(struct inode *inode, struct file *filp)
{
- filp->f_mode |= FMODE_NOWAIT;
+ filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
return generic_file_open(inode, filp);
}
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 47c5f3a..e92c472 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -462,6 +462,7 @@ static void io_impersonate_work(struct io_worker *worker,
io_wq_switch_mm(worker, work);
if (worker->cur_creds != work->creds)
io_wq_switch_creds(worker, work);
+ current->signal->rlim[RLIMIT_FSIZE].rlim_cur = work->fsize;
}
static void io_assign_current_work(struct io_worker *worker,
@@ -489,7 +490,6 @@ static void io_worker_handle_work(struct io_worker *worker)
do {
struct io_wq_work *work;
- unsigned int hash;
get_next:
/*
* If we got some work, mark us as busy. If we didn't, but
@@ -512,6 +512,7 @@ static void io_worker_handle_work(struct io_worker *worker)
/* handle a whole dependent link */
do {
struct io_wq_work *old_work, *next_hashed, *linked;
+ unsigned int hash = io_get_work_hash(work);
next_hashed = wq_next_work(work);
io_impersonate_work(worker, work);
@@ -522,10 +523,8 @@ static void io_worker_handle_work(struct io_worker *worker)
if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
work->flags |= IO_WQ_WORK_CANCEL;
- hash = io_get_work_hash(work);
- linked = old_work = work;
- wq->do_work(&linked);
- linked = (old_work == linked) ? NULL : linked;
+ old_work = work;
+ linked = wq->do_work(work);
work = next_hashed;
if (!work && linked && !io_wq_is_hashed(linked)) {
@@ -542,8 +541,6 @@ static void io_worker_handle_work(struct io_worker *worker)
spin_lock_irq(&wqe->lock);
wqe->hash_map &= ~BIT_ULL(hash);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
- /* dependent work is not hashed */
- hash = -1U;
/* skip unnecessary unlock-lock wqe->lock */
if (!work)
goto get_next;
@@ -781,8 +778,7 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
struct io_wq_work *old_work = work;
work->flags |= IO_WQ_WORK_CANCEL;
- wq->do_work(&work);
- work = (work == old_work) ? NULL : work;
+ work = wq->do_work(work);
wq->free_work(old_work);
} while (work);
}
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 071f1a99..ddaf961 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -5,10 +5,10 @@ struct io_wq;
enum {
IO_WQ_WORK_CANCEL = 1,
- IO_WQ_WORK_HASHED = 4,
- IO_WQ_WORK_UNBOUND = 32,
- IO_WQ_WORK_NO_CANCEL = 256,
- IO_WQ_WORK_CONCURRENT = 512,
+ IO_WQ_WORK_HASHED = 2,
+ IO_WQ_WORK_UNBOUND = 4,
+ IO_WQ_WORK_NO_CANCEL = 8,
+ IO_WQ_WORK_CONCURRENT = 16,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
};
@@ -89,6 +89,7 @@ struct io_wq_work {
struct mm_struct *mm;
const struct cred *creds;
struct fs_struct *fs;
+ unsigned long fsize;
unsigned flags;
};
@@ -101,7 +102,7 @@ static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
}
typedef void (free_work_fn)(struct io_wq_work *);
-typedef void (io_wq_work_fn)(struct io_wq_work **);
+typedef struct io_wq_work *(io_wq_work_fn)(struct io_wq_work *);
struct io_wq_data {
struct user_struct *user;
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 493e504..2a3af95 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -78,6 +78,7 @@
#include <linux/fs_struct.h>
#include <linux/splice.h>
#include <linux/task_work.h>
+#include <linux/pagemap.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
@@ -226,7 +227,7 @@ struct io_ring_ctx {
struct {
unsigned int flags;
unsigned int compat: 1;
- unsigned int account_mem: 1;
+ unsigned int limit_mem: 1;
unsigned int cq_overflow_flushed: 1;
unsigned int drain_next: 1;
unsigned int eventfd_async: 1;
@@ -319,12 +320,12 @@ struct io_ring_ctx {
spinlock_t completion_lock;
/*
- * ->poll_list is protected by the ctx->uring_lock for
+ * ->iopoll_list is protected by the ctx->uring_lock for
* io_uring instances that don't use IORING_SETUP_SQPOLL.
* For SQPOLL, only the single threaded io_sq_thread() will
* manipulate the list, hence no extra locking is needed there.
*/
- struct list_head poll_list;
+ struct list_head iopoll_list;
struct hlist_head *cancel_hash;
unsigned cancel_hash_bits;
bool poll_multi_file;
@@ -395,6 +396,7 @@ struct io_timeout {
int flags;
u32 off;
u32 target_seq;
+ struct list_head list;
};
struct io_rw {
@@ -413,7 +415,7 @@ struct io_connect {
struct io_sr_msg {
struct file *file;
union {
- struct user_msghdr __user *msg;
+ struct user_msghdr __user *umsg;
void __user *buf;
};
int msg_flags;
@@ -486,6 +488,12 @@ struct io_statx {
struct statx __user *buffer;
};
+struct io_completion {
+ struct file *file;
+ struct list_head list;
+ int cflags;
+};
+
struct io_async_connect {
struct sockaddr_storage address;
};
@@ -503,6 +511,7 @@ struct io_async_rw {
struct iovec *iov;
ssize_t nr_segs;
ssize_t size;
+ struct wait_page_queue wpq;
};
struct io_async_ctx {
@@ -523,23 +532,18 @@ enum {
REQ_F_BUFFER_SELECT_BIT = IOSQE_BUFFER_SELECT_BIT,
REQ_F_LINK_HEAD_BIT,
- REQ_F_LINK_NEXT_BIT,
REQ_F_FAIL_LINK_BIT,
REQ_F_INFLIGHT_BIT,
REQ_F_CUR_POS_BIT,
REQ_F_NOWAIT_BIT,
REQ_F_LINK_TIMEOUT_BIT,
- REQ_F_TIMEOUT_BIT,
REQ_F_ISREG_BIT,
- REQ_F_MUST_PUNT_BIT,
- REQ_F_TIMEOUT_NOSEQ_BIT,
REQ_F_COMP_LOCKED_BIT,
REQ_F_NEED_CLEANUP_BIT,
REQ_F_OVERFLOW_BIT,
REQ_F_POLLED_BIT,
REQ_F_BUFFER_SELECTED_BIT,
REQ_F_NO_FILE_TABLE_BIT,
- REQ_F_QUEUE_TIMEOUT_BIT,
REQ_F_WORK_INITIALIZED_BIT,
REQ_F_TASK_PINNED_BIT,
@@ -563,8 +567,6 @@ enum {
/* head of a link */
REQ_F_LINK_HEAD = BIT(REQ_F_LINK_HEAD_BIT),
- /* already grabbed next link */
- REQ_F_LINK_NEXT = BIT(REQ_F_LINK_NEXT_BIT),
/* fail rest of links */
REQ_F_FAIL_LINK = BIT(REQ_F_FAIL_LINK_BIT),
/* on inflight list */
@@ -575,14 +577,8 @@ enum {
REQ_F_NOWAIT = BIT(REQ_F_NOWAIT_BIT),
/* has linked timeout */
REQ_F_LINK_TIMEOUT = BIT(REQ_F_LINK_TIMEOUT_BIT),
- /* timeout request */
- REQ_F_TIMEOUT = BIT(REQ_F_TIMEOUT_BIT),
/* regular file */
REQ_F_ISREG = BIT(REQ_F_ISREG_BIT),
- /* must be punted even for NONBLOCK */
- REQ_F_MUST_PUNT = BIT(REQ_F_MUST_PUNT_BIT),
- /* no timeout sequence */
- REQ_F_TIMEOUT_NOSEQ = BIT(REQ_F_TIMEOUT_NOSEQ_BIT),
/* completion under lock */
REQ_F_COMP_LOCKED = BIT(REQ_F_COMP_LOCKED_BIT),
/* needs cleanup */
@@ -595,8 +591,6 @@ enum {
REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT),
/* doesn't need file table for this request */
REQ_F_NO_FILE_TABLE = BIT(REQ_F_NO_FILE_TABLE_BIT),
- /* needs to queue linked timeout */
- REQ_F_QUEUE_TIMEOUT = BIT(REQ_F_QUEUE_TIMEOUT_BIT),
/* io_wq_work is initialized */
REQ_F_WORK_INITIALIZED = BIT(REQ_F_WORK_INITIALIZED_BIT),
/* req->task is refcounted */
@@ -606,7 +600,6 @@ enum {
struct async_poll {
struct io_poll_iocb poll;
struct io_poll_iocb *double_poll;
- struct io_wq_work work;
};
/*
@@ -635,51 +628,54 @@ struct io_kiocb {
struct io_splice splice;
struct io_provide_buf pbuf;
struct io_statx statx;
+ /* use only after cleaning per-op data, see io_clean_op() */
+ struct io_completion compl;
};
struct io_async_ctx *io;
- int cflags;
u8 opcode;
/* polled IO has completed */
u8 iopoll_completed;
u16 buf_index;
+ u32 result;
- struct io_ring_ctx *ctx;
- struct list_head list;
- unsigned int flags;
- refcount_t refs;
- struct task_struct *task;
- unsigned long fsize;
- u64 user_data;
- u32 result;
- u32 sequence;
+ struct io_ring_ctx *ctx;
+ unsigned int flags;
+ refcount_t refs;
+ struct task_struct *task;
+ u64 user_data;
- struct list_head link_list;
+ struct list_head link_list;
- struct list_head inflight_entry;
+ /*
+ * 1. used with ctx->iopoll_list with reads/writes
+ * 2. to track reqs with ->files (see io_op_def::file_table)
+ */
+ struct list_head inflight_entry;
- struct percpu_ref *fixed_file_refs;
-
- union {
- /*
- * Only commands that never go async can use the below fields,
- * obviously. Right now only IORING_OP_POLL_ADD uses them, and
- * async armed poll handlers for regular commands. The latter
- * restore the work, if needed.
- */
- struct {
- struct callback_head task_work;
- struct hlist_node hash_node;
- struct async_poll *apoll;
- };
- struct io_wq_work work;
- };
+ struct percpu_ref *fixed_file_refs;
+ struct callback_head task_work;
+ /* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
+ struct hlist_node hash_node;
+ struct async_poll *apoll;
+ struct io_wq_work work;
};
-#define IO_PLUG_THRESHOLD 2
+struct io_defer_entry {
+ struct list_head list;
+ struct io_kiocb *req;
+ u32 seq;
+};
+
#define IO_IOPOLL_BATCH 8
+struct io_comp_state {
+ unsigned int nr;
+ struct list_head list;
+ struct io_ring_ctx *ctx;
+};
+
struct io_submit_state {
struct blk_plug plug;
@@ -690,12 +686,16 @@ struct io_submit_state {
unsigned int free_reqs;
/*
+ * Batch completion logic
+ */
+ struct io_comp_state comp;
+
+ /*
* File reference cache
*/
struct file *file;
unsigned int fd;
unsigned int has_refs;
- unsigned int used_refs;
unsigned int ios_left;
};
@@ -723,6 +723,7 @@ struct io_op_def {
unsigned pollout : 1;
/* op supports buffer selection */
unsigned buffer_select : 1;
+ unsigned needs_fsize : 1;
};
static const struct io_op_def io_op_defs[] = {
@@ -742,6 +743,7 @@ static const struct io_op_def io_op_defs[] = {
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
+ .needs_fsize = 1,
},
[IORING_OP_FSYNC] = {
.needs_file = 1,
@@ -756,6 +758,7 @@ static const struct io_op_def io_op_defs[] = {
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
+ .needs_fsize = 1,
},
[IORING_OP_POLL_ADD] = {
.needs_file = 1,
@@ -808,6 +811,7 @@ static const struct io_op_def io_op_defs[] = {
},
[IORING_OP_FALLOCATE] = {
.needs_file = 1,
+ .needs_fsize = 1,
},
[IORING_OP_OPENAT] = {
.file_table = 1,
@@ -839,6 +843,7 @@ static const struct io_op_def io_op_defs[] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
+ .needs_fsize = 1,
},
[IORING_OP_FADVISE] = {
.needs_file = 1,
@@ -881,22 +886,37 @@ static const struct io_op_def io_op_defs[] = {
},
};
-static void io_wq_submit_work(struct io_wq_work **workptr);
+enum io_mem_account {
+ ACCT_LOCKED,
+ ACCT_PINNED,
+};
+
+static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
+ struct io_comp_state *cs);
static void io_cqring_fill_event(struct io_kiocb *req, long res);
static void io_put_req(struct io_kiocb *req);
+static void io_double_put_req(struct io_kiocb *req);
static void __io_double_put_req(struct io_kiocb *req);
static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
static void io_queue_linked_timeout(struct io_kiocb *req);
static int __io_sqe_files_update(struct io_ring_ctx *ctx,
struct io_uring_files_update *ip,
unsigned nr_args);
-static int io_grab_files(struct io_kiocb *req);
-static void io_complete_rw_common(struct kiocb *kiocb, long res);
-static void io_cleanup_req(struct io_kiocb *req);
+static int io_prep_work_files(struct io_kiocb *req);
+static void __io_clean_op(struct io_kiocb *req);
static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
int fd, struct file **out_file, bool fixed);
static void __io_queue_sqe(struct io_kiocb *req,
- const struct io_uring_sqe *sqe);
+ const struct io_uring_sqe *sqe,
+ struct io_comp_state *cs);
+static void io_file_put_work(struct work_struct *work);
+
+static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
+ struct iovec **iovec, struct iov_iter *iter,
+ bool needs_lock);
+static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
+ struct iovec *iovec, struct iovec *fast_iov,
+ struct iov_iter *iter);
static struct kmem_cache *req_cachep;
@@ -923,6 +943,12 @@ static void io_get_req_task(struct io_kiocb *req)
req->flags |= REQ_F_TASK_PINNED;
}
+static inline void io_clean_op(struct io_kiocb *req)
+{
+ if (req->flags & (REQ_F_NEED_CLEANUP | REQ_F_BUFFER_SELECTED))
+ __io_clean_op(req);
+}
+
/* not idempotent -- it doesn't clear REQ_F_TASK_PINNED */
static void __io_put_req_task(struct io_kiocb *req)
{
@@ -930,7 +956,41 @@ static void __io_put_req_task(struct io_kiocb *req)
put_task_struct(req->task);
}
-static void io_file_put_work(struct work_struct *work);
+static void io_sq_thread_drop_mm(void)
+{
+ struct mm_struct *mm = current->mm;
+
+ if (mm) {
+ kthread_unuse_mm(mm);
+ mmput(mm);
+ }
+}
+
+static int __io_sq_thread_acquire_mm(struct io_ring_ctx *ctx)
+{
+ if (!current->mm) {
+ if (unlikely(!(ctx->flags & IORING_SETUP_SQPOLL) ||
+ !mmget_not_zero(ctx->sqo_mm)))
+ return -EFAULT;
+ kthread_use_mm(ctx->sqo_mm);
+ }
+
+ return 0;
+}
+
+static int io_sq_thread_acquire_mm(struct io_ring_ctx *ctx,
+ struct io_kiocb *req)
+{
+ if (!io_op_defs[req->opcode].needs_mm)
+ return 0;
+ return __io_sq_thread_acquire_mm(ctx);
+}
+
+static inline void req_set_fail_links(struct io_kiocb *req)
+{
+ if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
+ req->flags |= REQ_F_FAIL_LINK;
+}
/*
* Note: must call io_req_init_async() for the first time you
@@ -957,6 +1017,11 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref)
complete(&ctx->ref_comp);
}
+static inline bool io_is_timeout_noseq(struct io_kiocb *req)
+{
+ return !req->timeout.off;
+}
+
static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
struct io_ring_ctx *ctx;
@@ -1000,7 +1065,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->wait);
spin_lock_init(&ctx->completion_lock);
- INIT_LIST_HEAD(&ctx->poll_list);
+ INIT_LIST_HEAD(&ctx->iopoll_list);
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
init_waitqueue_head(&ctx->inflight_wait);
@@ -1017,18 +1082,14 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
return NULL;
}
-static inline bool __req_need_defer(struct io_kiocb *req)
+static bool req_need_defer(struct io_kiocb *req, u32 seq)
{
- struct io_ring_ctx *ctx = req->ctx;
+ if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
+ struct io_ring_ctx *ctx = req->ctx;
- return req->sequence != ctx->cached_cq_tail
+ return seq != ctx->cached_cq_tail
+ atomic_read(&ctx->cached_cq_overflow);
-}
-
-static inline bool req_need_defer(struct io_kiocb *req)
-{
- if (unlikely(req->flags & REQ_F_IO_DRAIN))
- return __req_need_defer(req);
+ }
return false;
}
@@ -1046,28 +1107,7 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
}
}
-static inline void io_req_work_grab_env(struct io_kiocb *req,
- const struct io_op_def *def)
-{
- if (!req->work.mm && def->needs_mm) {
- mmgrab(current->mm);
- req->work.mm = current->mm;
- }
- if (!req->work.creds)
- req->work.creds = get_current_cred();
- if (!req->work.fs && def->needs_fs) {
- spin_lock(¤t->fs->lock);
- if (!current->fs->in_exec) {
- req->work.fs = current->fs;
- req->work.fs->users++;
- } else {
- req->work.flags |= IO_WQ_WORK_CANCEL;
- }
- spin_unlock(¤t->fs->lock);
- }
-}
-
-static inline void io_req_work_drop_env(struct io_kiocb *req)
+static void io_req_clean_work(struct io_kiocb *req)
{
if (!(req->flags & REQ_F_WORK_INITIALIZED))
return;
@@ -1089,11 +1129,12 @@ static inline void io_req_work_drop_env(struct io_kiocb *req)
spin_unlock(&req->work.fs->lock);
if (fs)
free_fs_struct(fs);
+ req->work.fs = NULL;
}
+ req->flags &= ~REQ_F_WORK_INITIALIZED;
}
-static inline void io_prep_async_work(struct io_kiocb *req,
- struct io_kiocb **link)
+static void io_prep_async_work(struct io_kiocb *req)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
@@ -1106,18 +1147,42 @@ static inline void io_prep_async_work(struct io_kiocb *req,
if (def->unbound_nonreg_file)
req->work.flags |= IO_WQ_WORK_UNBOUND;
}
-
- io_req_work_grab_env(req, def);
-
- *link = io_prep_linked_timeout(req);
+ if (!req->work.mm && def->needs_mm) {
+ mmgrab(current->mm);
+ req->work.mm = current->mm;
+ }
+ if (!req->work.creds)
+ req->work.creds = get_current_cred();
+ if (!req->work.fs && def->needs_fs) {
+ spin_lock(¤t->fs->lock);
+ if (!current->fs->in_exec) {
+ req->work.fs = current->fs;
+ req->work.fs->users++;
+ } else {
+ req->work.flags |= IO_WQ_WORK_CANCEL;
+ }
+ spin_unlock(¤t->fs->lock);
+ }
+ if (def->needs_fsize)
+ req->work.fsize = rlimit(RLIMIT_FSIZE);
+ else
+ req->work.fsize = RLIM_INFINITY;
}
-static inline void io_queue_async_work(struct io_kiocb *req)
+static void io_prep_async_link(struct io_kiocb *req)
+{
+ struct io_kiocb *cur;
+
+ io_prep_async_work(req);
+ if (req->flags & REQ_F_LINK_HEAD)
+ list_for_each_entry(cur, &req->link_list, link_list)
+ io_prep_async_work(cur);
+}
+
+static void __io_queue_async_work(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- struct io_kiocb *link;
-
- io_prep_async_work(req, &link);
+ struct io_kiocb *link = io_prep_linked_timeout(req);
trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
&req->work, req->flags);
@@ -1127,14 +1192,22 @@ static inline void io_queue_async_work(struct io_kiocb *req)
io_queue_linked_timeout(link);
}
+static void io_queue_async_work(struct io_kiocb *req)
+{
+ /* init ->work of the whole link before punting */
+ io_prep_async_link(req);
+ __io_queue_async_work(req);
+}
+
static void io_kill_timeout(struct io_kiocb *req)
{
int ret;
ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
if (ret != -1) {
- atomic_inc(&req->ctx->cq_timeouts);
- list_del_init(&req->list);
+ atomic_set(&req->ctx->cq_timeouts,
+ atomic_read(&req->ctx->cq_timeouts) + 1);
+ list_del_init(&req->timeout.list);
req->flags |= REQ_F_COMP_LOCKED;
io_cqring_fill_event(req, 0);
io_put_req(req);
@@ -1146,7 +1219,7 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx)
struct io_kiocb *req, *tmp;
spin_lock_irq(&ctx->completion_lock);
- list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+ list_for_each_entry_safe(req, tmp, &ctx->timeout_list, timeout.list)
io_kill_timeout(req);
spin_unlock_irq(&ctx->completion_lock);
}
@@ -1154,13 +1227,15 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx)
static void __io_queue_deferred(struct io_ring_ctx *ctx)
{
do {
- struct io_kiocb *req = list_first_entry(&ctx->defer_list,
- struct io_kiocb, list);
+ struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
+ struct io_defer_entry, list);
- if (req_need_defer(req))
+ if (req_need_defer(de->req, de->seq))
break;
- list_del_init(&req->list);
- io_queue_async_work(req);
+ list_del_init(&de->list);
+ /* punt-init is done before queueing for defer */
+ __io_queue_async_work(de->req);
+ kfree(de);
} while (!list_empty(&ctx->defer_list));
}
@@ -1168,15 +1243,15 @@ static void io_flush_timeouts(struct io_ring_ctx *ctx)
{
while (!list_empty(&ctx->timeout_list)) {
struct io_kiocb *req = list_first_entry(&ctx->timeout_list,
- struct io_kiocb, list);
+ struct io_kiocb, timeout.list);
- if (req->flags & REQ_F_TIMEOUT_NOSEQ)
+ if (io_is_timeout_noseq(req))
break;
if (req->timeout.target_seq != ctx->cached_cq_tail
- atomic_read(&ctx->cq_timeouts))
break;
- list_del_init(&req->list);
+ list_del_init(&req->timeout.list);
io_kill_timeout(req);
}
}
@@ -1229,6 +1304,15 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
eventfd_signal(ctx->cq_ev_fd, 1);
}
+static void io_cqring_mark_overflow(struct io_ring_ctx *ctx)
+{
+ if (list_empty(&ctx->cq_overflow_list)) {
+ clear_bit(0, &ctx->sq_check_overflow);
+ clear_bit(0, &ctx->cq_check_overflow);
+ ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW;
+ }
+}
+
/* Returns true if there are no backlogged entries after the flush */
static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
{
@@ -1259,13 +1343,13 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
break;
req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
- list);
- list_move(&req->list, &list);
+ compl.list);
+ list_move(&req->compl.list, &list);
req->flags &= ~REQ_F_OVERFLOW;
if (cqe) {
WRITE_ONCE(cqe->user_data, req->user_data);
WRITE_ONCE(cqe->res, req->result);
- WRITE_ONCE(cqe->flags, req->cflags);
+ WRITE_ONCE(cqe->flags, req->compl.cflags);
} else {
WRITE_ONCE(ctx->rings->cq_overflow,
atomic_inc_return(&ctx->cached_cq_overflow));
@@ -1273,17 +1357,14 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
}
io_commit_cqring(ctx);
- if (cqe) {
- clear_bit(0, &ctx->sq_check_overflow);
- clear_bit(0, &ctx->cq_check_overflow);
- ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW;
- }
+ io_cqring_mark_overflow(ctx);
+
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
while (!list_empty(&list)) {
- req = list_first_entry(&list, struct io_kiocb, list);
- list_del(&req->list);
+ req = list_first_entry(&list, struct io_kiocb, compl.list);
+ list_del(&req->compl.list);
io_put_req(req);
}
@@ -1316,11 +1397,12 @@ static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
set_bit(0, &ctx->cq_check_overflow);
ctx->rings->sq_flags |= IORING_SQ_CQ_OVERFLOW;
}
+ io_clean_op(req);
req->flags |= REQ_F_OVERFLOW;
- refcount_inc(&req->refs);
req->result = res;
- req->cflags = cflags;
- list_add_tail(&req->list, &ctx->cq_overflow_list);
+ req->compl.cflags = cflags;
+ refcount_inc(&req->refs);
+ list_add_tail(&req->compl.list, &ctx->cq_overflow_list);
}
}
@@ -1329,7 +1411,7 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
__io_cqring_fill_event(req, res, 0);
}
-static void __io_cqring_add_event(struct io_kiocb *req, long res, long cflags)
+static void io_cqring_add_event(struct io_kiocb *req, long res, long cflags)
{
struct io_ring_ctx *ctx = req->ctx;
unsigned long flags;
@@ -1342,9 +1424,52 @@ static void __io_cqring_add_event(struct io_kiocb *req, long res, long cflags)
io_cqring_ev_posted(ctx);
}
-static void io_cqring_add_event(struct io_kiocb *req, long res)
+static void io_submit_flush_completions(struct io_comp_state *cs)
{
- __io_cqring_add_event(req, res, 0);
+ struct io_ring_ctx *ctx = cs->ctx;
+
+ spin_lock_irq(&ctx->completion_lock);
+ while (!list_empty(&cs->list)) {
+ struct io_kiocb *req;
+
+ req = list_first_entry(&cs->list, struct io_kiocb, compl.list);
+ list_del(&req->compl.list);
+ __io_cqring_fill_event(req, req->result, req->compl.cflags);
+ if (!(req->flags & REQ_F_LINK_HEAD)) {
+ req->flags |= REQ_F_COMP_LOCKED;
+ io_put_req(req);
+ } else {
+ spin_unlock_irq(&ctx->completion_lock);
+ io_put_req(req);
+ spin_lock_irq(&ctx->completion_lock);
+ }
+ }
+ io_commit_cqring(ctx);
+ spin_unlock_irq(&ctx->completion_lock);
+
+ io_cqring_ev_posted(ctx);
+ cs->nr = 0;
+}
+
+static void __io_req_complete(struct io_kiocb *req, long res, unsigned cflags,
+ struct io_comp_state *cs)
+{
+ if (!cs) {
+ io_cqring_add_event(req, res, cflags);
+ io_put_req(req);
+ } else {
+ io_clean_op(req);
+ req->result = res;
+ req->compl.cflags = cflags;
+ list_add_tail(&req->compl.list, &cs->list);
+ if (++cs->nr >= 32)
+ io_submit_flush_completions(cs);
+ }
+}
+
+static void io_req_complete(struct io_kiocb *req, long res)
+{
+ __io_req_complete(req, res, 0, NULL);
}
static inline bool io_is_fallback_req(struct io_kiocb *req)
@@ -1370,11 +1495,7 @@ static struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx,
gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
struct io_kiocb *req;
- if (!state) {
- req = kmem_cache_alloc(req_cachep, gfp);
- if (unlikely(!req))
- goto fallback;
- } else if (!state->free_reqs) {
+ if (!state->free_reqs) {
size_t sz;
int ret;
@@ -1412,21 +1533,15 @@ static inline void io_put_file(struct io_kiocb *req, struct file *file,
fput(file);
}
-static void __io_req_aux_free(struct io_kiocb *req)
+static void io_dismantle_req(struct io_kiocb *req)
{
- if (req->flags & REQ_F_NEED_CLEANUP)
- io_cleanup_req(req);
+ io_clean_op(req);
- kfree(req->io);
+ if (req->io)
+ kfree(req->io);
if (req->file)
io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE));
- __io_put_req_task(req);
- io_req_work_drop_env(req);
-}
-
-static void __io_free_req(struct io_kiocb *req)
-{
- __io_req_aux_free(req);
+ io_req_clean_work(req);
if (req->flags & REQ_F_INFLIGHT) {
struct io_ring_ctx *ctx = req->ctx;
@@ -1438,57 +1553,20 @@ static void __io_free_req(struct io_kiocb *req)
wake_up(&ctx->inflight_wait);
spin_unlock_irqrestore(&ctx->inflight_lock, flags);
}
+}
- percpu_ref_put(&req->ctx->refs);
+static void __io_free_req(struct io_kiocb *req)
+{
+ struct io_ring_ctx *ctx;
+
+ io_dismantle_req(req);
+ __io_put_req_task(req);
+ ctx = req->ctx;
if (likely(!io_is_fallback_req(req)))
kmem_cache_free(req_cachep, req);
else
- clear_bit_unlock(0, (unsigned long *) &req->ctx->fallback_req);
-}
-
-struct req_batch {
- void *reqs[IO_IOPOLL_BATCH];
- int to_free;
- int need_iter;
-};
-
-static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
-{
- if (!rb->to_free)
- return;
- if (rb->need_iter) {
- int i, inflight = 0;
- unsigned long flags;
-
- for (i = 0; i < rb->to_free; i++) {
- struct io_kiocb *req = rb->reqs[i];
-
- if (req->flags & REQ_F_INFLIGHT)
- inflight++;
- __io_req_aux_free(req);
- }
- if (!inflight)
- goto do_free;
-
- spin_lock_irqsave(&ctx->inflight_lock, flags);
- for (i = 0; i < rb->to_free; i++) {
- struct io_kiocb *req = rb->reqs[i];
-
- if (req->flags & REQ_F_INFLIGHT) {
- list_del(&req->inflight_entry);
- if (!--inflight)
- break;
- }
- }
- spin_unlock_irqrestore(&ctx->inflight_lock, flags);
-
- if (waitqueue_active(&ctx->inflight_wait))
- wake_up(&ctx->inflight_wait);
- }
-do_free:
- kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
- percpu_ref_put_many(&ctx->refs, rb->to_free);
- rb->to_free = rb->need_iter = 0;
+ clear_bit_unlock(0, (unsigned long *) &ctx->fallback_req);
+ percpu_ref_put(&ctx->refs);
}
static bool io_link_cancel_timeout(struct io_kiocb *req)
@@ -1508,53 +1586,67 @@ static bool io_link_cancel_timeout(struct io_kiocb *req)
return false;
}
-static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
+static bool __io_kill_linked_timeout(struct io_kiocb *req)
+{
+ struct io_kiocb *link;
+ bool wake_ev;
+
+ if (list_empty(&req->link_list))
+ return false;
+ link = list_first_entry(&req->link_list, struct io_kiocb, link_list);
+ if (link->opcode != IORING_OP_LINK_TIMEOUT)
+ return false;
+
+ list_del_init(&link->link_list);
+ wake_ev = io_link_cancel_timeout(link);
+ req->flags &= ~REQ_F_LINK_TIMEOUT;
+ return wake_ev;
+}
+
+static void io_kill_linked_timeout(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- bool wake_ev = false;
+ bool wake_ev;
- /* Already got next link */
- if (req->flags & REQ_F_LINK_NEXT)
- return;
+ if (!(req->flags & REQ_F_COMP_LOCKED)) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->completion_lock, flags);
+ wake_ev = __io_kill_linked_timeout(req);
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ } else {
+ wake_ev = __io_kill_linked_timeout(req);
+ }
+
+ if (wake_ev)
+ io_cqring_ev_posted(ctx);
+}
+
+static struct io_kiocb *io_req_link_next(struct io_kiocb *req)
+{
+ struct io_kiocb *nxt;
/*
* The list should never be empty when we are called here. But could
* potentially happen if the chain is messed up, check to be on the
* safe side.
*/
- while (!list_empty(&req->link_list)) {
- struct io_kiocb *nxt = list_first_entry(&req->link_list,
- struct io_kiocb, link_list);
+ if (unlikely(list_empty(&req->link_list)))
+ return NULL;
- if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
- (nxt->flags & REQ_F_TIMEOUT))) {
- list_del_init(&nxt->link_list);
- wake_ev |= io_link_cancel_timeout(nxt);
- req->flags &= ~REQ_F_LINK_TIMEOUT;
- continue;
- }
-
- list_del_init(&req->link_list);
- if (!list_empty(&nxt->link_list))
- nxt->flags |= REQ_F_LINK_HEAD;
- *nxtptr = nxt;
- break;
- }
-
- req->flags |= REQ_F_LINK_NEXT;
- if (wake_ev)
- io_cqring_ev_posted(ctx);
+ nxt = list_first_entry(&req->link_list, struct io_kiocb, link_list);
+ list_del_init(&req->link_list);
+ if (!list_empty(&nxt->link_list))
+ nxt->flags |= REQ_F_LINK_HEAD;
+ return nxt;
}
/*
* Called if REQ_F_LINK_HEAD is set, and we fail the head request
*/
-static void io_fail_links(struct io_kiocb *req)
+static void __io_fail_links(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- unsigned long flags;
-
- spin_lock_irqsave(&ctx->completion_lock, flags);
while (!list_empty(&req->link_list)) {
struct io_kiocb *link = list_first_entry(&req->link_list,
@@ -1563,25 +1655,37 @@ static void io_fail_links(struct io_kiocb *req)
list_del_init(&link->link_list);
trace_io_uring_fail_link(req, link);
- if ((req->flags & REQ_F_LINK_TIMEOUT) &&
- link->opcode == IORING_OP_LINK_TIMEOUT) {
- io_link_cancel_timeout(link);
- } else {
- io_cqring_fill_event(link, -ECANCELED);
- __io_double_put_req(link);
- }
+ io_cqring_fill_event(link, -ECANCELED);
+ __io_double_put_req(link);
req->flags &= ~REQ_F_LINK_TIMEOUT;
}
io_commit_cqring(ctx);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
}
-static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
+static void io_fail_links(struct io_kiocb *req)
{
- if (likely(!(req->flags & REQ_F_LINK_HEAD)))
- return;
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (!(req->flags & REQ_F_COMP_LOCKED)) {
+ unsigned long flags;
+
+ spin_lock_irqsave(&ctx->completion_lock, flags);
+ __io_fail_links(req);
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ } else {
+ __io_fail_links(req);
+ }
+
+ io_cqring_ev_posted(ctx);
+}
+
+static struct io_kiocb *__io_req_find_next(struct io_kiocb *req)
+{
+ req->flags &= ~REQ_F_LINK_HEAD;
+ if (req->flags & REQ_F_LINK_TIMEOUT)
+ io_kill_linked_timeout(req);
/*
* If LINK is set, we have dependent requests in this chain. If we
@@ -1589,62 +1693,187 @@ static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
* dependencies to the next request. In case of failure, fail the rest
* of the chain.
*/
- if (req->flags & REQ_F_FAIL_LINK) {
- io_fail_links(req);
- } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
- REQ_F_LINK_TIMEOUT) {
- struct io_ring_ctx *ctx = req->ctx;
- unsigned long flags;
+ if (likely(!(req->flags & REQ_F_FAIL_LINK)))
+ return io_req_link_next(req);
+ io_fail_links(req);
+ return NULL;
+}
- /*
- * If this is a timeout link, we could be racing with the
- * timeout timer. Grab the completion lock for this case to
- * protect against that.
- */
- spin_lock_irqsave(&ctx->completion_lock, flags);
- io_req_link_next(req, nxt);
- spin_unlock_irqrestore(&ctx->completion_lock, flags);
+static struct io_kiocb *io_req_find_next(struct io_kiocb *req)
+{
+ if (likely(!(req->flags & REQ_F_LINK_HEAD)))
+ return NULL;
+ return __io_req_find_next(req);
+}
+
+static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
+{
+ struct task_struct *tsk = req->task;
+ struct io_ring_ctx *ctx = req->ctx;
+ int ret, notify = TWA_RESUME;
+
+ /*
+ * SQPOLL kernel thread doesn't need notification, just a wakeup.
+ * If we're not using an eventfd, then TWA_RESUME is always fine,
+ * as we won't have dependencies between request completions for
+ * other kernel wait conditions.
+ */
+ if (ctx->flags & IORING_SETUP_SQPOLL)
+ notify = 0;
+ else if (ctx->cq_ev_fd)
+ notify = TWA_SIGNAL;
+
+ ret = task_work_add(tsk, cb, notify);
+ if (!ret)
+ wake_up_process(tsk);
+ return ret;
+}
+
+static void __io_req_task_cancel(struct io_kiocb *req, int error)
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ spin_lock_irq(&ctx->completion_lock);
+ io_cqring_fill_event(req, error);
+ io_commit_cqring(ctx);
+ spin_unlock_irq(&ctx->completion_lock);
+
+ io_cqring_ev_posted(ctx);
+ req_set_fail_links(req);
+ io_double_put_req(req);
+}
+
+static void io_req_task_cancel(struct callback_head *cb)
+{
+ struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+
+ __io_req_task_cancel(req, -ECANCELED);
+}
+
+static void __io_req_task_submit(struct io_kiocb *req)
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ if (!__io_sq_thread_acquire_mm(ctx)) {
+ mutex_lock(&ctx->uring_lock);
+ __io_queue_sqe(req, NULL, NULL);
+ mutex_unlock(&ctx->uring_lock);
} else {
- io_req_link_next(req, nxt);
+ __io_req_task_cancel(req, -EFAULT);
}
}
+static void io_req_task_submit(struct callback_head *cb)
+{
+ struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+
+ __io_req_task_submit(req);
+}
+
+static void io_req_task_queue(struct io_kiocb *req)
+{
+ int ret;
+
+ init_task_work(&req->task_work, io_req_task_submit);
+
+ ret = io_req_task_work_add(req, &req->task_work);
+ if (unlikely(ret)) {
+ struct task_struct *tsk;
+
+ init_task_work(&req->task_work, io_req_task_cancel);
+ tsk = io_wq_get_task(req->ctx->io_wq);
+ task_work_add(tsk, &req->task_work, 0);
+ wake_up_process(tsk);
+ }
+}
+
+static void io_queue_next(struct io_kiocb *req)
+{
+ struct io_kiocb *nxt = io_req_find_next(req);
+
+ if (nxt)
+ io_req_task_queue(nxt);
+}
+
static void io_free_req(struct io_kiocb *req)
{
- struct io_kiocb *nxt = NULL;
-
- io_req_find_next(req, &nxt);
+ io_queue_next(req);
__io_free_req(req);
-
- if (nxt)
- io_queue_async_work(nxt);
}
-static void io_wq_assign_next(struct io_wq_work **workptr, struct io_kiocb *nxt)
+struct req_batch {
+ void *reqs[IO_IOPOLL_BATCH];
+ int to_free;
+
+ struct task_struct *task;
+ int task_refs;
+};
+
+static inline void io_init_req_batch(struct req_batch *rb)
{
- struct io_kiocb *link;
- const struct io_op_def *def = &io_op_defs[nxt->opcode];
+ rb->to_free = 0;
+ rb->task_refs = 0;
+ rb->task = NULL;
+}
- if ((nxt->flags & REQ_F_ISREG) && def->hash_reg_file)
- io_wq_hash_work(&nxt->work, file_inode(nxt->file));
+static void __io_req_free_batch_flush(struct io_ring_ctx *ctx,
+ struct req_batch *rb)
+{
+ kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
+ percpu_ref_put_many(&ctx->refs, rb->to_free);
+ rb->to_free = 0;
+}
- *workptr = &nxt->work;
- link = io_prep_linked_timeout(nxt);
- if (link)
- nxt->flags |= REQ_F_QUEUE_TIMEOUT;
+static void io_req_free_batch_finish(struct io_ring_ctx *ctx,
+ struct req_batch *rb)
+{
+ if (rb->to_free)
+ __io_req_free_batch_flush(ctx, rb);
+ if (rb->task) {
+ put_task_struct_many(rb->task, rb->task_refs);
+ rb->task = NULL;
+ }
+}
+
+static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req)
+{
+ if (unlikely(io_is_fallback_req(req))) {
+ io_free_req(req);
+ return;
+ }
+ if (req->flags & REQ_F_LINK_HEAD)
+ io_queue_next(req);
+
+ if (req->flags & REQ_F_TASK_PINNED) {
+ if (req->task != rb->task) {
+ if (rb->task)
+ put_task_struct_many(rb->task, rb->task_refs);
+ rb->task = req->task;
+ rb->task_refs = 0;
+ }
+ rb->task_refs++;
+ req->flags &= ~REQ_F_TASK_PINNED;
+ }
+
+ io_dismantle_req(req);
+ rb->reqs[rb->to_free++] = req;
+ if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
+ __io_req_free_batch_flush(req->ctx, rb);
}
/*
* Drop reference to request, return next in chain (if there is one) if this
* was the last reference to this request.
*/
-__attribute__((nonnull))
-static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
+static struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
{
+ struct io_kiocb *nxt = NULL;
+
if (refcount_dec_and_test(&req->refs)) {
- io_req_find_next(req, nxtptr);
+ nxt = io_req_find_next(req);
__io_free_req(req);
}
+ return nxt;
}
static void io_put_req(struct io_kiocb *req)
@@ -1653,24 +1882,20 @@ static void io_put_req(struct io_kiocb *req)
io_free_req(req);
}
-static void io_steal_work(struct io_kiocb *req,
- struct io_wq_work **workptr)
+static struct io_wq_work *io_steal_work(struct io_kiocb *req)
{
- /*
- * It's in an io-wq worker, so there always should be at least
- * one reference, which will be dropped in io_put_work() just
- * after the current handler returns.
- *
- * It also means, that if the counter dropped to 1, then there is
- * no asynchronous users left, so it's safe to steal the next work.
- */
- if (refcount_read(&req->refs) == 1) {
- struct io_kiocb *nxt = NULL;
+ struct io_kiocb *nxt;
- io_req_find_next(req, &nxt);
- if (nxt)
- io_wq_assign_next(workptr, nxt);
- }
+ /*
+ * A ref is owned by io-wq in which context we're. So, if that's the
+ * last one, it's safe to steal next work. False negatives are Ok,
+ * it just will be re-punted async in io_put_work()
+ */
+ if (refcount_read(&req->refs) != 1)
+ return NULL;
+
+ nxt = io_req_find_next(req);
+ return nxt ? &nxt->work : NULL;
}
/*
@@ -1720,50 +1945,44 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
}
-static inline bool io_req_multi_free(struct req_batch *rb, struct io_kiocb *req)
+static unsigned int io_put_kbuf(struct io_kiocb *req, struct io_buffer *kbuf)
{
- if ((req->flags & REQ_F_LINK_HEAD) || io_is_fallback_req(req))
- return false;
+ unsigned int cflags;
- if (req->file || req->io)
- rb->need_iter++;
-
- rb->reqs[rb->to_free++] = req;
- if (unlikely(rb->to_free == ARRAY_SIZE(rb->reqs)))
- io_free_req_many(req->ctx, rb);
- return true;
-}
-
-static int io_put_kbuf(struct io_kiocb *req)
-{
- struct io_buffer *kbuf;
- int cflags;
-
- kbuf = (struct io_buffer *) (unsigned long) req->rw.addr;
cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
cflags |= IORING_CQE_F_BUFFER;
- req->rw.addr = 0;
+ req->flags &= ~REQ_F_BUFFER_SELECTED;
kfree(kbuf);
return cflags;
}
+static inline unsigned int io_put_rw_kbuf(struct io_kiocb *req)
+{
+ struct io_buffer *kbuf;
+
+ kbuf = (struct io_buffer *) (unsigned long) req->rw.addr;
+ return io_put_kbuf(req, kbuf);
+}
+
+static inline bool io_run_task_work(void)
+{
+ if (current->task_works) {
+ __set_current_state(TASK_RUNNING);
+ task_work_run();
+ return true;
+ }
+
+ return false;
+}
+
static void io_iopoll_queue(struct list_head *again)
{
struct io_kiocb *req;
do {
- req = list_first_entry(again, struct io_kiocb, list);
- list_del(&req->list);
-
- /* shouldn't happen unless io_uring is dying, cancel reqs */
- if (unlikely(!current->mm)) {
- io_complete_rw_common(&req->rw.kiocb, -EAGAIN);
- io_put_req(req);
- continue;
- }
-
- refcount_inc(&req->refs);
- io_queue_async_work(req);
+ req = list_first_entry(again, struct io_kiocb, inflight_entry);
+ list_del(&req->inflight_entry);
+ __io_complete_rw(req, -EAGAIN, 0, NULL);
} while (!list_empty(again));
}
@@ -1780,33 +1999,32 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
/* order with ->result store in io_complete_rw_iopoll() */
smp_rmb();
- rb.to_free = rb.need_iter = 0;
+ io_init_req_batch(&rb);
while (!list_empty(done)) {
int cflags = 0;
- req = list_first_entry(done, struct io_kiocb, list);
+ req = list_first_entry(done, struct io_kiocb, inflight_entry);
if (READ_ONCE(req->result) == -EAGAIN) {
req->iopoll_completed = 0;
- list_move_tail(&req->list, &again);
+ list_move_tail(&req->inflight_entry, &again);
continue;
}
- list_del(&req->list);
+ list_del(&req->inflight_entry);
if (req->flags & REQ_F_BUFFER_SELECTED)
- cflags = io_put_kbuf(req);
+ cflags = io_put_rw_kbuf(req);
__io_cqring_fill_event(req, req->result, cflags);
(*nr_events)++;
- if (refcount_dec_and_test(&req->refs) &&
- !io_req_multi_free(&rb, req))
- io_free_req(req);
+ if (refcount_dec_and_test(&req->refs))
+ io_req_free_batch(&rb, req);
}
io_commit_cqring(ctx);
if (ctx->flags & IORING_SETUP_SQPOLL)
io_cqring_ev_posted(ctx);
- io_free_req_many(ctx, &rb);
+ io_req_free_batch_finish(ctx, &rb);
if (!list_empty(&again))
io_iopoll_queue(&again);
@@ -1827,7 +2045,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
spin = !ctx->poll_multi_file && *nr_events < min;
ret = 0;
- list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
+ list_for_each_entry_safe(req, tmp, &ctx->iopoll_list, inflight_entry) {
struct kiocb *kiocb = &req->rw.kiocb;
/*
@@ -1836,7 +2054,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
* and complete those lists first, if we have entries there.
*/
if (READ_ONCE(req->iopoll_completed)) {
- list_move_tail(&req->list, &done);
+ list_move_tail(&req->inflight_entry, &done);
continue;
}
if (!list_empty(&done))
@@ -1846,6 +2064,10 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
if (ret < 0)
break;
+ /* iopoll may have completed current req */
+ if (READ_ONCE(req->iopoll_completed))
+ list_move_tail(&req->inflight_entry, &done);
+
if (ret && spin)
spin = false;
ret = 0;
@@ -1865,13 +2087,13 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
long min)
{
- while (!list_empty(&ctx->poll_list) && !need_resched()) {
+ while (!list_empty(&ctx->iopoll_list) && !need_resched()) {
int ret;
ret = io_do_iopoll(ctx, nr_events, min);
if (ret < 0)
return ret;
- if (!min || *nr_events >= min)
+ if (*nr_events >= min)
return 0;
}
@@ -1882,29 +2104,37 @@ static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
* We can't just wait for polled events to come to us, we have to actively
* find and complete them.
*/
-static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
+static void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
{
if (!(ctx->flags & IORING_SETUP_IOPOLL))
return;
mutex_lock(&ctx->uring_lock);
- while (!list_empty(&ctx->poll_list)) {
+ while (!list_empty(&ctx->iopoll_list)) {
unsigned int nr_events = 0;
- io_iopoll_getevents(ctx, &nr_events, 1);
+ io_do_iopoll(ctx, &nr_events, 0);
+ /* let it sleep and repeat later if can't complete a request */
+ if (nr_events == 0)
+ break;
/*
* Ensure we allow local-to-the-cpu processing to take place,
* in this case we need to ensure that we reap all events.
+ * Also let task_work, etc. to progress by releasing the mutex
*/
- cond_resched();
+ if (need_resched()) {
+ mutex_unlock(&ctx->uring_lock);
+ cond_resched();
+ mutex_lock(&ctx->uring_lock);
+ }
}
mutex_unlock(&ctx->uring_lock);
}
-static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
- long min)
+static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
{
+ unsigned int nr_events = 0;
int iters = 0, ret = 0;
/*
@@ -1914,8 +2144,6 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
*/
mutex_lock(&ctx->uring_lock);
do {
- int tmin = 0;
-
/*
* Don't enter poll loop if we already have events pending.
* If we do, we can potentially be spinning for commands that
@@ -1936,17 +2164,15 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
*/
if (!(++iters & 7)) {
mutex_unlock(&ctx->uring_lock);
+ io_run_task_work();
mutex_lock(&ctx->uring_lock);
}
- if (*nr_events < min)
- tmin = min - *nr_events;
-
- ret = io_iopoll_getevents(ctx, nr_events, tmin);
+ ret = io_iopoll_getevents(ctx, &nr_events, min);
if (ret <= 0)
break;
ret = 0;
- } while (min && !*nr_events && !need_resched());
+ } while (min && !nr_events && !need_resched());
mutex_unlock(&ctx->uring_lock);
return ret;
@@ -1966,13 +2192,8 @@ static void kiocb_end_write(struct io_kiocb *req)
file_end_write(req->file);
}
-static inline void req_set_fail_links(struct io_kiocb *req)
-{
- if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
- req->flags |= REQ_F_FAIL_LINK;
-}
-
-static void io_complete_rw_common(struct kiocb *kiocb, long res)
+static void io_complete_rw_common(struct kiocb *kiocb, long res,
+ struct io_comp_state *cs)
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
int cflags = 0;
@@ -1983,16 +2204,96 @@ static void io_complete_rw_common(struct kiocb *kiocb, long res)
if (res != req->result)
req_set_fail_links(req);
if (req->flags & REQ_F_BUFFER_SELECTED)
- cflags = io_put_kbuf(req);
- __io_cqring_add_event(req, res, cflags);
+ cflags = io_put_rw_kbuf(req);
+ __io_req_complete(req, res, cflags, cs);
+}
+
+#ifdef CONFIG_BLOCK
+static bool io_resubmit_prep(struct io_kiocb *req, int error)
+{
+ struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
+ ssize_t ret = -ECANCELED;
+ struct iov_iter iter;
+ int rw;
+
+ if (error) {
+ ret = error;
+ goto end_req;
+ }
+
+ switch (req->opcode) {
+ case IORING_OP_READV:
+ case IORING_OP_READ_FIXED:
+ case IORING_OP_READ:
+ rw = READ;
+ break;
+ case IORING_OP_WRITEV:
+ case IORING_OP_WRITE_FIXED:
+ case IORING_OP_WRITE:
+ rw = WRITE;
+ break;
+ default:
+ printk_once(KERN_WARNING "io_uring: bad opcode in resubmit %d\n",
+ req->opcode);
+ goto end_req;
+ }
+
+ ret = io_import_iovec(rw, req, &iovec, &iter, false);
+ if (ret < 0)
+ goto end_req;
+ ret = io_setup_async_rw(req, ret, iovec, inline_vecs, &iter);
+ if (!ret)
+ return true;
+ kfree(iovec);
+end_req:
+ req_set_fail_links(req);
+ io_req_complete(req, ret);
+ return false;
+}
+
+static void io_rw_resubmit(struct callback_head *cb)
+{
+ struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
+ struct io_ring_ctx *ctx = req->ctx;
+ int err;
+
+ err = io_sq_thread_acquire_mm(ctx, req);
+
+ if (io_resubmit_prep(req, err)) {
+ refcount_inc(&req->refs);
+ io_queue_async_work(req);
+ }
+}
+#endif
+
+static bool io_rw_reissue(struct io_kiocb *req, long res)
+{
+#ifdef CONFIG_BLOCK
+ int ret;
+
+ if ((res != -EAGAIN && res != -EOPNOTSUPP) || io_wq_current_is_worker())
+ return false;
+
+ init_task_work(&req->task_work, io_rw_resubmit);
+ ret = io_req_task_work_add(req, &req->task_work);
+ if (!ret)
+ return true;
+#endif
+ return false;
+}
+
+static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
+ struct io_comp_state *cs)
+{
+ if (!io_rw_reissue(req, res))
+ io_complete_rw_common(&req->rw.kiocb, res, cs);
}
static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
- io_complete_rw_common(kiocb, res);
- io_put_req(req);
+ __io_complete_rw(req, res, res2, NULL);
}
static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
@@ -2026,13 +2327,13 @@ static void io_iopoll_req_issued(struct io_kiocb *req)
* how we do polling eventually, not spinning if we're on potentially
* different devices.
*/
- if (list_empty(&ctx->poll_list)) {
+ if (list_empty(&ctx->iopoll_list)) {
ctx->poll_multi_file = false;
} else if (!ctx->poll_multi_file) {
struct io_kiocb *list_req;
- list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
- list);
+ list_req = list_first_entry(&ctx->iopoll_list, struct io_kiocb,
+ inflight_entry);
if (list_req->file != req->file)
ctx->poll_multi_file = true;
}
@@ -2042,9 +2343,9 @@ static void io_iopoll_req_issued(struct io_kiocb *req)
* it to the front so we find it first.
*/
if (READ_ONCE(req->iopoll_completed))
- list_add(&req->list, &ctx->poll_list);
+ list_add(&req->inflight_entry, &ctx->iopoll_list);
else
- list_add_tail(&req->list, &ctx->poll_list);
+ list_add_tail(&req->inflight_entry, &ctx->iopoll_list);
if ((ctx->flags & IORING_SETUP_SQPOLL) &&
wq_has_sleeper(&ctx->sqo_wait))
@@ -2053,10 +2354,8 @@ static void io_iopoll_req_issued(struct io_kiocb *req)
static void __io_state_file_put(struct io_submit_state *state)
{
- int diff = state->has_refs - state->used_refs;
-
- if (diff)
- fput_many(state->file, diff);
+ if (state->has_refs)
+ fput_many(state->file, state->has_refs);
state->file = NULL;
}
@@ -2078,7 +2377,7 @@ static struct file *__io_file_get(struct io_submit_state *state, int fd)
if (state->file) {
if (state->fd == fd) {
- state->used_refs++;
+ state->has_refs--;
state->ios_left--;
return state->file;
}
@@ -2089,12 +2388,20 @@ static struct file *__io_file_get(struct io_submit_state *state, int fd)
return NULL;
state->fd = fd;
- state->has_refs = state->ios_left;
- state->used_refs = 1;
state->ios_left--;
+ state->has_refs = state->ios_left;
return state->file;
}
+static bool io_bdev_nowait(struct block_device *bdev)
+{
+#ifdef CONFIG_BLOCK
+ return !bdev || queue_is_mq(bdev_get_queue(bdev));
+#else
+ return true;
+#endif
+}
+
/*
* If we tracked the file through the SCM inflight mechanism, we could support
* any file. For now, just ensure that anything potentially problematic is done
@@ -2104,10 +2411,19 @@ static bool io_file_supports_async(struct file *file, int rw)
{
umode_t mode = file_inode(file)->i_mode;
- if (S_ISBLK(mode) || S_ISCHR(mode) || S_ISSOCK(mode))
+ if (S_ISBLK(mode)) {
+ if (io_bdev_nowait(file->f_inode->i_bdev))
+ return true;
+ return false;
+ }
+ if (S_ISCHR(mode) || S_ISSOCK(mode))
return true;
- if (S_ISREG(mode) && file->f_op != &io_uring_fops)
- return true;
+ if (S_ISREG(mode)) {
+ if (io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
+ file->f_op != &io_uring_fops)
+ return true;
+ return false;
+ }
/* any ->read/write should understand O_NONBLOCK */
if (file->f_flags & O_NONBLOCK)
@@ -2158,6 +2474,9 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (kiocb->ki_flags & IOCB_NOWAIT)
req->flags |= REQ_F_NOWAIT;
+ if (kiocb->ki_flags & IOCB_DIRECT)
+ io_get_req_task(req);
+
if (force_nonblock)
kiocb->ki_flags |= IOCB_NOWAIT;
@@ -2168,8 +2487,8 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
kiocb->ki_flags |= IOCB_HIPRI;
kiocb->ki_complete = io_complete_rw_iopoll;
- req->result = 0;
req->iopoll_completed = 0;
+ io_get_req_task(req);
} else {
if (kiocb->ki_flags & IOCB_HIPRI)
return -EINVAL;
@@ -2203,14 +2522,15 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
}
}
-static void kiocb_done(struct kiocb *kiocb, ssize_t ret)
+static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
+ struct io_comp_state *cs)
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
if (req->flags & REQ_F_CUR_POS)
req->file->f_pos = kiocb->ki_pos;
if (ret >= 0 && kiocb->ki_complete == io_complete_rw)
- io_complete_rw(kiocb, ret, 0);
+ __io_complete_rw(req, ret, 0, cs);
else
io_rw_done(kiocb, ret);
}
@@ -2466,10 +2786,8 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
if (req->io) {
struct io_async_rw *iorw = &req->io->rw;
- *iovec = iorw->iov;
- iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
- if (iorw->iov == iorw->fast_iov)
- *iovec = NULL;
+ iov_iter_init(iter, rw, iorw->iov, iorw->nr_segs, iorw->size);
+ *iovec = NULL;
return iorw->size;
}
@@ -2554,15 +2872,17 @@ static void io_req_map_rw(struct io_kiocb *req, ssize_t io_size,
struct iovec *iovec, struct iovec *fast_iov,
struct iov_iter *iter)
{
- req->io->rw.nr_segs = iter->nr_segs;
- req->io->rw.size = io_size;
- req->io->rw.iov = iovec;
- if (!req->io->rw.iov) {
- req->io->rw.iov = req->io->rw.fast_iov;
- if (req->io->rw.iov != fast_iov)
- memcpy(req->io->rw.iov, fast_iov,
+ struct io_async_rw *rw = &req->io->rw;
+
+ rw->nr_segs = iter->nr_segs;
+ rw->size = io_size;
+ if (!iovec) {
+ rw->iov = rw->fast_iov;
+ if (rw->iov != fast_iov)
+ memcpy(rw->iov, fast_iov,
sizeof(struct iovec) * iter->nr_segs);
} else {
+ rw->iov = iovec;
req->flags |= REQ_F_NEED_CLEANUP;
}
}
@@ -2596,11 +2916,27 @@ static int io_setup_async_rw(struct io_kiocb *req, ssize_t io_size,
return 0;
}
+static inline int io_rw_prep_async(struct io_kiocb *req, int rw,
+ bool force_nonblock)
+{
+ struct io_async_ctx *io = req->io;
+ struct iov_iter iter;
+ ssize_t ret;
+
+ io->rw.iov = io->rw.fast_iov;
+ req->io = NULL;
+ ret = io_import_iovec(rw, req, &io->rw.iov, &iter, !force_nonblock);
+ req->io = io;
+ if (unlikely(ret < 0))
+ return ret;
+
+ io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
+ return 0;
+}
+
static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
bool force_nonblock)
{
- struct io_async_ctx *io;
- struct iov_iter iter;
ssize_t ret;
ret = io_prep_rw(req, sqe, force_nonblock);
@@ -2613,75 +2949,169 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
/* either don't need iovec imported or already have it */
if (!req->io || req->flags & REQ_F_NEED_CLEANUP)
return 0;
-
- io = req->io;
- io->rw.iov = io->rw.fast_iov;
- req->io = NULL;
- ret = io_import_iovec(READ, req, &io->rw.iov, &iter, !force_nonblock);
- req->io = io;
- if (ret < 0)
- return ret;
-
- io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
- return 0;
+ return io_rw_prep_async(req, READ, force_nonblock);
}
-static int io_read(struct io_kiocb *req, bool force_nonblock)
+static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
+ int sync, void *arg)
+{
+ struct wait_page_queue *wpq;
+ struct io_kiocb *req = wait->private;
+ struct wait_page_key *key = arg;
+ int ret;
+
+ wpq = container_of(wait, struct wait_page_queue, wait);
+
+ if (!wake_page_match(wpq, key))
+ return 0;
+
+ /* Stop waking things up if the page is locked again */
+ if (test_bit(key->bit_nr, &key->page->flags))
+ return -1;
+
+ list_del_init(&wait->entry);
+
+ init_task_work(&req->task_work, io_req_task_submit);
+ /* submit ref gets dropped, acquire a new one */
+ refcount_inc(&req->refs);
+ ret = io_req_task_work_add(req, &req->task_work);
+ if (unlikely(ret)) {
+ struct task_struct *tsk;
+
+ /* queue just for cancelation */
+ init_task_work(&req->task_work, io_req_task_cancel);
+ tsk = io_wq_get_task(req->ctx->io_wq);
+ task_work_add(tsk, &req->task_work, 0);
+ wake_up_process(tsk);
+ }
+ return 1;
+}
+
+static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
+ struct wait_page_queue *wait,
+ wait_queue_func_t func,
+ void *data)
+{
+ /* Can't support async wakeup with polled IO */
+ if (kiocb->ki_flags & IOCB_HIPRI)
+ return -EINVAL;
+ if (kiocb->ki_filp->f_mode & FMODE_BUF_RASYNC) {
+ wait->wait.func = func;
+ wait->wait.private = data;
+ wait->wait.flags = 0;
+ INIT_LIST_HEAD(&wait->wait.entry);
+ kiocb->ki_flags |= IOCB_WAITQ;
+ kiocb->ki_waitq = wait;
+ return 0;
+ }
+
+ return -EOPNOTSUPP;
+}
+
+
+static bool io_rw_should_retry(struct io_kiocb *req)
+{
+ struct kiocb *kiocb = &req->rw.kiocb;
+ int ret;
+
+ /* never retry for NOWAIT, we just complete with -EAGAIN */
+ if (req->flags & REQ_F_NOWAIT)
+ return false;
+
+ /* already tried, or we're doing O_DIRECT */
+ if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+ return false;
+ /*
+ * just use poll if we can, and don't attempt if the fs doesn't
+ * support callback based unlocks
+ */
+ if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
+ return false;
+
+ /*
+ * If request type doesn't require req->io to defer in general,
+ * we need to allocate it here
+ */
+ if (!req->io && __io_alloc_async_ctx(req))
+ return false;
+
+ ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
+ io_async_buf_func, req);
+ if (!ret) {
+ io_get_req_task(req);
+ return true;
+ }
+
+ return false;
+}
+
+static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
+{
+ if (req->file->f_op->read_iter)
+ return call_read_iter(req->file, &req->rw.kiocb, iter);
+ return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
+}
+
+static int io_read(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw.kiocb;
struct iov_iter iter;
size_t iov_count;
- ssize_t io_size, ret;
+ ssize_t io_size, ret, ret2;
+ unsigned long nr_segs;
ret = io_import_iovec(READ, req, &iovec, &iter, !force_nonblock);
if (ret < 0)
return ret;
+ io_size = ret;
+ req->result = io_size;
/* Ensure we clear previously set non-block flag */
if (!force_nonblock)
kiocb->ki_flags &= ~IOCB_NOWAIT;
- req->result = 0;
- io_size = ret;
- if (req->flags & REQ_F_LINK_HEAD)
- req->result = io_size;
-
- /*
- * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
- * we know to async punt it even if it was opened O_NONBLOCK
- */
+ /* If the file doesn't support async, just async punt */
if (force_nonblock && !io_file_supports_async(req->file, READ))
goto copy_iov;
iov_count = iov_iter_count(&iter);
+ nr_segs = iter.nr_segs;
ret = rw_verify_area(READ, req->file, &kiocb->ki_pos, iov_count);
- if (!ret) {
- ssize_t ret2;
+ if (unlikely(ret))
+ goto out_free;
- if (req->file->f_op->read_iter)
- ret2 = call_read_iter(req->file, kiocb, &iter);
- else
- ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
+ ret2 = io_iter_do_read(req, &iter);
- /* Catch -EAGAIN return for forced non-blocking submission */
- if (!force_nonblock || ret2 != -EAGAIN) {
- kiocb_done(kiocb, ret2);
- } else {
+ /* Catch -EAGAIN return for forced non-blocking submission */
+ if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
+ kiocb_done(kiocb, ret2, cs);
+ } else {
+ iter.count = iov_count;
+ iter.nr_segs = nr_segs;
copy_iov:
- ret = io_setup_async_rw(req, io_size, iovec,
- inline_vecs, &iter);
- if (ret)
+ ret = io_setup_async_rw(req, io_size, iovec, inline_vecs,
+ &iter);
+ if (ret)
+ goto out_free;
+ /* it's copied and will be cleaned with ->io */
+ iovec = NULL;
+ /* if we can retry, do so with the callbacks armed */
+ if (io_rw_should_retry(req)) {
+ ret2 = io_iter_do_read(req, &iter);
+ if (ret2 == -EIOCBQUEUED) {
goto out_free;
- /* any defer here is final, must blocking retry */
- if (!(req->flags & REQ_F_NOWAIT) &&
- !file_can_poll(req->file))
- req->flags |= REQ_F_MUST_PUNT;
- return -EAGAIN;
+ } else if (ret2 != -EAGAIN) {
+ kiocb_done(kiocb, ret2, cs);
+ goto out_free;
+ }
}
+ kiocb->ki_flags &= ~IOCB_WAITQ;
+ return -EAGAIN;
}
out_free:
- if (!(req->flags & REQ_F_NEED_CLEANUP))
+ if (iovec)
kfree(iovec);
return ret;
}
@@ -2689,8 +3119,6 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
bool force_nonblock)
{
- struct io_async_ctx *io;
- struct iov_iter iter;
ssize_t ret;
ret = io_prep_rw(req, sqe, force_nonblock);
@@ -2700,49 +3128,33 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
return -EBADF;
- req->fsize = rlimit(RLIMIT_FSIZE);
-
/* either don't need iovec imported or already have it */
if (!req->io || req->flags & REQ_F_NEED_CLEANUP)
return 0;
-
- io = req->io;
- io->rw.iov = io->rw.fast_iov;
- req->io = NULL;
- ret = io_import_iovec(WRITE, req, &io->rw.iov, &iter, !force_nonblock);
- req->io = io;
- if (ret < 0)
- return ret;
-
- io_req_map_rw(req, ret, io->rw.iov, io->rw.fast_iov, &iter);
- return 0;
+ return io_rw_prep_async(req, WRITE, force_nonblock);
}
-static int io_write(struct io_kiocb *req, bool force_nonblock)
+static int io_write(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
struct kiocb *kiocb = &req->rw.kiocb;
struct iov_iter iter;
size_t iov_count;
- ssize_t ret, io_size;
+ ssize_t ret, ret2, io_size;
+ unsigned long nr_segs;
ret = io_import_iovec(WRITE, req, &iovec, &iter, !force_nonblock);
if (ret < 0)
return ret;
+ io_size = ret;
+ req->result = io_size;
/* Ensure we clear previously set non-block flag */
if (!force_nonblock)
req->rw.kiocb.ki_flags &= ~IOCB_NOWAIT;
- req->result = 0;
- io_size = ret;
- if (req->flags & REQ_F_LINK_HEAD)
- req->result = io_size;
-
- /*
- * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
- * we know to async punt it even if it was opened O_NONBLOCK
- */
+ /* If the file doesn't support async, just async punt */
if (force_nonblock && !io_file_supports_async(req->file, WRITE))
goto copy_iov;
@@ -2752,59 +3164,53 @@ static int io_write(struct io_kiocb *req, bool force_nonblock)
goto copy_iov;
iov_count = iov_iter_count(&iter);
+ nr_segs = iter.nr_segs;
ret = rw_verify_area(WRITE, req->file, &kiocb->ki_pos, iov_count);
- if (!ret) {
- ssize_t ret2;
+ if (unlikely(ret))
+ goto out_free;
- /*
- * Open-code file_start_write here to grab freeze protection,
- * which will be released by another thread in
- * io_complete_rw(). Fool lockdep by telling it the lock got
- * released so that it doesn't complain about the held lock when
- * we return to userspace.
- */
- if (req->flags & REQ_F_ISREG) {
- __sb_start_write(file_inode(req->file)->i_sb,
- SB_FREEZE_WRITE, true);
- __sb_writers_release(file_inode(req->file)->i_sb,
- SB_FREEZE_WRITE);
- }
- kiocb->ki_flags |= IOCB_WRITE;
+ /*
+ * Open-code file_start_write here to grab freeze protection,
+ * which will be released by another thread in
+ * io_complete_rw(). Fool lockdep by telling it the lock got
+ * released so that it doesn't complain about the held lock when
+ * we return to userspace.
+ */
+ if (req->flags & REQ_F_ISREG) {
+ __sb_start_write(file_inode(req->file)->i_sb,
+ SB_FREEZE_WRITE, true);
+ __sb_writers_release(file_inode(req->file)->i_sb,
+ SB_FREEZE_WRITE);
+ }
+ kiocb->ki_flags |= IOCB_WRITE;
- if (!force_nonblock)
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = req->fsize;
+ if (req->file->f_op->write_iter)
+ ret2 = call_write_iter(req->file, kiocb, &iter);
+ else
+ ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
- if (req->file->f_op->write_iter)
- ret2 = call_write_iter(req->file, kiocb, &iter);
- else
- ret2 = loop_rw_iter(WRITE, req->file, kiocb, &iter);
-
- if (!force_nonblock)
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
-
- /*
- * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
- * retry them without IOCB_NOWAIT.
- */
- if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
- ret2 = -EAGAIN;
- if (!force_nonblock || ret2 != -EAGAIN) {
- kiocb_done(kiocb, ret2);
- } else {
+ /*
+ * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
+ * retry them without IOCB_NOWAIT.
+ */
+ if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
+ ret2 = -EAGAIN;
+ if (!force_nonblock || ret2 != -EAGAIN) {
+ kiocb_done(kiocb, ret2, cs);
+ } else {
+ iter.count = iov_count;
+ iter.nr_segs = nr_segs;
copy_iov:
- ret = io_setup_async_rw(req, io_size, iovec,
- inline_vecs, &iter);
- if (ret)
- goto out_free;
- /* any defer here is final, must blocking retry */
- if (!(req->flags & REQ_F_NOWAIT) &&
- !file_can_poll(req->file))
- req->flags |= REQ_F_MUST_PUNT;
- return -EAGAIN;
- }
+ ret = io_setup_async_rw(req, io_size, iovec, inline_vecs,
+ &iter);
+ if (ret)
+ goto out_free;
+ /* it's copied and will be cleaned with ->io */
+ iovec = NULL;
+ return -EAGAIN;
}
out_free:
- if (!(req->flags & REQ_F_NEED_CLEANUP))
+ if (iovec)
kfree(iovec);
return ret;
}
@@ -2870,10 +3276,9 @@ static int io_tee(struct io_kiocb *req, bool force_nonblock)
io_put_file(req, in, (sp->flags & SPLICE_F_FD_IN_FIXED));
req->flags &= ~REQ_F_NEED_CLEANUP;
- io_cqring_add_event(req, ret);
if (ret != sp->len)
req_set_fail_links(req);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -2907,25 +3312,23 @@ static int io_splice(struct io_kiocb *req, bool force_nonblock)
io_put_file(req, in, (sp->flags & SPLICE_F_FD_IN_FIXED));
req->flags &= ~REQ_F_NEED_CLEANUP;
- io_cqring_add_event(req, ret);
if (ret != sp->len)
req_set_fail_links(req);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
/*
* IORING_OP_NOP just posts a completion event, nothing else.
*/
-static int io_nop(struct io_kiocb *req)
+static int io_nop(struct io_kiocb *req, struct io_comp_state *cs)
{
struct io_ring_ctx *ctx = req->ctx;
if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
- io_cqring_add_event(req, 0);
- io_put_req(req);
+ __io_req_complete(req, 0, 0, cs);
return 0;
}
@@ -2964,8 +3367,7 @@ static int io_fsync(struct io_kiocb *req, bool force_nonblock)
req->sync.flags & IORING_FSYNC_DATASYNC);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -2980,7 +3382,6 @@ static int io_fallocate_prep(struct io_kiocb *req,
req->sync.off = READ_ONCE(sqe->off);
req->sync.len = READ_ONCE(sqe->addr);
req->sync.mode = READ_ONCE(sqe->len);
- req->fsize = rlimit(RLIMIT_FSIZE);
return 0;
}
@@ -2991,15 +3392,11 @@ static int io_fallocate(struct io_kiocb *req, bool force_nonblock)
/* fallocate always requiring blocking context */
if (force_nonblock)
return -EAGAIN;
-
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = req->fsize;
ret = vfs_fallocate(req->file, req->sync.mode, req->sync.off,
req->sync.len);
- current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -3095,8 +3492,7 @@ static int io_openat2(struct io_kiocb *req, bool force_nonblock)
req->flags &= ~REQ_F_NEED_CLEANUP;
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -3150,7 +3546,8 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf,
return i;
}
-static int io_remove_buffers(struct io_kiocb *req, bool force_nonblock)
+static int io_remove_buffers(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_provide_buf *p = &req->pbuf;
struct io_ring_ctx *ctx = req->ctx;
@@ -3169,8 +3566,7 @@ static int io_remove_buffers(struct io_kiocb *req, bool force_nonblock)
io_ring_submit_lock(ctx, !force_nonblock);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
@@ -3228,7 +3624,8 @@ static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head)
return i ? i : -ENOMEM;
}
-static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock)
+static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_provide_buf *p = &req->pbuf;
struct io_ring_ctx *ctx = req->ctx;
@@ -3257,8 +3654,7 @@ static int io_provide_buffers(struct io_kiocb *req, bool force_nonblock)
io_ring_submit_unlock(ctx, !force_nonblock);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
@@ -3289,7 +3685,8 @@ static int io_epoll_ctl_prep(struct io_kiocb *req,
#endif
}
-static int io_epoll_ctl(struct io_kiocb *req, bool force_nonblock)
+static int io_epoll_ctl(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
#if defined(CONFIG_EPOLL)
struct io_epoll *ie = &req->epoll;
@@ -3301,8 +3698,7 @@ static int io_epoll_ctl(struct io_kiocb *req, bool force_nonblock)
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
#else
return -EOPNOTSUPP;
@@ -3338,8 +3734,7 @@ static int io_madvise(struct io_kiocb *req, bool force_nonblock)
ret = do_madvise(ma->addr, ma->len, ma->advice);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
#else
return -EOPNOTSUPP;
@@ -3378,8 +3773,7 @@ static int io_fadvise(struct io_kiocb *req, bool force_nonblock)
ret = vfs_fadvise(req->file, fa->offset, fa->len, fa->advice);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -3418,8 +3812,7 @@ static int io_statx(struct io_kiocb *req, bool force_nonblock)
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -3450,7 +3843,8 @@ static int io_close_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return 0;
}
-static int io_close(struct io_kiocb *req, bool force_nonblock)
+static int io_close(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_close *close = &req->close;
int ret;
@@ -3464,8 +3858,10 @@ static int io_close(struct io_kiocb *req, bool force_nonblock)
/* if the file has a flush method, be safe and punt to async */
if (close->put_file->f_op->flush && force_nonblock) {
+ /* was never set, but play safe */
+ req->flags &= ~REQ_F_NOWAIT;
/* avoid grabbing files - we don't need the files */
- req->flags |= REQ_F_NO_FILE_TABLE | REQ_F_MUST_PUNT;
+ req->flags |= REQ_F_NO_FILE_TABLE;
return -EAGAIN;
}
@@ -3473,10 +3869,9 @@ static int io_close(struct io_kiocb *req, bool force_nonblock)
ret = filp_close(close->put_file, req->work.files);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
fput(close->put_file);
close->put_file = NULL;
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
@@ -3510,8 +3905,7 @@ static int io_sync_file_range(struct io_kiocb *req, bool force_nonblock)
req->sync.flags);
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -3531,6 +3925,15 @@ static int io_setup_async_msg(struct io_kiocb *req,
return -EAGAIN;
}
+static int io_sendmsg_copy_hdr(struct io_kiocb *req,
+ struct io_async_msghdr *iomsg)
+{
+ iomsg->iov = iomsg->fast_iov;
+ iomsg->msg.msg_name = &iomsg->addr;
+ return sendmsg_copy_msghdr(&iomsg->msg, req->sr_msg.umsg,
+ req->sr_msg.msg_flags, &iomsg->iov);
+}
+
static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_sr_msg *sr = &req->sr_msg;
@@ -3541,7 +3944,7 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EINVAL;
sr->msg_flags = READ_ONCE(sqe->msg_flags);
- sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
+ sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
sr->len = READ_ONCE(sqe->len);
#ifdef CONFIG_COMPAT
@@ -3555,136 +3958,126 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (req->flags & REQ_F_NEED_CLEANUP)
return 0;
- io->msg.msg.msg_name = &io->msg.addr;
- io->msg.iov = io->msg.fast_iov;
- ret = sendmsg_copy_msghdr(&io->msg.msg, sr->msg, sr->msg_flags,
- &io->msg.iov);
+ ret = io_sendmsg_copy_hdr(req, &io->msg);
if (!ret)
req->flags |= REQ_F_NEED_CLEANUP;
return ret;
}
-static int io_sendmsg(struct io_kiocb *req, bool force_nonblock)
+static int io_sendmsg(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
- struct io_async_msghdr *kmsg = NULL;
+ struct io_async_msghdr iomsg, *kmsg;
struct socket *sock;
+ unsigned flags;
int ret;
sock = sock_from_file(req->file, &ret);
- if (sock) {
- struct io_async_ctx io;
- unsigned flags;
+ if (unlikely(!sock))
+ return ret;
- if (req->io) {
- kmsg = &req->io->msg;
- kmsg->msg.msg_name = &req->io->msg.addr;
- /* if iov is set, it's allocated already */
- if (!kmsg->iov)
- kmsg->iov = kmsg->fast_iov;
- kmsg->msg.msg_iter.iov = kmsg->iov;
- } else {
- struct io_sr_msg *sr = &req->sr_msg;
-
- kmsg = &io.msg;
- kmsg->msg.msg_name = &io.msg.addr;
-
- io.msg.iov = io.msg.fast_iov;
- ret = sendmsg_copy_msghdr(&io.msg.msg, sr->msg,
- sr->msg_flags, &io.msg.iov);
- if (ret)
- return ret;
- }
-
- flags = req->sr_msg.msg_flags;
- if (flags & MSG_DONTWAIT)
- req->flags |= REQ_F_NOWAIT;
- else if (force_nonblock)
- flags |= MSG_DONTWAIT;
-
- ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
- if (force_nonblock && ret == -EAGAIN)
- return io_setup_async_msg(req, kmsg);
- if (ret == -ERESTARTSYS)
- ret = -EINTR;
- }
-
- if (kmsg && kmsg->iov != kmsg->fast_iov)
- kfree(kmsg->iov);
- req->flags &= ~REQ_F_NEED_CLEANUP;
- io_cqring_add_event(req, ret);
- if (ret < 0)
- req_set_fail_links(req);
- io_put_req(req);
- return 0;
-}
-
-static int io_send(struct io_kiocb *req, bool force_nonblock)
-{
- struct socket *sock;
- int ret;
-
- sock = sock_from_file(req->file, &ret);
- if (sock) {
- struct io_sr_msg *sr = &req->sr_msg;
- struct msghdr msg;
- struct iovec iov;
- unsigned flags;
-
- ret = import_single_range(WRITE, sr->buf, sr->len, &iov,
- &msg.msg_iter);
+ if (req->io) {
+ kmsg = &req->io->msg;
+ kmsg->msg.msg_name = &req->io->msg.addr;
+ /* if iov is set, it's allocated already */
+ if (!kmsg->iov)
+ kmsg->iov = kmsg->fast_iov;
+ kmsg->msg.msg_iter.iov = kmsg->iov;
+ } else {
+ ret = io_sendmsg_copy_hdr(req, &iomsg);
if (ret)
return ret;
-
- msg.msg_name = NULL;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
- msg.msg_namelen = 0;
-
- flags = req->sr_msg.msg_flags;
- if (flags & MSG_DONTWAIT)
- req->flags |= REQ_F_NOWAIT;
- else if (force_nonblock)
- flags |= MSG_DONTWAIT;
-
- msg.msg_flags = flags;
- ret = sock_sendmsg(sock, &msg);
- if (force_nonblock && ret == -EAGAIN)
- return -EAGAIN;
- if (ret == -ERESTARTSYS)
- ret = -EINTR;
+ kmsg = &iomsg;
}
- io_cqring_add_event(req, ret);
+ flags = req->sr_msg.msg_flags;
+ if (flags & MSG_DONTWAIT)
+ req->flags |= REQ_F_NOWAIT;
+ else if (force_nonblock)
+ flags |= MSG_DONTWAIT;
+
+ ret = __sys_sendmsg_sock(sock, &kmsg->msg, flags);
+ if (force_nonblock && ret == -EAGAIN)
+ return io_setup_async_msg(req, kmsg);
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+
+ if (kmsg->iov != kmsg->fast_iov)
+ kfree(kmsg->iov);
+ req->flags &= ~REQ_F_NEED_CLEANUP;
if (ret < 0)
req_set_fail_links(req);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
-static int __io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io)
+static int io_send(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
+{
+ struct io_sr_msg *sr = &req->sr_msg;
+ struct msghdr msg;
+ struct iovec iov;
+ struct socket *sock;
+ unsigned flags;
+ int ret;
+
+ sock = sock_from_file(req->file, &ret);
+ if (unlikely(!sock))
+ return ret;
+
+ ret = import_single_range(WRITE, sr->buf, sr->len, &iov, &msg.msg_iter);
+ if (unlikely(ret))
+ return ret;;
+
+ msg.msg_name = NULL;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_namelen = 0;
+
+ flags = req->sr_msg.msg_flags;
+ if (flags & MSG_DONTWAIT)
+ req->flags |= REQ_F_NOWAIT;
+ else if (force_nonblock)
+ flags |= MSG_DONTWAIT;
+
+ msg.msg_flags = flags;
+ ret = sock_sendmsg(sock, &msg);
+ if (force_nonblock && ret == -EAGAIN)
+ return -EAGAIN;
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+
+ if (ret < 0)
+ req_set_fail_links(req);
+ __io_req_complete(req, ret, 0, cs);
+ return 0;
+}
+
+static int __io_recvmsg_copy_hdr(struct io_kiocb *req,
+ struct io_async_msghdr *iomsg)
{
struct io_sr_msg *sr = &req->sr_msg;
struct iovec __user *uiov;
size_t iov_len;
int ret;
- ret = __copy_msghdr_from_user(&io->msg.msg, sr->msg, &io->msg.uaddr,
- &uiov, &iov_len);
+ ret = __copy_msghdr_from_user(&iomsg->msg, sr->umsg,
+ &iomsg->uaddr, &uiov, &iov_len);
if (ret)
return ret;
if (req->flags & REQ_F_BUFFER_SELECT) {
if (iov_len > 1)
return -EINVAL;
- if (copy_from_user(io->msg.iov, uiov, sizeof(*uiov)))
+ if (copy_from_user(iomsg->iov, uiov, sizeof(*uiov)))
return -EFAULT;
- sr->len = io->msg.iov[0].iov_len;
- iov_iter_init(&io->msg.msg.msg_iter, READ, io->msg.iov, 1,
+ sr->len = iomsg->iov[0].iov_len;
+ iov_iter_init(&iomsg->msg.msg_iter, READ, iomsg->iov, 1,
sr->len);
- io->msg.iov = NULL;
+ iomsg->iov = NULL;
} else {
ret = import_iovec(READ, uiov, iov_len, UIO_FASTIOV,
- &io->msg.iov, &io->msg.msg.msg_iter);
+ &iomsg->iov, &iomsg->msg.msg_iter);
if (ret > 0)
ret = 0;
}
@@ -3694,7 +4087,7 @@ static int __io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io)
#ifdef CONFIG_COMPAT
static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req,
- struct io_async_ctx *io)
+ struct io_async_msghdr *iomsg)
{
struct compat_msghdr __user *msg_compat;
struct io_sr_msg *sr = &req->sr_msg;
@@ -3703,8 +4096,8 @@ static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req,
compat_size_t len;
int ret;
- msg_compat = (struct compat_msghdr __user *) sr->msg;
- ret = __get_compat_msghdr(&io->msg.msg, msg_compat, &io->msg.uaddr,
+ msg_compat = (struct compat_msghdr __user *) sr->umsg;
+ ret = __get_compat_msghdr(&iomsg->msg, msg_compat, &iomsg->uaddr,
&ptr, &len);
if (ret)
return ret;
@@ -3721,12 +4114,12 @@ static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req,
return -EFAULT;
if (clen < 0)
return -EINVAL;
- sr->len = io->msg.iov[0].iov_len;
- io->msg.iov = NULL;
+ sr->len = iomsg->iov[0].iov_len;
+ iomsg->iov = NULL;
} else {
ret = compat_import_iovec(READ, uiov, len, UIO_FASTIOV,
- &io->msg.iov,
- &io->msg.msg.msg_iter);
+ &iomsg->iov,
+ &iomsg->msg.msg_iter);
if (ret < 0)
return ret;
}
@@ -3735,40 +4128,40 @@ static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req,
}
#endif
-static int io_recvmsg_copy_hdr(struct io_kiocb *req, struct io_async_ctx *io)
+static int io_recvmsg_copy_hdr(struct io_kiocb *req,
+ struct io_async_msghdr *iomsg)
{
- io->msg.msg.msg_name = &io->msg.addr;
- io->msg.iov = io->msg.fast_iov;
+ iomsg->msg.msg_name = &iomsg->addr;
+ iomsg->iov = iomsg->fast_iov;
#ifdef CONFIG_COMPAT
if (req->ctx->compat)
- return __io_compat_recvmsg_copy_hdr(req, io);
+ return __io_compat_recvmsg_copy_hdr(req, iomsg);
#endif
- return __io_recvmsg_copy_hdr(req, io);
+ return __io_recvmsg_copy_hdr(req, iomsg);
}
static struct io_buffer *io_recv_buffer_select(struct io_kiocb *req,
- int *cflags, bool needs_lock)
+ bool needs_lock)
{
struct io_sr_msg *sr = &req->sr_msg;
struct io_buffer *kbuf;
- if (!(req->flags & REQ_F_BUFFER_SELECT))
- return NULL;
-
kbuf = io_buffer_select(req, &sr->len, sr->bgid, sr->kbuf, needs_lock);
if (IS_ERR(kbuf))
return kbuf;
sr->kbuf = kbuf;
req->flags |= REQ_F_BUFFER_SELECTED;
-
- *cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
- *cflags |= IORING_CQE_F_BUFFER;
return kbuf;
}
+static inline unsigned int io_put_recv_kbuf(struct io_kiocb *req)
+{
+ return io_put_kbuf(req, req->sr_msg.kbuf);
+}
+
static int io_recvmsg_prep(struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
@@ -3780,7 +4173,7 @@ static int io_recvmsg_prep(struct io_kiocb *req,
return -EINVAL;
sr->msg_flags = READ_ONCE(sqe->msg_flags);
- sr->msg = u64_to_user_ptr(READ_ONCE(sqe->addr));
+ sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
sr->len = READ_ONCE(sqe->len);
sr->bgid = READ_ONCE(sqe->buf_group);
@@ -3795,133 +4188,123 @@ static int io_recvmsg_prep(struct io_kiocb *req,
if (req->flags & REQ_F_NEED_CLEANUP)
return 0;
- ret = io_recvmsg_copy_hdr(req, io);
+ ret = io_recvmsg_copy_hdr(req, &io->msg);
if (!ret)
req->flags |= REQ_F_NEED_CLEANUP;
return ret;
}
-static int io_recvmsg(struct io_kiocb *req, bool force_nonblock)
+static int io_recvmsg(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
- struct io_async_msghdr *kmsg = NULL;
+ struct io_async_msghdr iomsg, *kmsg;
struct socket *sock;
+ struct io_buffer *kbuf;
+ unsigned flags;
int ret, cflags = 0;
sock = sock_from_file(req->file, &ret);
- if (sock) {
- struct io_buffer *kbuf;
- struct io_async_ctx io;
- unsigned flags;
+ if (unlikely(!sock))
+ return ret;
- if (req->io) {
- kmsg = &req->io->msg;
- kmsg->msg.msg_name = &req->io->msg.addr;
- /* if iov is set, it's allocated already */
- if (!kmsg->iov)
- kmsg->iov = kmsg->fast_iov;
- kmsg->msg.msg_iter.iov = kmsg->iov;
- } else {
- kmsg = &io.msg;
- kmsg->msg.msg_name = &io.msg.addr;
-
- ret = io_recvmsg_copy_hdr(req, &io);
- if (ret)
- return ret;
- }
-
- kbuf = io_recv_buffer_select(req, &cflags, !force_nonblock);
- if (IS_ERR(kbuf)) {
- return PTR_ERR(kbuf);
- } else if (kbuf) {
- kmsg->fast_iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
- iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->iov,
- 1, req->sr_msg.len);
- }
-
- flags = req->sr_msg.msg_flags;
- if (flags & MSG_DONTWAIT)
- req->flags |= REQ_F_NOWAIT;
- else if (force_nonblock)
- flags |= MSG_DONTWAIT;
-
- ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.msg,
- kmsg->uaddr, flags);
- if (force_nonblock && ret == -EAGAIN) {
- ret = io_setup_async_msg(req, kmsg);
- if (ret != -EAGAIN)
- kfree(kbuf);
+ if (req->io) {
+ kmsg = &req->io->msg;
+ kmsg->msg.msg_name = &req->io->msg.addr;
+ /* if iov is set, it's allocated already */
+ if (!kmsg->iov)
+ kmsg->iov = kmsg->fast_iov;
+ kmsg->msg.msg_iter.iov = kmsg->iov;
+ } else {
+ ret = io_recvmsg_copy_hdr(req, &iomsg);
+ if (ret)
return ret;
- }
- if (ret == -ERESTARTSYS)
- ret = -EINTR;
- if (kbuf)
- kfree(kbuf);
+ kmsg = &iomsg;
}
- if (kmsg && kmsg->iov != kmsg->fast_iov)
+ if (req->flags & REQ_F_BUFFER_SELECT) {
+ kbuf = io_recv_buffer_select(req, !force_nonblock);
+ if (IS_ERR(kbuf))
+ return PTR_ERR(kbuf);
+ kmsg->fast_iov[0].iov_base = u64_to_user_ptr(kbuf->addr);
+ iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->iov,
+ 1, req->sr_msg.len);
+ }
+
+ flags = req->sr_msg.msg_flags;
+ if (flags & MSG_DONTWAIT)
+ req->flags |= REQ_F_NOWAIT;
+ else if (force_nonblock)
+ flags |= MSG_DONTWAIT;
+
+ ret = __sys_recvmsg_sock(sock, &kmsg->msg, req->sr_msg.umsg,
+ kmsg->uaddr, flags);
+ if (force_nonblock && ret == -EAGAIN)
+ return io_setup_async_msg(req, kmsg);
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+
+ if (req->flags & REQ_F_BUFFER_SELECTED)
+ cflags = io_put_recv_kbuf(req);
+ if (kmsg->iov != kmsg->fast_iov)
kfree(kmsg->iov);
req->flags &= ~REQ_F_NEED_CLEANUP;
- __io_cqring_add_event(req, ret, cflags);
if (ret < 0)
req_set_fail_links(req);
- io_put_req(req);
+ __io_req_complete(req, ret, cflags, cs);
return 0;
}
-static int io_recv(struct io_kiocb *req, bool force_nonblock)
+static int io_recv(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
- struct io_buffer *kbuf = NULL;
+ struct io_buffer *kbuf;
+ struct io_sr_msg *sr = &req->sr_msg;
+ struct msghdr msg;
+ void __user *buf = sr->buf;
struct socket *sock;
+ struct iovec iov;
+ unsigned flags;
int ret, cflags = 0;
sock = sock_from_file(req->file, &ret);
- if (sock) {
- struct io_sr_msg *sr = &req->sr_msg;
- void __user *buf = sr->buf;
- struct msghdr msg;
- struct iovec iov;
- unsigned flags;
+ if (unlikely(!sock))
+ return ret;
- kbuf = io_recv_buffer_select(req, &cflags, !force_nonblock);
+ if (req->flags & REQ_F_BUFFER_SELECT) {
+ kbuf = io_recv_buffer_select(req, !force_nonblock);
if (IS_ERR(kbuf))
return PTR_ERR(kbuf);
- else if (kbuf)
- buf = u64_to_user_ptr(kbuf->addr);
-
- ret = import_single_range(READ, buf, sr->len, &iov,
- &msg.msg_iter);
- if (ret) {
- kfree(kbuf);
- return ret;
- }
-
- req->flags |= REQ_F_NEED_CLEANUP;
- msg.msg_name = NULL;
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
- msg.msg_namelen = 0;
- msg.msg_iocb = NULL;
- msg.msg_flags = 0;
-
- flags = req->sr_msg.msg_flags;
- if (flags & MSG_DONTWAIT)
- req->flags |= REQ_F_NOWAIT;
- else if (force_nonblock)
- flags |= MSG_DONTWAIT;
-
- ret = sock_recvmsg(sock, &msg, flags);
- if (force_nonblock && ret == -EAGAIN)
- return -EAGAIN;
- if (ret == -ERESTARTSYS)
- ret = -EINTR;
+ buf = u64_to_user_ptr(kbuf->addr);
}
- kfree(kbuf);
- req->flags &= ~REQ_F_NEED_CLEANUP;
- __io_cqring_add_event(req, ret, cflags);
+ ret = import_single_range(READ, buf, sr->len, &iov, &msg.msg_iter);
+ if (unlikely(ret))
+ goto out_free;
+
+ msg.msg_name = NULL;
+ msg.msg_control = NULL;
+ msg.msg_controllen = 0;
+ msg.msg_namelen = 0;
+ msg.msg_iocb = NULL;
+ msg.msg_flags = 0;
+
+ flags = req->sr_msg.msg_flags;
+ if (flags & MSG_DONTWAIT)
+ req->flags |= REQ_F_NOWAIT;
+ else if (force_nonblock)
+ flags |= MSG_DONTWAIT;
+
+ ret = sock_recvmsg(sock, &msg, flags);
+ if (force_nonblock && ret == -EAGAIN)
+ return -EAGAIN;
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+out_free:
+ if (req->flags & REQ_F_BUFFER_SELECTED)
+ cflags = io_put_recv_kbuf(req);
if (ret < 0)
req_set_fail_links(req);
- io_put_req(req);
+ __io_req_complete(req, ret, cflags, cs);
return 0;
}
@@ -3941,7 +4324,8 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return 0;
}
-static int io_accept(struct io_kiocb *req, bool force_nonblock)
+static int io_accept(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_accept *accept = &req->accept;
unsigned int file_flags = force_nonblock ? O_NONBLOCK : 0;
@@ -3960,8 +4344,7 @@ static int io_accept(struct io_kiocb *req, bool force_nonblock)
ret = -EINTR;
req_set_fail_links(req);
}
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
@@ -3985,7 +4368,8 @@ static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
&io->connect.address);
}
-static int io_connect(struct io_kiocb *req, bool force_nonblock)
+static int io_connect(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_async_ctx __io, *io;
unsigned file_flags;
@@ -4021,8 +4405,7 @@ static int io_connect(struct io_kiocb *req, bool force_nonblock)
out:
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
#else /* !CONFIG_NET */
@@ -4031,12 +4414,14 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EOPNOTSUPP;
}
-static int io_sendmsg(struct io_kiocb *req, bool force_nonblock)
+static int io_sendmsg(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
-static int io_send(struct io_kiocb *req, bool force_nonblock)
+static int io_send(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
@@ -4047,12 +4432,14 @@ static int io_recvmsg_prep(struct io_kiocb *req,
return -EOPNOTSUPP;
}
-static int io_recvmsg(struct io_kiocb *req, bool force_nonblock)
+static int io_recvmsg(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
-static int io_recv(struct io_kiocb *req, bool force_nonblock)
+static int io_recv(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
@@ -4062,7 +4449,8 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EOPNOTSUPP;
}
-static int io_accept(struct io_kiocb *req, bool force_nonblock)
+static int io_accept(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
@@ -4072,7 +4460,8 @@ static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EOPNOTSUPP;
}
-static int io_connect(struct io_kiocb *req, bool force_nonblock)
+static int io_connect(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
return -EOPNOTSUPP;
}
@@ -4084,33 +4473,9 @@ struct io_poll_table {
int error;
};
-static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
-{
- struct task_struct *tsk = req->task;
- struct io_ring_ctx *ctx = req->ctx;
- int ret, notify = TWA_RESUME;
-
- /*
- * SQPOLL kernel thread doesn't need notification, just a wakeup.
- * If we're not using an eventfd, then TWA_RESUME is always fine,
- * as we won't have dependencies between request completions for
- * other kernel wait conditions.
- */
- if (ctx->flags & IORING_SETUP_SQPOLL)
- notify = 0;
- else if (ctx->cq_ev_fd)
- notify = TWA_SIGNAL;
-
- ret = task_work_add(tsk, cb, notify);
- if (!ret)
- wake_up_process(tsk);
- return ret;
-}
-
static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
__poll_t mask, task_work_func_t func)
{
- struct task_struct *tsk;
int ret;
/* for instances that support it check for an event match first: */
@@ -4121,7 +4486,6 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
list_del_init(&poll->wait.entry);
- tsk = req->task;
req->result = mask;
init_task_work(&req->task_work, func);
/*
@@ -4132,6 +4496,8 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
*/
ret = io_req_task_work_add(req, &req->task_work);
if (unlikely(ret)) {
+ struct task_struct *tsk;
+
WRITE_ONCE(poll->canceled, true);
tsk = io_wq_get_task(req->ctx->io_wq);
task_work_add(tsk, &req->task_work, 0);
@@ -4199,9 +4565,10 @@ static void io_poll_task_handler(struct io_kiocb *req, struct io_kiocb **nxt)
hash_del(&req->hash_node);
io_poll_complete(req, req->result, 0);
+ req->flags |= REQ_F_COMP_LOCKED;
+ *nxt = io_put_req_find_next(req);
spin_unlock_irq(&ctx->completion_lock);
- io_put_req_find_next(req, nxt);
io_cqring_ev_posted(ctx);
}
@@ -4211,13 +4578,8 @@ static void io_poll_task_func(struct callback_head *cb)
struct io_kiocb *nxt = NULL;
io_poll_task_handler(req, &nxt);
- if (nxt) {
- struct io_ring_ctx *ctx = nxt->ctx;
-
- mutex_lock(&ctx->uring_lock);
- __io_queue_sqe(nxt, NULL);
- mutex_unlock(&ctx->uring_lock);
- }
+ if (nxt)
+ __io_req_task_submit(nxt);
}
static int io_poll_double_wake(struct wait_queue_entry *wait, unsigned mode,
@@ -4287,7 +4649,11 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
pt->error = 0;
poll->head = head;
- add_wait_queue(head, &poll->wait);
+
+ if (poll->events & EPOLLEXCLUSIVE)
+ add_wait_queue_exclusive(head, &poll->wait);
+ else
+ add_wait_queue(head, &poll->wait);
}
static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
@@ -4299,34 +4665,11 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
__io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
}
-static void io_sq_thread_drop_mm(struct io_ring_ctx *ctx)
-{
- struct mm_struct *mm = current->mm;
-
- if (mm) {
- kthread_unuse_mm(mm);
- mmput(mm);
- }
-}
-
-static int io_sq_thread_acquire_mm(struct io_ring_ctx *ctx,
- struct io_kiocb *req)
-{
- if (io_op_defs[req->opcode].needs_mm && !current->mm) {
- if (unlikely(!mmget_not_zero(ctx->sqo_mm)))
- return -EFAULT;
- kthread_use_mm(ctx->sqo_mm);
- }
-
- return 0;
-}
-
static void io_async_task_func(struct callback_head *cb)
{
struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
struct async_poll *apoll = req->apoll;
struct io_ring_ctx *ctx = req->ctx;
- bool canceled = false;
trace_io_uring_task_run(req->ctx, req->opcode, req->user_data);
@@ -4336,40 +4679,19 @@ static void io_async_task_func(struct callback_head *cb)
}
/* If req is still hashed, it cannot have been canceled. Don't check. */
- if (hash_hashed(&req->hash_node)) {
+ if (hash_hashed(&req->hash_node))
hash_del(&req->hash_node);
- } else {
- canceled = READ_ONCE(apoll->poll.canceled);
- if (canceled) {
- io_cqring_fill_event(req, -ECANCELED);
- io_commit_cqring(ctx);
- }
- }
io_poll_remove_double(req, apoll->double_poll);
spin_unlock_irq(&ctx->completion_lock);
- /* restore ->work in case we need to retry again */
- if (req->flags & REQ_F_WORK_INITIALIZED)
- memcpy(&req->work, &apoll->work, sizeof(req->work));
+ if (!READ_ONCE(apoll->poll.canceled))
+ __io_req_task_submit(req);
+ else
+ __io_req_task_cancel(req, -ECANCELED);
+
kfree(apoll->double_poll);
kfree(apoll);
-
- if (!canceled) {
- __set_current_state(TASK_RUNNING);
- if (io_sq_thread_acquire_mm(ctx, req)) {
- io_cqring_add_event(req, -EFAULT);
- goto end_req;
- }
- mutex_lock(&ctx->uring_lock);
- __io_queue_sqe(req, NULL);
- mutex_unlock(&ctx->uring_lock);
- } else {
- io_cqring_ev_posted(ctx);
-end_req:
- req_set_fail_links(req);
- io_double_put_req(req);
- }
}
static int io_async_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
@@ -4402,8 +4724,8 @@ static __poll_t __io_arm_poll_handler(struct io_kiocb *req,
struct io_ring_ctx *ctx = req->ctx;
bool cancel = false;
- poll->file = req->file;
io_init_poll_iocb(poll, mask, wake_func);
+ poll->file = req->file;
poll->wait.private = req;
ipt->pt._key = mask;
@@ -4443,7 +4765,7 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
if (!req->file || !file_can_poll(req->file))
return false;
- if (req->flags & (REQ_F_MUST_PUNT | REQ_F_POLLED))
+ if (req->flags & REQ_F_POLLED)
return false;
if (!def->pollin && !def->pollout)
return false;
@@ -4454,9 +4776,6 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
apoll->double_poll = NULL;
req->flags |= REQ_F_POLLED;
- if (req->flags & REQ_F_WORK_INITIALIZED)
- memcpy(&apoll->work, &req->work, sizeof(req->work));
-
io_get_req_task(req);
req->apoll = apoll;
INIT_HLIST_NODE(&req->hash_node);
@@ -4475,8 +4794,6 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
if (ret) {
io_poll_remove_double(req, apoll->double_poll);
spin_unlock_irq(&ctx->completion_lock);
- if (req->flags & REQ_F_WORK_INITIALIZED)
- memcpy(&req->work, &apoll->work, sizeof(req->work));
kfree(apoll->double_poll);
kfree(apoll);
return false;
@@ -4519,14 +4836,6 @@ static bool io_poll_remove_one(struct io_kiocb *req)
do_complete = __io_poll_remove_one(req, &apoll->poll);
if (do_complete) {
io_put_req(req);
- /*
- * restore ->work because we will call
- * io_req_work_drop_env below when dropping the
- * final reference.
- */
- if (req->flags & REQ_F_WORK_INITIALIZED)
- memcpy(&req->work, &apoll->work,
- sizeof(req->work));
kfree(apoll->double_poll);
kfree(apoll);
}
@@ -4607,10 +4916,9 @@ static int io_poll_remove(struct io_kiocb *req)
ret = io_poll_cancel(ctx, addr);
spin_unlock_irq(&ctx->completion_lock);
- io_cqring_add_event(req, ret);
if (ret < 0)
req_set_fail_links(req);
- io_put_req(req);
+ io_req_complete(req, ret);
return 0;
}
@@ -4634,7 +4942,7 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_poll_iocb *poll = &req->poll;
- u16 events;
+ u32 events;
if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
@@ -4643,8 +4951,12 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
if (!poll->file)
return -EBADF;
- events = READ_ONCE(sqe->poll_events);
- poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
+ events = READ_ONCE(sqe->poll32_events);
+#ifdef __BIG_ENDIAN
+ events = swahw32(events);
+#endif
+ poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP |
+ (events & EPOLLEXCLUSIVE);
io_get_req_task(req);
return 0;
@@ -4657,12 +4969,7 @@ static int io_poll_add(struct io_kiocb *req)
struct io_poll_table ipt;
__poll_t mask;
- /* ->work is in union with hash_node and others */
- io_req_work_drop_env(req);
- req->flags &= ~REQ_F_WORK_INITIALIZED;
-
INIT_HLIST_NODE(&req->hash_node);
- INIT_LIST_HEAD(&req->list);
ipt.pt._qproc = io_poll_queue_proc;
mask = __io_arm_poll_handler(req, &req->poll, &ipt, poll->events,
@@ -4689,15 +4996,16 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
struct io_ring_ctx *ctx = req->ctx;
unsigned long flags;
- atomic_inc(&ctx->cq_timeouts);
-
spin_lock_irqsave(&ctx->completion_lock, flags);
+ atomic_set(&req->ctx->cq_timeouts,
+ atomic_read(&req->ctx->cq_timeouts) + 1);
+
/*
* We could be racing with timeout deletion. If the list is empty,
* then timeout lookup already found it and will be handling it.
*/
- if (!list_empty(&req->list))
- list_del_init(&req->list);
+ if (!list_empty(&req->timeout.list))
+ list_del_init(&req->timeout.list);
io_cqring_fill_event(req, -ETIME);
io_commit_cqring(ctx);
@@ -4714,9 +5022,9 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
struct io_kiocb *req;
int ret = -ENOENT;
- list_for_each_entry(req, &ctx->timeout_list, list) {
+ list_for_each_entry(req, &ctx->timeout_list, timeout.list) {
if (user_data == req->user_data) {
- list_del_init(&req->list);
+ list_del_init(&req->timeout.list);
ret = 0;
break;
}
@@ -4798,7 +5106,6 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
data = &req->io->timeout;
data->req = req;
- req->flags |= REQ_F_TIMEOUT;
if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
return -EFAULT;
@@ -4826,8 +5133,7 @@ static int io_timeout(struct io_kiocb *req)
* timeout event to be satisfied. If it isn't set, then this is
* a pure timeout request, sequence isn't used.
*/
- if (!off) {
- req->flags |= REQ_F_TIMEOUT_NOSEQ;
+ if (io_is_timeout_noseq(req)) {
entry = ctx->timeout_list.prev;
goto add;
}
@@ -4840,16 +5146,17 @@ static int io_timeout(struct io_kiocb *req)
* the one we need first.
*/
list_for_each_prev(entry, &ctx->timeout_list) {
- struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
+ struct io_kiocb *nxt = list_entry(entry, struct io_kiocb,
+ timeout.list);
- if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
+ if (io_is_timeout_noseq(nxt))
continue;
/* nxt.seq is behind @tail, otherwise would've been completed */
if (off >= nxt->timeout.target_seq - tail)
break;
}
add:
- list_add(&req->list, entry);
+ list_add(&req->timeout.list, entry);
data->timer.function = io_timeout_fn;
hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
spin_unlock_irq(&ctx->completion_lock);
@@ -4953,7 +5260,8 @@ static int io_files_update_prep(struct io_kiocb *req,
return 0;
}
-static int io_files_update(struct io_kiocb *req, bool force_nonblock)
+static int io_files_update(struct io_kiocb *req, bool force_nonblock,
+ struct io_comp_state *cs)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_uring_files_update up;
@@ -4971,8 +5279,7 @@ static int io_files_update(struct io_kiocb *req, bool force_nonblock)
if (ret < 0)
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ __io_req_complete(req, ret, 0, cs);
return 0;
}
@@ -4984,15 +5291,11 @@ static int io_req_defer_prep(struct io_kiocb *req,
if (!sqe)
return 0;
- io_req_init_async(req);
-
- if (io_op_defs[req->opcode].file_table) {
- ret = io_grab_files(req);
- if (unlikely(ret))
- return ret;
- }
-
- io_req_work_grab_env(req, &io_op_defs[req->opcode]);
+ if (io_alloc_async_ctx(req))
+ return -EAGAIN;
+ ret = io_prep_work_files(req);
+ if (unlikely(ret))
+ return ret;
switch (req->opcode) {
case IORING_OP_NOP:
@@ -5094,86 +5397,117 @@ static int io_req_defer_prep(struct io_kiocb *req,
return ret;
}
+static u32 io_get_sequence(struct io_kiocb *req)
+{
+ struct io_kiocb *pos;
+ struct io_ring_ctx *ctx = req->ctx;
+ u32 total_submitted, nr_reqs = 1;
+
+ if (req->flags & REQ_F_LINK_HEAD)
+ list_for_each_entry(pos, &req->link_list, link_list)
+ nr_reqs++;
+
+ total_submitted = ctx->cached_sq_head - ctx->cached_sq_dropped;
+ return total_submitted - nr_reqs;
+}
+
static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_ring_ctx *ctx = req->ctx;
+ struct io_defer_entry *de;
int ret;
+ u32 seq;
/* Still need defer if there is pending req in defer list. */
- if (!req_need_defer(req) && list_empty_careful(&ctx->defer_list))
+ if (likely(list_empty_careful(&ctx->defer_list) &&
+ !(req->flags & REQ_F_IO_DRAIN)))
+ return 0;
+
+ seq = io_get_sequence(req);
+ /* Still a chance to pass the sequence check */
+ if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list))
return 0;
if (!req->io) {
- if (io_alloc_async_ctx(req))
- return -EAGAIN;
ret = io_req_defer_prep(req, sqe);
- if (ret < 0)
+ if (ret)
return ret;
}
+ io_prep_async_link(req);
+ de = kmalloc(sizeof(*de), GFP_KERNEL);
+ if (!de)
+ return -ENOMEM;
spin_lock_irq(&ctx->completion_lock);
- if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
+ if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
spin_unlock_irq(&ctx->completion_lock);
- return 0;
+ kfree(de);
+ io_queue_async_work(req);
+ return -EIOCBQUEUED;
}
trace_io_uring_defer(ctx, req, req->user_data);
- list_add_tail(&req->list, &ctx->defer_list);
+ de->req = req;
+ de->seq = seq;
+ list_add_tail(&de->list, &ctx->defer_list);
spin_unlock_irq(&ctx->completion_lock);
return -EIOCBQUEUED;
}
-static void io_cleanup_req(struct io_kiocb *req)
+static void __io_clean_op(struct io_kiocb *req)
{
struct io_async_ctx *io = req->io;
- switch (req->opcode) {
- case IORING_OP_READV:
- case IORING_OP_READ_FIXED:
- case IORING_OP_READ:
- if (req->flags & REQ_F_BUFFER_SELECTED)
+ if (req->flags & REQ_F_BUFFER_SELECTED) {
+ switch (req->opcode) {
+ case IORING_OP_READV:
+ case IORING_OP_READ_FIXED:
+ case IORING_OP_READ:
kfree((void *)(unsigned long)req->rw.addr);
- /* fallthrough */
- case IORING_OP_WRITEV:
- case IORING_OP_WRITE_FIXED:
- case IORING_OP_WRITE:
- if (io->rw.iov != io->rw.fast_iov)
- kfree(io->rw.iov);
- break;
- case IORING_OP_RECVMSG:
- if (req->flags & REQ_F_BUFFER_SELECTED)
+ break;
+ case IORING_OP_RECVMSG:
+ case IORING_OP_RECV:
kfree(req->sr_msg.kbuf);
- /* fallthrough */
- case IORING_OP_SENDMSG:
- if (io->msg.iov != io->msg.fast_iov)
- kfree(io->msg.iov);
- break;
- case IORING_OP_RECV:
- if (req->flags & REQ_F_BUFFER_SELECTED)
- kfree(req->sr_msg.kbuf);
- break;
- case IORING_OP_OPENAT:
- case IORING_OP_OPENAT2:
- break;
- case IORING_OP_SPLICE:
- case IORING_OP_TEE:
- io_put_file(req, req->splice.file_in,
- (req->splice.flags & SPLICE_F_FD_IN_FIXED));
- break;
+ break;
+ }
+ req->flags &= ~REQ_F_BUFFER_SELECTED;
}
- req->flags &= ~REQ_F_NEED_CLEANUP;
+ if (req->flags & REQ_F_NEED_CLEANUP) {
+ switch (req->opcode) {
+ case IORING_OP_READV:
+ case IORING_OP_READ_FIXED:
+ case IORING_OP_READ:
+ case IORING_OP_WRITEV:
+ case IORING_OP_WRITE_FIXED:
+ case IORING_OP_WRITE:
+ if (io->rw.iov != io->rw.fast_iov)
+ kfree(io->rw.iov);
+ break;
+ case IORING_OP_RECVMSG:
+ case IORING_OP_SENDMSG:
+ if (io->msg.iov != io->msg.fast_iov)
+ kfree(io->msg.iov);
+ break;
+ case IORING_OP_SPLICE:
+ case IORING_OP_TEE:
+ io_put_file(req, req->splice.file_in,
+ (req->splice.flags & SPLICE_F_FD_IN_FIXED));
+ break;
+ }
+ req->flags &= ~REQ_F_NEED_CLEANUP;
+ }
}
static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
- bool force_nonblock)
+ bool force_nonblock, struct io_comp_state *cs)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
switch (req->opcode) {
case IORING_OP_NOP:
- ret = io_nop(req);
+ ret = io_nop(req, cs);
break;
case IORING_OP_READV:
case IORING_OP_READ_FIXED:
@@ -5183,7 +5517,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret < 0)
break;
}
- ret = io_read(req, force_nonblock);
+ ret = io_read(req, force_nonblock, cs);
break;
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
@@ -5193,7 +5527,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret < 0)
break;
}
- ret = io_write(req, force_nonblock);
+ ret = io_write(req, force_nonblock, cs);
break;
case IORING_OP_FSYNC:
if (sqe) {
@@ -5235,9 +5569,9 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
break;
}
if (req->opcode == IORING_OP_SENDMSG)
- ret = io_sendmsg(req, force_nonblock);
+ ret = io_sendmsg(req, force_nonblock, cs);
else
- ret = io_send(req, force_nonblock);
+ ret = io_send(req, force_nonblock, cs);
break;
case IORING_OP_RECVMSG:
case IORING_OP_RECV:
@@ -5247,9 +5581,9 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
break;
}
if (req->opcode == IORING_OP_RECVMSG)
- ret = io_recvmsg(req, force_nonblock);
+ ret = io_recvmsg(req, force_nonblock, cs);
else
- ret = io_recv(req, force_nonblock);
+ ret = io_recv(req, force_nonblock, cs);
break;
case IORING_OP_TIMEOUT:
if (sqe) {
@@ -5273,7 +5607,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_accept(req, force_nonblock);
+ ret = io_accept(req, force_nonblock, cs);
break;
case IORING_OP_CONNECT:
if (sqe) {
@@ -5281,7 +5615,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_connect(req, force_nonblock);
+ ret = io_connect(req, force_nonblock, cs);
break;
case IORING_OP_ASYNC_CANCEL:
if (sqe) {
@@ -5313,7 +5647,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_close(req, force_nonblock);
+ ret = io_close(req, force_nonblock, cs);
break;
case IORING_OP_FILES_UPDATE:
if (sqe) {
@@ -5321,7 +5655,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_files_update(req, force_nonblock);
+ ret = io_files_update(req, force_nonblock, cs);
break;
case IORING_OP_STATX:
if (sqe) {
@@ -5361,7 +5695,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_epoll_ctl(req, force_nonblock);
+ ret = io_epoll_ctl(req, force_nonblock, cs);
break;
case IORING_OP_SPLICE:
if (sqe) {
@@ -5377,7 +5711,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_provide_buffers(req, force_nonblock);
+ ret = io_provide_buffers(req, force_nonblock, cs);
break;
case IORING_OP_REMOVE_BUFFERS:
if (sqe) {
@@ -5385,7 +5719,7 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
if (ret)
break;
}
- ret = io_remove_buffers(req, force_nonblock);
+ ret = io_remove_buffers(req, force_nonblock, cs);
break;
case IORING_OP_TEE:
if (sqe) {
@@ -5420,25 +5754,15 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
return 0;
}
-static void io_arm_async_linked_timeout(struct io_kiocb *req)
+static struct io_wq_work *io_wq_submit_work(struct io_wq_work *work)
{
- struct io_kiocb *link;
-
- /* link head's timeout is queued in io_queue_async_work() */
- if (!(req->flags & REQ_F_QUEUE_TIMEOUT))
- return;
-
- link = list_first_entry(&req->link_list, struct io_kiocb, link_list);
- io_queue_linked_timeout(link);
-}
-
-static void io_wq_submit_work(struct io_wq_work **workptr)
-{
- struct io_wq_work *work = *workptr;
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+ struct io_kiocb *timeout;
int ret = 0;
- io_arm_async_linked_timeout(req);
+ timeout = io_prep_linked_timeout(req);
+ if (timeout)
+ io_queue_linked_timeout(timeout);
/* if NO_CANCEL is set, we must still run the work */
if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
@@ -5448,7 +5772,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
if (!ret) {
do {
- ret = io_issue_sqe(req, NULL, false);
+ ret = io_issue_sqe(req, NULL, false, NULL);
/*
* We can get EAGAIN for polled IO even though we're
* forcing a sync submission from here, since we can't
@@ -5462,11 +5786,10 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
if (ret) {
req_set_fail_links(req);
- io_cqring_add_event(req, ret);
- io_put_req(req);
+ io_req_complete(req, ret);
}
- io_steal_work(req, workptr);
+ return io_steal_work(req);
}
static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
@@ -5523,6 +5846,8 @@ static int io_grab_files(struct io_kiocb *req)
int ret = -EBADF;
struct io_ring_ctx *ctx = req->ctx;
+ io_req_init_async(req);
+
if (req->work.files || (req->flags & REQ_F_NO_FILE_TABLE))
return 0;
if (!ctx->ring_file)
@@ -5548,6 +5873,13 @@ static int io_grab_files(struct io_kiocb *req)
return ret;
}
+static inline int io_prep_work_files(struct io_kiocb *req)
+{
+ if (!io_op_defs[req->opcode].file_table)
+ return 0;
+ return io_grab_files(req);
+}
+
static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
{
struct io_timeout_data *data = container_of(timer,
@@ -5580,8 +5912,7 @@ static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
io_async_find_and_cancel(ctx, req, prev->user_data, -ETIME);
io_put_req(prev);
} else {
- io_cqring_add_event(req, -ETIME);
- io_put_req(req);
+ io_req_complete(req, -ETIME);
}
return HRTIMER_NORESTART;
}
@@ -5614,8 +5945,7 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
if (!(req->flags & REQ_F_LINK_HEAD))
return NULL;
- /* for polled retry, if flag is set, we already went through here */
- if (req->flags & REQ_F_POLLED)
+ if (req->flags & REQ_F_LINK_TIMEOUT)
return NULL;
nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
@@ -5627,7 +5957,8 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
return nxt;
}
-static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+ struct io_comp_state *cs)
{
struct io_kiocb *linked_timeout;
struct io_kiocb *nxt;
@@ -5647,54 +5978,45 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
old_creds = override_creds(req->work.creds);
}
- ret = io_issue_sqe(req, sqe, true);
+ ret = io_issue_sqe(req, sqe, true, cs);
/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts
*/
- if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
- (req->flags & REQ_F_MUST_PUNT))) {
- if (io_arm_poll_handler(req)) {
- if (linked_timeout)
- io_queue_linked_timeout(linked_timeout);
- goto exit;
- }
+ if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
+ if (!io_arm_poll_handler(req)) {
punt:
- io_req_init_async(req);
-
- if (io_op_defs[req->opcode].file_table) {
- ret = io_grab_files(req);
- if (ret)
+ ret = io_prep_work_files(req);
+ if (unlikely(ret))
goto err;
+ /*
+ * Queued up for async execution, worker will release
+ * submit reference when the iocb is actually submitted.
+ */
+ io_queue_async_work(req);
}
- /*
- * Queued up for async execution, worker will release
- * submit reference when the iocb is actually submitted.
- */
- io_queue_async_work(req);
+ if (linked_timeout)
+ io_queue_linked_timeout(linked_timeout);
goto exit;
}
+ if (unlikely(ret)) {
err:
- nxt = NULL;
- /* drop submission reference */
- io_put_req_find_next(req, &nxt);
-
- if (linked_timeout) {
- if (!ret)
- io_queue_linked_timeout(linked_timeout);
- else
- io_put_req(linked_timeout);
- }
-
- /* and drop final reference, if we failed */
- if (ret) {
- io_cqring_add_event(req, ret);
+ /* un-prep timeout, so it'll be killed as any other linked */
+ req->flags &= ~REQ_F_LINK_TIMEOUT;
req_set_fail_links(req);
io_put_req(req);
+ io_req_complete(req, ret);
+ goto exit;
}
+
+ /* drop submission reference */
+ nxt = io_put_req_find_next(req);
+ if (linked_timeout)
+ io_queue_linked_timeout(linked_timeout);
+
if (nxt) {
req = nxt;
@@ -5707,7 +6029,8 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
revert_creds(old_creds);
}
-static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+ struct io_comp_state *cs)
{
int ret;
@@ -5715,17 +6038,14 @@ static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (ret) {
if (ret != -EIOCBQUEUED) {
fail_req:
- io_cqring_add_event(req, ret);
req_set_fail_links(req);
- io_double_put_req(req);
+ io_put_req(req);
+ io_req_complete(req, ret);
}
} else if (req->flags & REQ_F_FORCE_ASYNC) {
if (!req->io) {
- ret = -EAGAIN;
- if (io_alloc_async_ctx(req))
- goto fail_req;
ret = io_req_defer_prep(req, sqe);
- if (unlikely(ret < 0))
+ if (unlikely(ret))
goto fail_req;
}
@@ -5737,21 +6057,22 @@ static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
req->work.flags |= IO_WQ_WORK_CONCURRENT;
io_queue_async_work(req);
} else {
- __io_queue_sqe(req, sqe);
+ __io_queue_sqe(req, sqe, cs);
}
}
-static inline void io_queue_link_head(struct io_kiocb *req)
+static inline void io_queue_link_head(struct io_kiocb *req,
+ struct io_comp_state *cs)
{
if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
- io_cqring_add_event(req, -ECANCELED);
- io_double_put_req(req);
+ io_put_req(req);
+ io_req_complete(req, -ECANCELED);
} else
- io_queue_sqe(req, NULL);
+ io_queue_sqe(req, NULL, cs);
}
static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
- struct io_kiocb **link)
+ struct io_kiocb **link, struct io_comp_state *cs)
{
struct io_ring_ctx *ctx = req->ctx;
int ret;
@@ -5777,21 +6098,19 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
head->flags |= REQ_F_IO_DRAIN;
ctx->drain_next = 1;
}
- if (io_alloc_async_ctx(req))
- return -EAGAIN;
-
ret = io_req_defer_prep(req, sqe);
- if (ret) {
+ if (unlikely(ret)) {
/* fail even hard links since we don't submit */
head->flags |= REQ_F_FAIL_LINK;
return ret;
}
trace_io_uring_link(ctx, req, head);
+ io_get_req_task(req);
list_add_tail(&req->link_list, &head->link_list);
/* last request of a link, enqueue the link */
if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) {
- io_queue_link_head(head);
+ io_queue_link_head(head, cs);
*link = NULL;
}
} else {
@@ -5803,15 +6122,12 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
req->flags |= REQ_F_LINK_HEAD;
INIT_LIST_HEAD(&req->link_list);
- if (io_alloc_async_ctx(req))
- return -EAGAIN;
-
ret = io_req_defer_prep(req, sqe);
- if (ret)
+ if (unlikely(ret))
req->flags |= REQ_F_FAIL_LINK;
*link = req;
} else {
- io_queue_sqe(req, sqe);
+ io_queue_sqe(req, sqe, cs);
}
}
@@ -5823,6 +6139,8 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
*/
static void io_submit_state_end(struct io_submit_state *state)
{
+ if (!list_empty(&state->comp.list))
+ io_submit_flush_completions(&state->comp);
blk_finish_plug(&state->plug);
io_state_file_put(state);
if (state->free_reqs)
@@ -5833,9 +6151,15 @@ static void io_submit_state_end(struct io_submit_state *state)
* Start submission side cache.
*/
static void io_submit_state_start(struct io_submit_state *state,
- unsigned int max_ios)
+ struct io_ring_ctx *ctx, unsigned int max_ios)
{
blk_start_plug(&state->plug);
+#ifdef CONFIG_BLOCK
+ state->plug.nowait = true;
+#endif
+ state->comp.nr = 0;
+ INIT_LIST_HEAD(&state->comp.list);
+ state->comp.ctx = ctx;
state->free_reqs = 0;
state->file = NULL;
state->ios_left = max_ios;
@@ -5900,12 +6224,6 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
unsigned int sqe_flags;
int id;
- /*
- * All io need record the previous position, if LINK vs DARIN,
- * it can be used to mark the position of the first IO in the
- * link list.
- */
- req->sequence = ctx->cached_sq_head - ctx->cached_sq_dropped;
req->opcode = READ_ONCE(sqe->opcode);
req->user_data = READ_ONCE(sqe->user_data);
req->io = NULL;
@@ -5953,7 +6271,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
struct file *ring_file, int ring_fd)
{
- struct io_submit_state state, *statep = NULL;
+ struct io_submit_state state;
struct io_kiocb *link = NULL;
int i, submitted = 0;
@@ -5970,10 +6288,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
if (!percpu_ref_tryget_many(&ctx->refs, nr))
return -EAGAIN;
- if (nr > IO_PLUG_THRESHOLD) {
- io_submit_state_start(&state, nr);
- statep = &state;
- }
+ io_submit_state_start(&state, ctx, nr);
ctx->ring_fd = ring_fd;
ctx->ring_file = ring_file;
@@ -5988,28 +6303,28 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
io_consume_sqe(ctx);
break;
}
- req = io_alloc_req(ctx, statep);
+ req = io_alloc_req(ctx, &state);
if (unlikely(!req)) {
if (!submitted)
submitted = -EAGAIN;
break;
}
- err = io_init_req(ctx, req, sqe, statep);
+ err = io_init_req(ctx, req, sqe, &state);
io_consume_sqe(ctx);
/* will complete beyond this point, count as submitted */
submitted++;
if (unlikely(err)) {
fail_req:
- io_cqring_add_event(req, err);
- io_double_put_req(req);
+ io_put_req(req);
+ io_req_complete(req, err);
break;
}
trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
true, io_async_submit(ctx));
- err = io_submit_sqe(req, sqe, &link);
+ err = io_submit_sqe(req, sqe, &link, &state.comp);
if (err)
goto fail_req;
}
@@ -6020,9 +6335,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
percpu_ref_put_many(&ctx->refs, nr - ref_used);
}
if (link)
- io_queue_link_head(link);
- if (statep)
- io_submit_state_end(&state);
+ io_queue_link_head(link, &state.comp);
+ io_submit_state_end(&state);
/* Commit SQ ring head once we've consumed and submitted all SQEs */
io_commit_sqring(ctx);
@@ -6030,6 +6344,21 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
return submitted;
}
+static inline void io_ring_set_wakeup_flag(struct io_ring_ctx *ctx)
+{
+ /* Tell userspace we may need a wakeup call */
+ spin_lock_irq(&ctx->completion_lock);
+ ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
+ spin_unlock_irq(&ctx->completion_lock);
+}
+
+static inline void io_ring_clear_wakeup_flag(struct io_ring_ctx *ctx)
+{
+ spin_lock_irq(&ctx->completion_lock);
+ ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
+ spin_unlock_irq(&ctx->completion_lock);
+}
+
static int io_sq_thread(void *data)
{
struct io_ring_ctx *ctx = data;
@@ -6046,12 +6375,12 @@ static int io_sq_thread(void *data)
while (!kthread_should_park()) {
unsigned int to_submit;
- if (!list_empty(&ctx->poll_list)) {
+ if (!list_empty(&ctx->iopoll_list)) {
unsigned nr_events = 0;
mutex_lock(&ctx->uring_lock);
- if (!list_empty(&ctx->poll_list))
- io_iopoll_getevents(ctx, &nr_events, 0);
+ if (!list_empty(&ctx->iopoll_list) && !need_resched())
+ io_do_iopoll(ctx, &nr_events, 0);
else
timeout = jiffies + ctx->sq_thread_idle;
mutex_unlock(&ctx->uring_lock);
@@ -6070,7 +6399,7 @@ static int io_sq_thread(void *data)
* adding ourselves to the waitqueue, as the unuse/drop
* may sleep.
*/
- io_sq_thread_drop_mm(ctx);
+ io_sq_thread_drop_mm();
/*
* We're polling. If we're within the defined idle
@@ -6079,11 +6408,10 @@ static int io_sq_thread(void *data)
* more IO, we should wait for the application to
* reap events and wake us up.
*/
- if (!list_empty(&ctx->poll_list) || need_resched() ||
+ if (!list_empty(&ctx->iopoll_list) || need_resched() ||
(!time_after(jiffies, timeout) && ret != -EBUSY &&
!percpu_ref_is_dying(&ctx->refs))) {
- if (current->task_works)
- task_work_run();
+ io_run_task_work();
cond_resched();
continue;
}
@@ -6093,21 +6421,18 @@ static int io_sq_thread(void *data)
/*
* While doing polled IO, before going to sleep, we need
- * to check if there are new reqs added to poll_list, it
- * is because reqs may have been punted to io worker and
- * will be added to poll_list later, hence check the
- * poll_list again.
+ * to check if there are new reqs added to iopoll_list,
+ * it is because reqs may have been punted to io worker
+ * and will be added to iopoll_list later, hence check
+ * the iopoll_list again.
*/
if ((ctx->flags & IORING_SETUP_IOPOLL) &&
- !list_empty_careful(&ctx->poll_list)) {
+ !list_empty_careful(&ctx->iopoll_list)) {
finish_wait(&ctx->sqo_wait, &wait);
continue;
}
- /* Tell userspace we may need a wakeup call */
- spin_lock_irq(&ctx->completion_lock);
- ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
- spin_unlock_irq(&ctx->completion_lock);
+ io_ring_set_wakeup_flag(ctx);
to_submit = io_sqring_entries(ctx);
if (!to_submit || ret == -EBUSY) {
@@ -6115,9 +6440,9 @@ static int io_sq_thread(void *data)
finish_wait(&ctx->sqo_wait, &wait);
break;
}
- if (current->task_works) {
- task_work_run();
+ if (io_run_task_work()) {
finish_wait(&ctx->sqo_wait, &wait);
+ io_ring_clear_wakeup_flag(ctx);
continue;
}
if (signal_pending(current))
@@ -6125,17 +6450,13 @@ static int io_sq_thread(void *data)
schedule();
finish_wait(&ctx->sqo_wait, &wait);
- spin_lock_irq(&ctx->completion_lock);
- ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
- spin_unlock_irq(&ctx->completion_lock);
+ io_ring_clear_wakeup_flag(ctx);
ret = 0;
continue;
}
finish_wait(&ctx->sqo_wait, &wait);
- spin_lock_irq(&ctx->completion_lock);
- ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
- spin_unlock_irq(&ctx->completion_lock);
+ io_ring_clear_wakeup_flag(ctx);
}
mutex_lock(&ctx->uring_lock);
@@ -6145,10 +6466,9 @@ static int io_sq_thread(void *data)
timeout = jiffies + ctx->sq_thread_idle;
}
- if (current->task_works)
- task_work_run();
+ io_run_task_work();
- io_sq_thread_drop_mm(ctx);
+ io_sq_thread_drop_mm();
revert_creds(old_cred);
kthread_parkme();
@@ -6211,9 +6531,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
do {
if (io_cqring_events(ctx, false) >= min_events)
return 0;
- if (!current->task_works)
+ if (!io_run_task_work())
break;
- task_work_run();
} while (1);
if (sig) {
@@ -6235,8 +6554,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
TASK_INTERRUPTIBLE);
/* make sure we run task_work before checking for signals */
- if (current->task_works)
- task_work_run();
+ if (io_run_task_work())
+ continue;
if (signal_pending(current)) {
if (current->jobctl & JOBCTL_TASK_WORK) {
spin_lock_irq(¤t->sighand->siglock);
@@ -7022,17 +7341,21 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
return 0;
err:
io_finish_async(ctx);
- mmdrop(ctx->sqo_mm);
- ctx->sqo_mm = NULL;
+ if (ctx->sqo_mm) {
+ mmdrop(ctx->sqo_mm);
+ ctx->sqo_mm = NULL;
+ }
return ret;
}
-static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
+static inline void __io_unaccount_mem(struct user_struct *user,
+ unsigned long nr_pages)
{
atomic_long_sub(nr_pages, &user->locked_vm);
}
-static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
+static inline int __io_account_mem(struct user_struct *user,
+ unsigned long nr_pages)
{
unsigned long page_limit, cur_pages, new_pages;
@@ -7050,6 +7373,41 @@ static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
return 0;
}
+static void io_unaccount_mem(struct io_ring_ctx *ctx, unsigned long nr_pages,
+ enum io_mem_account acct)
+{
+ if (ctx->limit_mem)
+ __io_unaccount_mem(ctx->user, nr_pages);
+
+ if (ctx->sqo_mm) {
+ if (acct == ACCT_LOCKED)
+ ctx->sqo_mm->locked_vm -= nr_pages;
+ else if (acct == ACCT_PINNED)
+ atomic64_sub(nr_pages, &ctx->sqo_mm->pinned_vm);
+ }
+}
+
+static int io_account_mem(struct io_ring_ctx *ctx, unsigned long nr_pages,
+ enum io_mem_account acct)
+{
+ int ret;
+
+ if (ctx->limit_mem) {
+ ret = __io_account_mem(ctx->user, nr_pages);
+ if (ret)
+ return ret;
+ }
+
+ if (ctx->sqo_mm) {
+ if (acct == ACCT_LOCKED)
+ ctx->sqo_mm->locked_vm += nr_pages;
+ else if (acct == ACCT_PINNED)
+ atomic64_add(nr_pages, &ctx->sqo_mm->pinned_vm);
+ }
+
+ return 0;
+}
+
static void io_mem_free(void *ptr)
{
struct page *page;
@@ -7086,6 +7444,9 @@ static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
return SIZE_MAX;
#endif
+ if (sq_offset)
+ *sq_offset = off;
+
sq_array_size = array_size(sizeof(u32), sq_entries);
if (sq_array_size == SIZE_MAX)
return SIZE_MAX;
@@ -7093,9 +7454,6 @@ static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
if (check_add_overflow(off, sq_array_size, &off))
return SIZE_MAX;
- if (sq_offset)
- *sq_offset = off;
-
return off;
}
@@ -7124,8 +7482,7 @@ static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
for (j = 0; j < imu->nr_bvecs; j++)
unpin_user_page(imu->bvec[j].bv_page);
- if (ctx->account_mem)
- io_unaccount_mem(ctx->user, imu->nr_bvecs);
+ io_unaccount_mem(ctx, imu->nr_bvecs, ACCT_PINNED);
kvfree(imu->bvec);
imu->nr_bvecs = 0;
}
@@ -7208,11 +7565,9 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
start = ubuf >> PAGE_SHIFT;
nr_pages = end - start;
- if (ctx->account_mem) {
- ret = io_account_mem(ctx->user, nr_pages);
- if (ret)
- goto err;
- }
+ ret = io_account_mem(ctx, nr_pages, ACCT_PINNED);
+ if (ret)
+ goto err;
ret = 0;
if (!pages || nr_pages > got_pages) {
@@ -7225,8 +7580,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
GFP_KERNEL);
if (!pages || !vmas) {
ret = -ENOMEM;
- if (ctx->account_mem)
- io_unaccount_mem(ctx->user, nr_pages);
+ io_unaccount_mem(ctx, nr_pages, ACCT_PINNED);
goto err;
}
got_pages = nr_pages;
@@ -7236,8 +7590,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
GFP_KERNEL);
ret = -ENOMEM;
if (!imu->bvec) {
- if (ctx->account_mem)
- io_unaccount_mem(ctx->user, nr_pages);
+ io_unaccount_mem(ctx, nr_pages, ACCT_PINNED);
goto err;
}
@@ -7268,8 +7621,7 @@ static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
*/
if (pret > 0)
unpin_user_pages(pages, pret);
- if (ctx->account_mem)
- io_unaccount_mem(ctx->user, nr_pages);
+ io_unaccount_mem(ctx, nr_pages, ACCT_PINNED);
kvfree(imu->bvec);
goto err;
}
@@ -7353,11 +7705,12 @@ static void io_destroy_buffers(struct io_ring_ctx *ctx)
static void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
io_finish_async(ctx);
- if (ctx->sqo_mm)
- mmdrop(ctx->sqo_mm);
-
- io_iopoll_reap_events(ctx);
io_sqe_buffer_unregister(ctx);
+ if (ctx->sqo_mm) {
+ mmdrop(ctx->sqo_mm);
+ ctx->sqo_mm = NULL;
+ }
+
io_sqe_files_unregister(ctx);
io_eventfd_unregister(ctx);
io_destroy_buffers(ctx);
@@ -7421,11 +7774,8 @@ static int io_remove_personalities(int id, void *p, void *data)
static void io_ring_exit_work(struct work_struct *work)
{
- struct io_ring_ctx *ctx;
-
- ctx = container_of(work, struct io_ring_ctx, exit_work);
- if (ctx->rings)
- io_cqring_overflow_flush(ctx, true);
+ struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
+ exit_work);
/*
* If we're doing polled IO and end up having requests being
@@ -7433,11 +7783,11 @@ static void io_ring_exit_work(struct work_struct *work)
* we're waiting for refs to drop. We need to reap these manually,
* as nobody else will be looking for them.
*/
- while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20)) {
- io_iopoll_reap_events(ctx);
+ do {
if (ctx->rings)
io_cqring_overflow_flush(ctx, true);
- }
+ io_iopoll_try_reap_events(ctx);
+ } while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20));
io_ring_ctx_free(ctx);
}
@@ -7453,10 +7803,10 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
if (ctx->io_wq)
io_wq_cancel_all(ctx->io_wq);
- io_iopoll_reap_events(ctx);
/* if we failed setting up the ctx, we might not have any rings */
if (ctx->rings)
io_cqring_overflow_flush(ctx, true);
+ io_iopoll_try_reap_events(ctx);
idr_for_each(&ctx->personality_idr, io_remove_personalities, ctx);
/*
@@ -7464,9 +7814,8 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
* is closed but resources aren't reaped yet. This can cause
* spurious failure in setting up a new ring.
*/
- if (ctx->account_mem)
- io_unaccount_mem(ctx->user,
- ring_pages(ctx->sq_entries, ctx->cq_entries));
+ io_unaccount_mem(ctx, ring_pages(ctx->sq_entries, ctx->cq_entries),
+ ACCT_LOCKED);
INIT_WORK(&ctx->exit_work, io_ring_exit_work);
queue_work(system_wq, &ctx->exit_work);
@@ -7522,17 +7871,14 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
if (cancel_req->flags & REQ_F_OVERFLOW) {
spin_lock_irq(&ctx->completion_lock);
- list_del(&cancel_req->list);
+ list_del(&cancel_req->compl.list);
cancel_req->flags &= ~REQ_F_OVERFLOW;
- if (list_empty(&ctx->cq_overflow_list)) {
- clear_bit(0, &ctx->sq_check_overflow);
- clear_bit(0, &ctx->cq_check_overflow);
- ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW;
- }
- spin_unlock_irq(&ctx->completion_lock);
+ io_cqring_mark_overflow(ctx);
WRITE_ONCE(ctx->rings->cq_overflow,
atomic_inc_return(&ctx->cached_cq_overflow));
+ io_commit_cqring(ctx);
+ spin_unlock_irq(&ctx->completion_lock);
/*
* Put inflight ref and overflow ref. If that's
@@ -7655,8 +8001,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
int submitted = 0;
struct fd f;
- if (current->task_works)
- task_work_run();
+ io_run_task_work();
if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
return -EINVAL;
@@ -7695,8 +8040,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
goto out;
}
if (flags & IORING_ENTER_GETEVENTS) {
- unsigned nr_events = 0;
-
min_complete = min(min_complete, ctx->cq_entries);
/*
@@ -7707,7 +8050,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
*/
if (ctx->flags & IORING_SETUP_IOPOLL &&
!(ctx->flags & IORING_SETUP_SQPOLL)) {
- ret = io_iopoll_check(ctx, &nr_events, min_complete);
+ ret = io_iopoll_check(ctx, min_complete);
} else {
ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
}
@@ -7912,7 +8255,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
{
struct user_struct *user = NULL;
struct io_ring_ctx *ctx;
- bool account_mem;
+ bool limit_mem;
int ret;
if (!entries)
@@ -7951,10 +8294,10 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
}
user = get_uid(current_user());
- account_mem = !capable(CAP_IPC_LOCK);
+ limit_mem = !capable(CAP_IPC_LOCK);
- if (account_mem) {
- ret = io_account_mem(user,
+ if (limit_mem) {
+ ret = __io_account_mem(user,
ring_pages(p->sq_entries, p->cq_entries));
if (ret) {
free_uid(user);
@@ -7964,14 +8307,13 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
ctx = io_ring_ctx_alloc(p);
if (!ctx) {
- if (account_mem)
- io_unaccount_mem(user, ring_pages(p->sq_entries,
+ if (limit_mem)
+ __io_unaccount_mem(user, ring_pages(p->sq_entries,
p->cq_entries));
free_uid(user);
return -ENOMEM;
}
ctx->compat = in_compat_syscall();
- ctx->account_mem = account_mem;
ctx->user = user;
ctx->creds = get_current_cred();
@@ -8003,12 +8345,22 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
- IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL;
+ IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
+ IORING_FEAT_POLL_32BITS;
if (copy_to_user(params, p, sizeof(*p))) {
ret = -EFAULT;
goto err;
}
+
+ /*
+ * Account memory _before_ installing the file descriptor. Once
+ * the descriptor is installed, it can get closed at any time.
+ */
+ io_account_mem(ctx, ring_pages(p->sq_entries, p->cq_entries),
+ ACCT_LOCKED);
+ ctx->limit_mem = limit_mem;
+
/*
* Install ring fd as the very last thing, so we don't risk someone
* having closed it before we finish setup
@@ -8292,7 +8644,8 @@ static int __init io_uring_init(void)
BUILD_BUG_SQE_ELEM(28, /* compat */ int, rw_flags);
BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
BUILD_BUG_SQE_ELEM(28, __u32, fsync_flags);
- BUILD_BUG_SQE_ELEM(28, __u16, poll_events);
+ BUILD_BUG_SQE_ELEM(28, /* compat */ __u16, poll_events);
+ BUILD_BUG_SQE_ELEM(28, __u32, poll32_events);
BUILD_BUG_SQE_ELEM(28, __u32, sync_range_flags);
BUILD_BUG_SQE_ELEM(28, __u32, msg_flags);
BUILD_BUG_SQE_ELEM(28, __u32, timeout_flags);
diff --git a/fs/xfs/xfs_file.c b/fs/xfs/xfs_file.c
index 00db81e..fdbff48 100644
--- a/fs/xfs/xfs_file.c
+++ b/fs/xfs/xfs_file.c
@@ -1080,7 +1080,7 @@ xfs_file_open(
return -EFBIG;
if (XFS_FORCED_SHUTDOWN(XFS_M(inode->i_sb)))
return -EIO;
- file->f_mode |= FMODE_NOWAIT;
+ file->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
return 0;
}
diff --git a/include/linux/blkdev.h b/include/linux/blkdev.h
index 9ab06ea..06ecb2c 100644
--- a/include/linux/blkdev.h
+++ b/include/linux/blkdev.h
@@ -1180,6 +1180,7 @@ struct blk_plug {
struct list_head cb_list; /* md requires an unplug callback */
unsigned short rq_count;
bool multiple_queues;
+ bool nowait;
};
#define BLK_MAX_REQUEST_COUNT 16
#define BLK_PLUG_FLUSH_SIZE (128 * 1024)
diff --git a/include/linux/fs.h b/include/linux/fs.h
index cdfed8c..bd7ec3e 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -175,6 +175,9 @@ typedef int (dio_iodone_t)(struct kiocb *iocb, loff_t offset,
/* File does not contribute to nr_files count */
#define FMODE_NOACCOUNT ((__force fmode_t)0x20000000)
+/* File supports async buffered reads */
+#define FMODE_BUF_RASYNC ((__force fmode_t)0x40000000)
+
/*
* Flag for rw_copy_check_uvector and compat_rw_copy_check_uvector
* that indicates that they should check the contents of the iovec are
@@ -315,6 +318,8 @@ enum rw_hint {
#define IOCB_SYNC (1 << 5)
#define IOCB_WRITE (1 << 6)
#define IOCB_NOWAIT (1 << 7)
+/* iocb->ki_waitq is valid */
+#define IOCB_WAITQ (1 << 8)
#define IOCB_NOIO (1 << 9)
struct kiocb {
@@ -329,7 +334,10 @@ struct kiocb {
int ki_flags;
u16 ki_hint;
u16 ki_ioprio; /* See linux/ioprio.h */
- unsigned int ki_cookie; /* for ->iopoll */
+ union {
+ unsigned int ki_cookie; /* for ->iopoll */
+ struct wait_page_queue *ki_waitq; /* for async buffered IO */
+ };
randomized_struct_fields_end
};
@@ -3275,22 +3283,28 @@ static inline int iocb_flags(struct file *file)
static inline int kiocb_set_rw_flags(struct kiocb *ki, rwf_t flags)
{
+ int kiocb_flags = 0;
+
+ if (!flags)
+ return 0;
if (unlikely(flags & ~RWF_SUPPORTED))
return -EOPNOTSUPP;
if (flags & RWF_NOWAIT) {
if (!(ki->ki_filp->f_mode & FMODE_NOWAIT))
return -EOPNOTSUPP;
- ki->ki_flags |= IOCB_NOWAIT;
+ kiocb_flags |= IOCB_NOWAIT;
}
if (flags & RWF_HIPRI)
- ki->ki_flags |= IOCB_HIPRI;
+ kiocb_flags |= IOCB_HIPRI;
if (flags & RWF_DSYNC)
- ki->ki_flags |= IOCB_DSYNC;
+ kiocb_flags |= IOCB_DSYNC;
if (flags & RWF_SYNC)
- ki->ki_flags |= (IOCB_DSYNC | IOCB_SYNC);
+ kiocb_flags |= (IOCB_DSYNC | IOCB_SYNC);
if (flags & RWF_APPEND)
- ki->ki_flags |= IOCB_APPEND;
+ kiocb_flags |= IOCB_APPEND;
+
+ ki->ki_flags |= kiocb_flags;
return 0;
}
diff --git a/include/linux/pagemap.h b/include/linux/pagemap.h
index cf2468d..d1f4eff 100644
--- a/include/linux/pagemap.h
+++ b/include/linux/pagemap.h
@@ -496,8 +496,35 @@ static inline pgoff_t linear_page_index(struct vm_area_struct *vma,
return pgoff;
}
+/* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
+struct wait_page_key {
+ struct page *page;
+ int bit_nr;
+ int page_match;
+};
+
+struct wait_page_queue {
+ struct page *page;
+ int bit_nr;
+ wait_queue_entry_t wait;
+};
+
+static inline bool wake_page_match(struct wait_page_queue *wait_page,
+ struct wait_page_key *key)
+{
+ if (wait_page->page != key->page)
+ return false;
+ key->page_match = 1;
+
+ if (wait_page->bit_nr != key->bit_nr)
+ return false;
+
+ return true;
+}
+
extern void __lock_page(struct page *page);
extern int __lock_page_killable(struct page *page);
+extern int __lock_page_async(struct page *page, struct wait_page_queue *wait);
extern int __lock_page_or_retry(struct page *page, struct mm_struct *mm,
unsigned int flags);
extern void unlock_page(struct page *page);
@@ -535,6 +562,22 @@ static inline int lock_page_killable(struct page *page)
}
/*
+ * lock_page_async - Lock the page, unless this would block. If the page
+ * is already locked, then queue a callback when the page becomes unlocked.
+ * This callback can then retry the operation.
+ *
+ * Returns 0 if the page is locked successfully, or -EIOCBQUEUED if the page
+ * was already locked and the callback defined in 'wait' was queued.
+ */
+static inline int lock_page_async(struct page *page,
+ struct wait_page_queue *wait)
+{
+ if (!trylock_page(page))
+ return __lock_page_async(page, wait);
+ return 0;
+}
+
+/*
* lock_page_or_retry - Lock the page, unless this would block and the
* caller indicated that it can handle a retry.
*
diff --git a/include/linux/sched/task.h b/include/linux/sched/task.h
index 3835907..1301077 100644
--- a/include/linux/sched/task.h
+++ b/include/linux/sched/task.h
@@ -126,6 +126,12 @@ static inline void put_task_struct(struct task_struct *t)
__put_task_struct(t);
}
+static inline void put_task_struct_many(struct task_struct *t, int nr)
+{
+ if (refcount_sub_and_test(nr, &t->usage))
+ __put_task_struct(t);
+}
+
void put_task_struct_rcu_user(struct task_struct *task);
#ifdef CONFIG_ARCH_WANTS_DYNAMIC_TASK_STRUCT
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 7843742..d65fde732 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -31,7 +31,8 @@ struct io_uring_sqe {
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
- __u16 poll_events;
+ __u16 poll_events; /* compatibility */
+ __u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
@@ -249,6 +250,7 @@ struct io_uring_params {
#define IORING_FEAT_RW_CUR_POS (1U << 3)
#define IORING_FEAT_CUR_PERSONALITY (1U << 4)
#define IORING_FEAT_FAST_POLL (1U << 5)
+#define IORING_FEAT_POLL_32BITS (1U << 6)
/*
* io_uring_register(2) opcodes and arguments
diff --git a/mm/filemap.c b/mm/filemap.c
index 991503b..9f131f1 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -987,19 +987,6 @@ void __init pagecache_init(void)
page_writeback_init();
}
-/* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
-struct wait_page_key {
- struct page *page;
- int bit_nr;
- int page_match;
-};
-
-struct wait_page_queue {
- struct page *page;
- int bit_nr;
- wait_queue_entry_t wait;
-};
-
static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
{
int ret;
@@ -1007,11 +994,7 @@ static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync,
struct wait_page_queue *wait_page
= container_of(wait, struct wait_page_queue, wait);
- if (wait_page->page != key->page)
- return 0;
- key->page_match = 1;
-
- if (wait_page->bit_nr != key->bit_nr)
+ if (!wake_page_match(wait_page, key))
return 0;
/*
@@ -1240,6 +1223,44 @@ int wait_on_page_bit_killable(struct page *page, int bit_nr)
}
EXPORT_SYMBOL(wait_on_page_bit_killable);
+static int __wait_on_page_locked_async(struct page *page,
+ struct wait_page_queue *wait, bool set)
+{
+ struct wait_queue_head *q = page_waitqueue(page);
+ int ret = 0;
+
+ wait->page = page;
+ wait->bit_nr = PG_locked;
+
+ spin_lock_irq(&q->lock);
+ __add_wait_queue_entry_tail(q, &wait->wait);
+ SetPageWaiters(page);
+ if (set)
+ ret = !trylock_page(page);
+ else
+ ret = PageLocked(page);
+ /*
+ * If we were succesful now, we know we're still on the
+ * waitqueue as we're still under the lock. This means it's
+ * safe to remove and return success, we know the callback
+ * isn't going to trigger.
+ */
+ if (!ret)
+ __remove_wait_queue(q, &wait->wait);
+ else
+ ret = -EIOCBQUEUED;
+ spin_unlock_irq(&q->lock);
+ return ret;
+}
+
+static int wait_on_page_locked_async(struct page *page,
+ struct wait_page_queue *wait)
+{
+ if (!PageLocked(page))
+ return 0;
+ return __wait_on_page_locked_async(compound_head(page), wait, false);
+}
+
/**
* put_and_wait_on_page_locked - Drop a reference and wait for it to be unlocked
* @page: The page to wait for.
@@ -1402,6 +1423,11 @@ int __lock_page_killable(struct page *__page)
}
EXPORT_SYMBOL_GPL(__lock_page_killable);
+int __lock_page_async(struct page *page, struct wait_page_queue *wait)
+{
+ return __wait_on_page_locked_async(page, wait, true);
+}
+
/*
* Return values:
* 1 - page is locked; mmap_lock is still held.
@@ -2061,7 +2087,7 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
page = find_get_page(mapping, index);
if (!page) {
- if (iocb->ki_flags & (IOCB_NOWAIT | IOCB_NOIO))
+ if (iocb->ki_flags & IOCB_NOIO)
goto would_block;
page_cache_sync_readahead(mapping,
ra, filp,
@@ -2080,17 +2106,25 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
index, last_index - index);
}
if (!PageUptodate(page)) {
- if (iocb->ki_flags & IOCB_NOWAIT) {
- put_page(page);
- goto would_block;
- }
-
/*
* See comment in do_read_cache_page on why
* wait_on_page_locked is used to avoid unnecessarily
* serialisations and why it's safe.
*/
- error = wait_on_page_locked_killable(page);
+ if (iocb->ki_flags & IOCB_WAITQ) {
+ if (written) {
+ put_page(page);
+ goto out;
+ }
+ error = wait_on_page_locked_async(page,
+ iocb->ki_waitq);
+ } else {
+ if (iocb->ki_flags & IOCB_NOWAIT) {
+ put_page(page);
+ goto would_block;
+ }
+ error = wait_on_page_locked_killable(page);
+ }
if (unlikely(error))
goto readpage_error;
if (PageUptodate(page))
@@ -2178,7 +2212,10 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
page_not_up_to_date:
/* Get exclusive access to the page ... */
- error = lock_page_killable(page);
+ if (iocb->ki_flags & IOCB_WAITQ)
+ error = lock_page_async(page, iocb->ki_waitq);
+ else
+ error = lock_page_killable(page);
if (unlikely(error))
goto readpage_error;
@@ -2197,7 +2234,7 @@ ssize_t generic_file_buffered_read(struct kiocb *iocb,
}
readpage:
- if (iocb->ki_flags & IOCB_NOIO) {
+ if (iocb->ki_flags & (IOCB_NOIO | IOCB_NOWAIT)) {
unlock_page(page);
put_page(page);
goto would_block;
diff --git a/tools/io_uring/liburing.h b/tools/io_uring/liburing.h
index 5f305c8..28a837b 100644
--- a/tools/io_uring/liburing.h
+++ b/tools/io_uring/liburing.h
@@ -10,6 +10,7 @@ extern "C" {
#include <string.h>
#include "../../include/uapi/linux/io_uring.h"
#include <inttypes.h>
+#include <linux/swab.h>
#include "barrier.h"
/*
@@ -145,11 +146,14 @@ static inline void io_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd,
}
static inline void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd,
- short poll_mask)
+ unsigned poll_mask)
{
memset(sqe, 0, sizeof(*sqe));
sqe->opcode = IORING_OP_POLL_ADD;
sqe->fd = fd;
+#if __BYTE_ORDER == __BIG_ENDIAN
+ poll_mask = __swahw32(poll_mask);
+#endif
sqe->poll_events = poll_mask;
}