io_uring api

api

数据结构定义好了,逻辑实现具体是如何驱动这些数据结构的呢?使用上,大体分为准备、提交、收割过程。

linux kernel仅仅提供了三个系统调用,简化了异步io的操作

1
2
3
4
5
6
7
8
9
10
// 初始化一个新的 io_uring 上下文,注册io_uring实例,包括注册共享内存等
int io_uring_setup(u32 entries, struct io_uring_params *p);

// 提交IO任务以及收割IO完成事件
int io_uring_enter(unsigned int fd, unsigned int to_submit,
unsigned int min_complete, unsigned int flags,
sigset_t *sig);

// 注册用户态和内核态的共享buffer或者文件,对共享内存或者内部数据结构长期持有,减少IO开销
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

源码分析

io_uring_setup

简介

io_uring通过io_uring_setup完成准备阶段,初始化io_uring实例,包括

  • 分配共享内存,该内存交由kernel管理

  • 初始化各个数据结构,如io_ring_ctx、io_rings、两个io_uring(sq和cq分别有一个)

  • 注册包含io_ring_ctx的文件,应用程序通过该文件的fd来获取io_ring_ctx,从而访问io_uring实例

  • 处理应用程序指定的flags相关的事宜,如SQ thread

接口如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int io_uring_setup(u32 entries, struct io_uring_params *p);

/*
* Passed in for io_uring_setup(2). Copied back with updated info on success
*/
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 features;
__u32 wq_fd;
__u32 resv[3];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};

其中,比较重要的flags有

  • IORING_SETUP_IOPOLL

    • 让内核采用 Polling 的模式收割Block层的请求。在收割IO时,以忙等待的方式,而不是异步中断通知(Interrupt Request)的方式,即应用程序需要不断调用io_uring_enter轮询设备来检查io是否完成。因此相比于IRQ,会消耗更多的cpu资源,但IO操作的延迟更低。该种方式需要依靠打开文件的时候,设置为 O_DIRECT 的标记。我没弄懂
    • 猜测:
      • 在IOPOLL启用时,会依靠轮询的方式收割block层的请求
      • 如果在IOPOLL开启后,SQPOLL也开启了,那么用户在收割完成事件时也不用阻塞了,SQ thread会处理该事情
  • IORING_SETUP_SQPOLL

    • 内核额外启用一个内核线程,称为SQ线程。这个内核线程可以运行在某个指定的 core 上(通过 sq_thread_cpu 配置)。这个内核线程会不停的 Poll SQ,除非在一段时间内没有 Poll 到任何请求(通过 sq_thread_idle 配置),才会被挂起。SQ线程不仅会处理IO提交,也会处理IO完成事件
  • IORING_SETUP_SINGLE_ISSUER

    • 只能有一个线程提交任务
  • IORING_SETUP_DEFER_TASKRUN

    • 在异步任务中,可能存在这种情况:在异步IO任务A提交后,该任务会加入到task work queue中,当cpu正在运行某个非常重要的任务B时,IO任务A可能从 task work queue中被调度出来,挤掉任务B的执行,导致这个非常重要的任务B的执行延迟变大,即执行时间增加。通过在io_uring_setup中设置IORING_SETUP_DEFER_TASKRUN,使得我们可以在用户调用io_uring_enter,并且带上IORING_ENTER_GETEVENTS时,才开始执行这些异步任务,例如IO任务A。这样就避免这些异步任务中断其他正在运行的任务。

源码

io_uring_setup系统调用的过程就是初始化相关数据结构,建立好对应的缓存区,然后通过系统调用的参数io_uring_params结构传递回去,告诉核外环内存地址在哪,起始指针的地址在哪等关键的信息。

需要初始化内存的内存分为三个区域,分别是SQ,CQ,SQEs。内核初始化SQ和CQ,SQ和CQ都是ring,此外,提交请求在SQ,CQ之间有一个间接数组,即内核提供了一个Submission Queue Entries(SQEs)数组。

io_uring_setup的逻辑可以分为以下三部分

  • 创建一个上下文结构io_ring_ctx用来管理整个会话。
  • 根据io_uring_params->sq_off/cq_off偏移量来实现SQ和CQ内存区的映射
  • 错误检查、权限检查、资源配额检查等检查逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/*
* Sets up an aio uring context, and returns the fd. Applications asks for a
* ring size, we return the actual sq/cq ring sizes (among other things) in the
* params structure passed in.
*/
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
// 检查params各成员是否valid

return io_uring_create(entries, &p, params);
}


// 真正执行setup的函数io_uring_create
static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params)
{
struct io_ring_ctx *ctx;
struct io_uring_task *tctx;
struct file *file;
int ret;

// 检查p->flags的合法性
...

// 设置p->sq_entries和p->cq_entries,必须是2的幂次
...

// 创建io_ring_ctx,为io_ring_ctx分配内存
ctx = io_ring_ctx_alloc(p);


// 设置io_ring_ctx的flag
...


/*
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
* space applications don't need to do io completion events
* polling again, they can rely on io_sq_thread to do polling
* work, which can reduce cpu usage and uring_lock contention.
*/



// 分配内存,为io_rings、cqes、sqs这三个紧邻的结构以及sqes分配内存
// allocate memory if app haven't, otherwise just map.
// The size = sizeof(io_rings) + p->cq_entries * sizeof(io_uring_cqe) + p->sq_entries * sizeof(u32) + p->sq_entries * sizeof(io_uring_sqe)
// 申请io_rings SQEs
ret = io_allocate_scq_urings(ctx, p);


// 处理poll模式的逻辑,包括初始化SQpoll内核线程
ret = io_sq_offload_create(ctx, p);



// 创建io_ring_ctx对应的file,之后用户需要这个file来访问io_ring_ctx
file = io_uring_get_file(ctx);
}



// 其中,我们对cq/sq可以进行的操作如下所示,在io_uring_get_file函数中,此结构会被放入到file->f_op中
static const struct file_operations io_uring_fops = {
.release = io_uring_release,
.mmap = io_uring_mmap,
#ifndef CONFIG_MMU
.get_unmapped_area = io_uring_nommu_get_unmapped_area,
.mmap_capabilities = io_uring_nommu_mmap_capabilities,
#else
.get_unmapped_area = io_uring_mmu_get_unmapped_area,
#endif
.poll = io_uring_poll,
#ifdef CONFIG_PROC_FS
.show_fdinfo = io_uring_show_fdinfo,
#endif
};


// io_uring_mmap的核心实现如下所示
// ctx存放于file中,通过offset获取ctx的各成员,以访问ctx->rings、cqes、sqes
// 即应用程序获取到file的fd后,通过对该file进行mmap,并传入指定的offset,即可访问ctx->rings、cqes、sqes
static void *io_uring_validate_mmap_request(struct file *file,
loff_t pgoff, size_t sz)
{
struct io_ring_ctx *ctx = file->private_data;
loff_t offset = pgoff << PAGE_SHIFT;
struct page *page;
void *ptr;

/* Don't allow mmap if the ring was setup without it */
if (ctx->flags & IORING_SETUP_NO_MMAP)
return ERR_PTR(-EINVAL);

switch (offset & IORING_OFF_MMAP_MASK) {
case IORING_OFF_SQ_RING:
case IORING_OFF_CQ_RING:
ptr = ctx->rings;
break;
case IORING_OFF_SQES:
ptr = ctx->sq_sqes;
break;
case IORING_OFF_PBUF_RING: {
unsigned int bgid;

bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
mutex_lock(&ctx->uring_lock);
ptr = io_pbuf_get_address(ctx, bgid);
mutex_unlock(&ctx->uring_lock);
if (!ptr)
return ERR_PTR(-EINVAL);
break;
}
default:
return ERR_PTR(-EINVAL);
}

page = virt_to_head_page(ptr);
if (sz > page_size(page))
return ERR_PTR(-EINVAL);

return ptr;
}

/*
* Magic offsets(byte-based) for the application to mmap the data it needs
*/
#define IORING_OFF_SQ_RING 0ULL
#define IORING_OFF_CQ_RING 0x8000000ULL
#define IORING_OFF_SQES 0x10000000ULL
#define IORING_OFF_PBUF_RING 0x80000000ULL
#define IORING_OF`F_PBUF_SHIFT 16
#define IORING_OFF_MMAP_MASK 0xf8000000ULL

如下图所示,io_uring_setup的主要功能由以下四个函数提供

io_uring_setup

  • io_ring_ctx_alloc,主要用来申请空间,初始化列表头、互斥锁、自旋锁等结构

  • io_allocate_scq_urings,初始化整个struct io_rings *rings,包括SQ/CQ头尾指针、SQECQE

    • SQCQ 头尾指针以及 CQE 都在 struct io_rings *rings 结构体中

    • SQE 则是在 struct io_ring_ctx *ctx 结构体中

  • io_sq_offload_create,根据用户通过 io_uring_setup 传递的 flags 来配置 io_uring 的运行方式

  • io_uring_get_fd 将 struct io_ring_ctx *ctx 暴露给用户态访问

io_sq_offload_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
/* Retain compatibility with failing for an invalid attach attempt */
// 检查是否和另外一个io_uring共享 SQ thread
...

if (ctx->flags & IORING_SETUP_SQPOLL) {
struct task_struct *tsk; // SQ thread
struct io_sq_data *sqd;

// sqd存放SQ thread的相关信息
sqd = io_get_sq_data(p, &attached);


// 设置ctx中SQ thread相关的信息
...
// 检查是否需要将SQ thread绑定到指定的cpu上
...

// 创建 SQ thread
tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);

// 例行操作,开启线程
wake_up_new_task(tsk);
if (ret)
goto err;
}
}

io_uring_enter

简介

通过使用io_uring_setup初始化的io_uring实例,io_uring_enter既可以提交IO请求,又可以收割IO完成事件。

1
2
3
int io_uring_enter(unsigned int fd, unsigned int to_submit,
unsigned int min_complete, unsigned int flags,
sigset_t *sig);

比较重要的flags如下

  • IORING_ENTER_GETEVENTS

    • 设置该flag后,io_uring_enter会收割至少min_complete个完成事件,否则会阻塞。当to_submit也设置了时,io_uring_enter既可以提交IO请求,又可以收割IO完成事件
  • IORING_ENTER_SQ_WAKEUP

    • 如果在io_uring_setup系统调用中,设置了**IORING_SETUP_SQPOLL **flag,即使用了SQ thread,那么sq ring中在长时间未有IO请求时,SQ thread会休眠,设置此flag后会唤醒SQ thread
  • IORING_ENTER_SQ_WAIT

    • 在设置了IORING_SETUP_SQPOLL flag后,SQ thread会处理sqes上的IO请求,那么应用程序就不知道sqes中是否还有空闲位置来提交IO请求,因此有可能在通过io_uring_enter来提交IO请求时,sqes中并没有空闲位置,那么此时就需要等待,直到有IO请求被内核处理,留出来空闲位置后,io_uring_enter才能返回
  • IORING_ENTER_REGISTERED_RING

    • 如果在多线程模式下,用io_uring_setup注册了一个io_uring实例,那么io_uring对应的file会被多个线程共享,因此在访问io_uring file对应的fd时,需要记录和设置其引用计数。例如,每次io_uring_enter前后都需要调用fdget/fdput,这样严重增大了开销。一种解决办法是将io_uring fd通过io_uring_register注册到current->io_uring_task->registered_rings中,之后便可通过该成员获取fd,不用处理fd的引用计数了。当启用此方法时,设置IORING_ENTER_REGISTERED_RING表示io_uring_enter中传入的fd不是真实的文件描述符,而是current->io_uring_task->registered_rings中的index。通过该index即可获取到对应的io_uring fd

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 处理submit和completion相关事件
SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
u32, min_complete, u32, flags, const void __user *, argp,
size_t, argsz)
{
struct io_ring_ctx *ctx;
struct file *file;
long ret;


// 根据fd找到对应的file,再根据file得到io_ring_ctx
...
ctx = file->private_data;


/*
* For SQ polling, the thread will do all submissions and completions.
* Just return the requested submit count, and wake the thread if
* we were asked to.
*/
if (ctx->flags & IORING_SETUP_SQPOLL) {
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sq_data->wait);
if (flags & IORING_ENTER_SQ_WAIT)
io_sqpoll_wait_sq(ctx); // current一直等待,直到sq_rings非满时

ret = to_submit;
} else if (to_submit) {
ret = io_uring_add_tctx_node(ctx);

mutex_lock(&ctx->uring_lock);
ret = io_submit_sqes(ctx, to_submit); // 最后是通过io_queue_sqe来提交sqe的

if (flags & IORING_ENTER_GETEVENTS) { // 处理completion事件
if (ctx->syscall_iopoll)
goto iopoll_locked;
/*
* Ignore errors, we'll soon call io_cqring_wait() and
* it should handle ownership problems if any.
*/
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) // 暂时不执行sqe,等到某一次调用io_uring_enter时,再统一执行多个sqes
(void)io_run_local_work_locked(ctx, min_complete); // The reason execute this line not in 3694 line is it needs get locked. Keep task work local to a io_ring_ctx, rather than to the submission task.
}
mutex_unlock(&ctx->uring_lock);
}

// 处理completion事件
if (flags & IORING_ENTER_GETEVENTS) {
int ret2;

// 未启用sq thread,但是启用了iopoll
if (ctx->syscall_iopoll) {
/*
* We disallow the app entering submit/complete with
* polling, but we still need to lock the ring to
* prevent racing with polled issue that got punted to
* a workqueue.
*/
mutex_lock(&ctx->uring_lock);
iopoll_locked:
ret2 = io_validate_ext_arg(flags, argp, argsz);
if (likely(!ret2)) {
min_complete = min(min_complete,
ctx->cq_entries);
ret2 = io_iopoll_check(ctx, min_complete);
}
mutex_unlock(&ctx->uring_lock);
} else { // 一直等待cq_ring
const sigset_t __user *sig;
struct __kernel_timespec __user *ts;

ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
if (likely(!ret2)) {
min_complete = min(min_complete,
ctx->cq_entries);
ret2 = io_cqring_wait(ctx, min_complete, sig,
argsz, ts);
}
}
}

io_uring_register

简介

主要用于注册/释放各种不同类型的缓冲区资源。通过提前注册这些缓冲区可以减轻后续每个 IO 的申请资源开销

1
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
void __user *arg, unsigned nr_args)
__releases(ctx->uring_lock)
__acquires(ctx->uring_lock)
{
int ret;

/*
* We don't quiesce the refs for register anymore and so it can't be
* dying as we're holding a file ref here.
*/

if (ctx->restricted) {
opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
if (!test_bit(opcode, ctx->restrictions.register_op))
return -EACCES;
}

// 根据操作码执行不同逻辑,以注册io_uring fd为例
switch (opcode) {
case IORING_REGISTER_RING_FDS:
ret = io_ringfd_register(ctx, arg, nr_args);
break;
}

return ret;
}



// 虽然只注册一个io_uring fd,但是current中的所有io_uring fds都需要更新,__arg指向一个
// io_uring_rsrc_update数组,nr_args是数组长度。
// 每个元素是一个io_uring_rsrc_update,存储一个io_uring fd以及这个io_uring fd想要存放
// 在current->io_uring_task->registerd_rings中的位置,即io_uring_rsrc_update->offset,
// 如果offset为-1,那么表示可以存放在任意位置。
//
// 函数的返回值表示有多少个io_uring fds是成功更新了的
//
int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
unsigned nr_args)
{
struct io_uring_rsrc_update __user *arg = __arg;
struct io_uring_rsrc_update reg;
struct io_uring_task *tctx;
int ret, i;


mutex_unlock(&ctx->uring_lock);
ret = __io_uring_add_tctx_node(ctx);
mutex_lock(&ctx->uring_lock);


tctx = current->io_uring;
for (i = 0; i < nr_args; i++) {
int start, end;

if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
ret = -EFAULT;
break;
}


if (reg.offset == -1U) {
start = 0;
end = IO_RINGFD_REG_MAX;
} else {
if (reg.offset >= IO_RINGFD_REG_MAX) {
ret = -EINVAL;
break;
}
start = reg.offset;
end = start + 1;
}

ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
if (ret < 0)
break;

reg.offset = ret;
if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
fput(tctx->registered_rings[reg.offset]);
tctx->registered_rings[reg.offset] = NULL;
ret = -EFAULT;
break;
}
}

return i ? i : ret;
}


int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
int start, int end)
{
int offset;
// 从disired index处开始依次检验,如果都没位置,则该io_uring fd更新失败
for (offset = start; offset < end; offset++) {
offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
if (tctx->registered_rings[offset])
continue;

tctx->registered_rings[offset] = file;
return offset;
}
return -EBUSY;
}

static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
int start, int end)
{
struct file *file;
int offset;

file = fget(fd);
if (!file) {
return -EBADF;
} else if (!io_is_uring_fops(file)) {
fput(file);
return -EOPNOTSUPP;
}
offset = io_ring_add_registered_file(tctx, file, start, end);
if (offset < 0)
fput(file);
return offset;
}

io_uring api
http://example.com/io-uring-api-及实现/
作者
Yw
发布于
2024年3月26日
许可协议