Level Up Coding

Learning Rust: I/O Ring

Are you disappointed with select, poll, epoll, or AIO? Try the most promising I/O mechanism in the Linux landscape.

Linux has a rich history of managing I/O operations. Some mechanisms, like select and poll, which are part of POSIX, were developed in the 1980s and inherited from Unix systems. Kernel 2.5 introduced epoll in 2002, enhancing the performance of I/O operations, and it is still widely used today. The subsequent addition of AIO did not fully address the challenges in the asynchronous I/O domain. Until 2019, files could not be opened or stat-ed truly asynchronously. The new I/O Ring (io_uring) seems to be a game-changer, reshaping how the system interacts with user applications.

The story isn’t about using liburing or io-uring from tokio. We are going to skip all given abstractions and talk directly to the kernel via system calls to discover and learn how the newest I/O non-blocking mechanism works. We will write two sample apps. The first one will display “Hello, World!”. The second one, a bit more complex, will print a file to the standard output. Are you ready to enjoy it?

Going bottom-up can be very challenging, so in this story we will begin by looking at the first demo app. It simply prints a message, but it is already complex enough to get lost in.

use crate::uring::*;

pub struct HelloCommand {
    pub msg: &'static [u8],
}

pub enum HelloCommandExecute {
    Succeeded(),
    Failed(&'static [u8]),
}

impl IORingSubmitBuffer for &'static [u8] {
    fn extract(self) -> (*const u8, usize) {
        (self.as_ptr(), self.len())
    }
}

fn fail(msg: &'static [u8]) -> HelloCommandExecute {
    HelloCommandExecute::Failed(msg)
}

impl HelloCommand {
    const IORING_INVALID_DESCRIPTOR: &'static [u8] = b"I/O Ring Init failed: Invalid Descriptor.\n";
    const IORING_SETUP_FAILED: &'static [u8] = b"I/O Ring Init failed: Setup Failed.\n";
    const IORING_MAPPING_FAILED: &'static [u8] = b"I/O Ring Init failed: Mapping Failed.\n";
    const IORING_SUBMISSION_FAILED: &'static [u8] = b"I/O Ring entry submission failed.\n";
    const IORING_SUBMISSION_MISMATCHED: &'static [u8] = b"I/O Ring entry submission mismatch.\n";
    const IORING_COMPLETION_FAILED: &'static [u8] = b"I/O Ring entry completion failed.\n";
    const IORING_COMPLETION_ERRORED: &'static [u8] = b"I/O Ring completed with failure.\n";
    const IORING_SHUTDOWN_FAILED: &'static [u8] = b"I/O Ring shutdown failed.\n";
}

impl HelloCommand {
    pub fn execute(&self) -> HelloCommandExecute {
        let mut ring = match IORing::init(32) {
            IORingInit::Succeeded(value) => value,
            IORingInit::InvalidDescriptor(_) => return fail(HelloCommand::IORING_INVALID_DESCRIPTOR),
            IORingInit::SetupFailed(_) => return fail(HelloCommand::IORING_SETUP_FAILED),
            IORingInit::MappingFailed(_, _) => return fail(HelloCommand::IORING_MAPPING_FAILED),
        };

        match ring.submit([IORingSubmitEntry::write(2, self.msg, 0, 0)]) {
            IORingSubmit::SubmissionFailed(_) => return fail(HelloCommand::IORING_SUBMISSION_FAILED),
            IORingSubmit::SubmissionMismatched(_) => return fail(HelloCommand::IORING_SUBMISSION_MISMATCHED),
            IORingSubmit::Succeeded(_) => (),
        };

        let entry = loop {
            match ring.complete() {
                IORingComplete::Succeeded(entry) => break entry,
                IORingComplete::UnexpectedEmpty(_) => continue,
                IORingComplete::CompletionFailed(_) => return fail(HelloCommand::IORING_COMPLETION_FAILED),
            }
        };

        if entry.res < 0 {
            return HelloCommandExecute::Failed(HelloCommand::IORING_COMPLETION_ERRORED);
        }

        if let IORingShutdown::Failed() = ring.shutdown() {
            return fail(HelloCommand::IORING_SHUTDOWN_FAILED);
        }

        HelloCommandExecute::Succeeded()
    }
}

The code represents a command that can be executed. The main function creates a new I/O Ring, then submits a buffer with a message to be printed to the standard output. This is followed by an event processing loop, which is expected to iterate only once. Finally, we check whether the write completed successfully and close the I/O Ring to release all resources. When executed, it simply prints “Hello, World!”.

You’ve already learned that the I/O Ring must be created before using it. And probably noticed that we can submit I/O operations and loop to receive their completions. Finally, you are aware that the I/O Ring is a resource that needs to be cleaned up.

But what actually is an I/O Ring? You might think of it as having two queues. The first one accepts outgoing I/O requests, and the second one delivers the results of previously scheduled requests. The general idea is that submitting a request is a non-blocking operation, while looping and waiting for completion may block until something is received. You may notice in the following visualization that the Completion Queue (CQ) doesn’t deliver events in the same order as the Submission Queue (SQ).

+------------------------------------------------+
|             Submission Queue (SQ)              |
|   +-------+ +-------+ +-------+ +-------+      |
|   | SQE 1 | | SQE 2 | | SQE 3 | | SQE 4 | ..   |
|   +-------+ +-------+ +-------+ +-------+      |
+------------------------------------------------+
                        |
                        |
                        V
+------------------------------------------------+
|             Completion Queue (CQ)              |
|   +-------+ +-------+ +-------+ +-------+      |
|   | CQE 3 | | CQE 1 | | CQE 4 | | CQE 2 | ..   |
|   +-------+ +-------+ +-------+ +-------+      |
+------------------------------------------------+
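The out-of-order delivery shown above is why each submission carries a user_data token: the kernel echoes it back in the completion, so we can correlate results with requests. A minimal sketch in plain Rust, with no kernel involved (the request names are illustrative):

```rust
use std::collections::HashMap;

// Map completions back to their submissions using user_data-style tokens.
// Returns the operation names in completion order.
fn correlate(requests: &HashMap<u64, &'static str>, completions: &[u64]) -> Vec<&'static str> {
    completions
        .iter()
        .map(|id| *requests.get(id).expect("unknown user_data"))
        .collect()
}

fn main() {
    // Hypothetical submissions; the key plays the role of user_data.
    let requests = HashMap::from([(1u64, "open"), (2, "read"), (3, "write"), (4, "close")]);

    // Completions may arrive out of order, as in the CQ above.
    let done = correlate(&requests, &[3, 1, 4, 2]);
    println!("{done:?}"); // ["write", "open", "close", "read"]
}
```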

When we discuss rings, buffers, or queues, we are referring to a memory structure with multiple slots, each having its own location. The advantage of the I/O Ring is that the memory is shared between our application and the kernel. To work with the I/O Ring, we need to initialize the ring and create a shared memory mapping. We will use the io_uring_setup system call (425) to create a new file descriptor for an I/O Ring.

pub fn sys_io_uring_setup(entries: u32, params: *mut io_uring_params) -> isize;

#[repr(C)]
pub struct io_uring_params {
    pub sq_entries: u32,
    pub cq_entries: u32,
    pub flags: u32,
    pub sq_thread_cpu: u32,
    pub sq_thread_idle: u32,
    pub features: u32,
    pub wq_fd: u32,
    pub resv: [u32; 3],
    pub sq_off: io_sqring_offsets,
    pub cq_off: io_cqring_offsets,
}

#[repr(C)]
pub struct io_cqring_offsets {
    pub head: u32,
    pub tail: u32,
    pub ring_mask: u32,
    pub ring_entries: u32,
    pub overflow: u32,
    pub cqes: u32,
    pub flags: u32,
    pub resv1: u32,
    pub user_addr: u64,
}

#[repr(C)]
pub struct io_sqring_offsets {
    pub head: u32,
    pub tail: u32,
    pub ring_mask: u32,
    pub ring_entries: u32,
    pub flags: u32,
    pub dropped: u32,
    pub array: u32,
    pub resv1: u32,
    pub user_addr: u64,
}

The system call requires us to pass the size of the SQ queue and a prefilled structure. In our examples, we will pass only the SQ queue size together with an empty structure. As a result, we will receive a file descriptor (or an error) and a structure filled by the kernel.

Just imagine we received a file descriptor number 3. The kernel also prefilled the structure, as shown in the following example (taken from strace output).

io_uring_params {
    sq_entries: 32,
    cq_entries: 64,
    ...
    sq_off: io_sqring_offsets {
        head: 0,
        tail: 64,
        ring_mask: 256,
        ring_entries: 264,
        flags: 276,
        dropped: 272,
        array: 1344,
    },
    cq_off: io_cqring_offsets {
        head: 128,
        tail: 192,
        ring_mask: 260,
        ring_entries: 268,
        overflow: 284,
        cqes: 320,
        flags: 280,
    },
}

We can interpret the result as follows:

  • SQ/CQ sizes: in slots, generally the CQ is twice as large as the SQ.
  • SQ Offsets: relative positions within the SQ memory mapping.
  • CQ Offsets: relative positions within the CQ memory mapping.

The prefilled structure is not the memory mapping itself; it provides information on where to find relevant details once the mapping is established. To create a mapping, we use the file descriptor returned by the previous system call. This descriptor allows access to three key areas:

  • IORING_OFF_SQ_RING: holds information about the SQ Ring.
  • IORING_OFF_SQES: holds information about SQ Ring Entries.
  • IORING_OFF_CQ_RING: holds information about the CQ Ring + Entries.

All three mappings can be created using the mmap system call. For each mapping, we need to provide the file descriptor of the I/O Ring, an offset (using predefined constants), and the mapped area length, which can be computed based on the parameters in the structure. In the example, strace recorded the following system calls:

mmap(NULL, 1472, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 3, 0) = 0x7fc1bbc11000
mmap(NULL, 2048, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 3, 0x10000000) = 0x7fc1bbc10000
mmap(NULL, 1344, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_POPULATE, 3, 0x8000000) = 0x7fc1bbc0f000

You may have noticed that we called mmap three times. Each time we passed an exact size, the same flags, and the file descriptor. The three offsets are different, indicating that each call maps a different memory location.
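The three lengths can be derived directly from the io_uring_params structure returned by the kernel. A quick sanity check against the strace numbers above (a sketch; it assumes 64-byte SQEs and 16-byte CQEs, matching the structs shown in this story):

```rust
// Mapping lengths for the three io_uring regions, computed from io_uring_params.
// The SQ ring ends with the u32 index array placed at sq_off.array.
fn sq_ring_len(array_off: u32, sq_entries: u32) -> usize {
    array_off as usize + sq_entries as usize * core::mem::size_of::<u32>()
}

// The SQES mapping is just the array of fixed-size submission entries.
fn sqes_len(sq_entries: u32, sqe_size: usize) -> usize {
    sq_entries as usize * sqe_size
}

// The CQ ring ends with the CQE array placed at cq_off.cqes.
fn cq_ring_len(cqes_off: u32, cq_entries: u32, cqe_size: usize) -> usize {
    cqes_off as usize + cq_entries as usize * cqe_size
}

fn main() {
    // Values taken from the strace example above.
    assert_eq!(sq_ring_len(1344, 32), 1472);    // first mmap
    assert_eq!(sqes_len(32, 64), 2048);         // second mmap (64-byte SQEs)
    assert_eq!(cq_ring_len(320, 64, 16), 1344); // third mmap (16-byte CQEs)
    println!("all mapping lengths match the strace output");
}
```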

Using simple pointer arithmetic, we can locate the exact positions of some very interesting data:

  • SQ_TAIL = 0x7fc1bbc11000 + 64 => the u32 SQ Tail
  • SQ_RING_MASK = 0x7fc1bbc11000 + 256 => the u32 SQ Ring Mask
  • SQ_ARRAY = 0x7fc1bbc11000 + 1344 => the [u32; 32] SQ Index Array
  • CQ_HEAD = 0x7fc1bbc0f000 + 128 => the u32 CQ Head
  • CQ_TAIL = 0x7fc1bbc0f000 + 192 => the u32 CQ Tail
  • CQ_RING_MASK = 0x7fc1bbc0f000 + 260 => the u32 CQ Ring Mask

The next two arrays are particularly noteworthy:

  • SQES = 0x7fc1bbc10000 => the array of SQ entries; they are substantial and likely mapped to a separate memory page
  • CQES = 0x7fc1bbc0f000 + 320 => the array of CQ entries; they are relatively small

Having all those pointers allows us to fully interact with the I/O Ring. It may seem quite complex at first, but after some deeper reflection, it turns out to make a lot of sense.

Now that we understand the structure of the I/O Ring and the four system calls needed to initialize it (one io_uring_setup and three mmap calls), we can write Rust code for it:

pub struct IORing {
    fd: u32,

    sq_ptr: *mut u8,
    sq_ptr_len: usize,
    sq_tail: *mut u32,
    sq_ring_mask: *mut u32,
    sq_array: *mut u32,

    sq_sqes: *mut io_uring_sqe,
    sq_sqes_len: usize,

    cq_ptr: *mut u8,
    cq_ptr_len: usize,
    cq_head: *mut u32,
    cq_tail: *mut u32,
    cq_ring_mask: *mut u32,
    cq_cqes: *mut io_uring_cqe,
}

pub enum IORingInit {
    Succeeded(IORing),
    InvalidDescriptor(isize),
    SetupFailed(isize),
    MappingFailed(&'static [u8], isize),
}

impl IORing {
    pub fn init(entries: u32) -> IORingInit {
        let mut params = io_uring_params::default();
        let fd: u32 = match sys_io_uring_setup(entries, &mut params as *mut io_uring_params) {
            value if value < 0 => return IORingInit::SetupFailed(value),
            value => match value.try_into() {
                Err(_) => return IORingInit::InvalidDescriptor(value),
                Ok(value) => value,
            },
        };

        fn map<T>(fd: u32, array: u32, count: u32, offset: usize) -> (isize, usize) {
            let array = array as usize;
            let size = core::mem::size_of::<T>();

            let addr = null_mut();
            let length = array + size * count as usize;

            let prot = IORing::PROT_READ | IORing::PROT_WRITE;
            let flags = IORing::MAP_SHARED | IORing::MAP_POPULATE;

            (sys_mmap(addr, length, prot, flags, fd as usize, offset), length)
        }

        let sq_array = params.sq_off.array;
        let sq_entries = params.sq_entries;

        let offset = IORing::IORING_OFF_SQ_RING;
        let (sq_ptr, sq_ptr_len) = match map::<u32>(fd, sq_array, sq_entries, offset) {
            (res, _) if res <= 0 => return IORingInit::MappingFailed(b"SQ_RING", res),
            (res, len) => (res as *mut u8, len),
        };

        let sq_tail = (sq_ptr as usize + params.sq_off.tail as usize) as *mut u32;
        let sq_ring_mask = (sq_ptr as usize + params.sq_off.ring_mask as usize) as *mut u32;
        let sq_array = (sq_ptr as usize + params.sq_off.array as usize) as *mut u32;

        let offset = IORing::IORING_OFF_SQES;
        let (sq_sqes, sq_sqes_len) = match map::<io_uring_sqe>(fd, 0, sq_entries, offset) {
            (res, _) if res <= 0 => return IORingInit::MappingFailed(b"SQ_SQES", res),
            (res, len) => (res as *mut io_uring_sqe, len),
        };

        let cq_array = params.cq_off.cqes;
        let cq_entries = params.cq_entries;

        let offset = IORing::IORING_OFF_CQ_RING;
        let (cq_ptr, cq_ptr_len) = match map::<io_uring_cqe>(fd, cq_array, cq_entries, offset) {
            (res, _) if res <= 0 => return IORingInit::MappingFailed(b"CQ_RING", res),
            (res, len) => (res as *mut u8, len),
        };

        let cq_head = (cq_ptr as usize + params.cq_off.head as usize) as *mut u32;
        let cq_tail = (cq_ptr as usize + params.cq_off.tail as usize) as *mut u32;
        let cq_ring_mask = (cq_ptr as usize + params.cq_off.ring_mask as usize) as *mut u32;
        // the CQE array lives within the CQ mapping, so it is based on cq_ptr
        let cq_cqes = (cq_ptr as usize + params.cq_off.cqes as usize) as *mut io_uring_cqe;

        IORingInit::Succeeded(IORing {
            fd,
            sq_ptr,
            sq_ptr_len,
            sq_tail,
            sq_ring_mask,
            sq_array,
            sq_sqes,
            sq_sqes_len,
            cq_ptr,
            cq_ptr_len,
            cq_head,
            cq_tail,
            cq_ring_mask,
            cq_cqes,
        })
    }
}

Indeed, you may notice a lot of pointers. The IORing struct is designed to conceal them all, providing a relatively safe interface for any user. In the initial app, you don’t notice any pointers, do you?

I/O Ring alone doesn’t make sense if it doesn’t receive any submissions. We know that to interact with it, we need to have a file descriptor and properly mapped memory regions. When we want to submit a new async operation, we add it manually within the memory region (in three steps) and notify the kernel via a system call that we did it. We could even add n submissions and only call the kernel once.

Here are all the steps to add a new submission:

  1. Find the current tail index with the applied ring mask.
  2. Fill out a Submission Queue Array pointing at the given tail index location; the value should be the index of the new entry within the Submission Queue Entry mapping (next step).
  3. Fill out a Submission Queue Entry at the desired free slot within the mapping; for simplicity, we will always write at the current tail index.
  4. Increase the current tail index by one and apply the ring mask.
  5. Notify the kernel via the io_uring_enter system call using the file descriptor.

It may sound quite complex; and it is. There are a lot of mistakes to be made here, such as incorrect pointer arithmetic, mishandling volatile memory, or missing ring mask application. I’ve already spent several hours troubleshooting it. You may visualize it as follows:
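The numbered steps can be simulated on plain arrays, without touching the kernel. A sketch of the tail/mask arithmetic (the names and the 8-slot ring are illustrative; step 5, the io_uring_enter call, is omitted):

```rust
// Simulates enqueuing one submission, mirroring the five steps above.
// `sq_array` is the indirection array, `sqes` the entry slots, `tail` the SQ tail.
fn enqueue(sq_array: &mut [u32; 8], sqes: &mut [char; 8], tail: &mut u32, entry: char) {
    let ring_mask = (sq_array.len() - 1) as u32; // 7 for an 8-slot ring

    // Step 1: derive the slot index from the tail with the ring mask applied.
    let index = *tail & ring_mask;

    // Steps 2 & 3: the indices array points at the SQE slot; for simplicity,
    // the SQE is placed at the same index, as the article does.
    sq_array[index as usize] = index;
    sqes[index as usize] = entry;

    // Step 4: advance the tail by one and apply the ring mask.
    *tail = (*tail + 1) & ring_mask;
}

fn main() {
    let mut sq_array = [0u32; 8];
    let mut sqes = [' '; 8];
    let mut tail = 5u32; // the next free slot is 5

    enqueue(&mut sq_array, &mut sqes, &mut tail, 'E');
    assert_eq!(sqes[5], 'E');     // the new SQE landed in slot 5
    assert_eq!(sq_array[5], 5);   // the index array points at it
    assert_eq!(tail, 6);          // the tail moved forward
}
```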

[SQ Metadata]
+-----------+------+------+
| Ring Mask | Head | Tail |
+-----------+------+------+
|     7     |  2   |  5   |
+-----------+------+------+

[SQ Indices Array]
+---+---+---+---+---+---+---+---+
|   |   | 2 | 3 | 4 | 5 |   |   |
+---+---+---+---+---+---+---+---+
  0   1   2   3   4   5   6   7

[SQ Entries Array]
+---+---+---+---+---+---+---+---+
|   |   | B | C | D | E |   |   |  <- E represents the new submission
+---+---+---+---+---+---+---+---+
  0   1   2   3   4   5   6   7

In the above example, we are adding a new SQ index in slot number 5 (because tail tells us so). For simplification, we wrote 5 and placed the SQE in the same index. However, the I/O Ring allows us to manage SQEs as we wish. For instance, it could be:

[SQ Metadata]
+-----------+------+------+
| Ring Mask | Head | Tail |
+-----------+------+------+
|     7     |  2   |  5   |
+-----------+------+------+

[SQ Indices Array]
+---+---+---+---+---+---+---+---+
|   |   | 7 | 5 | 1 | 2 |   |   |
+---+---+---+---+---+---+---+---+
  0   1   2   3   4   5   6   7

[SQ Entries Array]
+---+---+---+---+---+---+---+---+
|   | D | E |   |   | C |   | B |  <- E represents the new submission
+---+---+---+---+---+---+---+---+
  0   1   2   3   4   5   6   7

You may wonder what is inside each array. The first array of indices is simply an array of u32. However, the second array is more intriguing. You can observe its simplified Rust struct (64 bytes) in the following snippet. The actual C header contains numerous unions representing various scenarios.

#[repr(C)]
pub struct io_uring_sqe {
    pub opcode: u8,
    pub flags: u8,
    pub ioprio: u16,
    pub fd: i32,
    pub off: u64,
    pub addr: u64,
    pub len: u32,
    pub rw_flags: u32,
    pub user_data: u64,
    pub buf_index: u16,
    pub personality: u16,
    pub splice_fd_in: i32,
    pub __pad2: [u64; 2],
}

Finally, I managed to abstract it in the following code:

pub enum IORingSubmit {
    Succeeded(usize),
    SubmissionFailed(isize),
    SubmissionMismatched(usize),
}

impl IORing {
    pub fn submit<T, const C: usize>(&mut self, entries: [IORingSubmitEntry<T>; C]) -> IORingSubmit
    where
        T: IORingSubmitBuffer,
    {
        let min_complete = 0;
        let to_submit = entries.len() as u32;

        for entry in entries.into_iter() {
            let ring_mask = unsafe { read_volatile(self.sq_ring_mask) };
            let sq_tail = unsafe { read_volatile(self.sq_tail) & ring_mask };

            let (opcode, fd, ptr, len, offset, user_data) = match entry {
                ...
            };

            unsafe {
                let sqe = self.sq_sqes.offset(sq_tail as isize);

                (*sqe).opcode = opcode;
                (*sqe).fd = fd as i32;
                (*sqe).addr = ptr as u64;
                (*sqe).len = len as u32;
                (*sqe).off = offset;
                (*sqe).user_data = user_data;

                write_volatile(self.sq_array.add(sq_tail as usize), sq_tail);
                write_volatile(self.sq_tail, (sq_tail + 1) & ring_mask);
            }
        }

        let submitted = match sys_io_uring_enter(self.fd, to_submit, min_complete, 0, null(), 0) {
            value if value < 0 => return IORingSubmit::SubmissionFailed(value),
            value => value as usize,
        };

        if submitted != to_submit as usize {
            IORingSubmit::SubmissionMismatched(submitted)
        } else {
            IORingSubmit::Succeeded(submitted)
        }
    }
}

The code replicates all the described steps. It supports only a few predefined async operations, deals with submission errors, and carefully manages all pointers. You may notice we instruct the compiler that our mapped memory is volatile, so that reads and writes are not optimized away.

One note: we are solely responsible for managing the tail of the queue, but we don’t touch the head at all. The Linux kernel takes responsibility for reading and thus maintaining the head.

When we need to engage with the Completion Queue, we switch roles and become consumers of a very similar queue; here, the kernel acts as the producer. Imagine a simplified queue (twice as large) that lacks the intermediate layer of indices. We can identify the positions of the head and the tail. If managed correctly, we need to consume exactly the items situated between the head and the tail. We might even attempt this without any system call, relying on the ordering of memory operations so that the most recent updates to the head and tail behave atomically.
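The consumption logic can also be sketched on plain arrays: an empty check, a masked read, and a head advance (the names and the 8-slot ring are illustrative; in the real ring, the kernel bumps the tail):

```rust
// Minimal consumer logic for a completion-style ring: the producer (the kernel)
// advances the tail, the consumer (us) advances the head.
fn extract(cqes: &[i32; 8], head: &mut u32, tail: u32) -> Option<i32> {
    let ring_mask = (cqes.len() - 1) as u32;

    // head == tail means nothing new has been delivered.
    if *head == tail {
        return None;
    }

    // Read the entry at the masked head, then move the head forward.
    let entry = cqes[(*head & ring_mask) as usize];
    *head += 1;
    Some(entry)
}

fn main() {
    let cqes = [0, 0, 0, 30, 41, 52, 63, 0]; // pretend entries sit in slots 3..=6
    let mut head = 3u32;
    let tail = 7u32; // four entries pending

    let mut drained = Vec::new();
    while let Some(entry) = extract(&cqes, &mut head, tail) {
        drained.push(entry);
    }
    assert_eq!(drained, vec![30, 41, 52, 63]);
    assert_eq!(head, tail); // the queue is now empty
}
```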

Let’s visualize the Completion Queue:

[CQ Metadata]
+-----------+------+------+
| Ring Mask | Head | Tail |
+-----------+------+------+
|     7     |  3   |  6   |
+-----------+------+------+

[CQ Entries Array]
+---+---+---+---+---+---+---+---+
|   |   |   | A | B | C | D |   |  <- entries represent completed operations
+---+---+---+---+---+---+---+---+
  0   1   2   3   4   5   6   7

All CQ entries are relatively small, and their size depends on the enabled I/O Ring flags. But let’s assume they are structured like the following:

#[repr(C)]
pub struct io_uring_cqe {
    pub user_data: u64,
    pub res: i32,
    pub flags: u32,
}

We see that each entry returns the user data we passed when we queued the SQE. It’s very useful for passing the context and correlating operations. We also receive a result, which must be interpreted according to each operation. A negative value mostly indicates an error. As far as I have noticed, the flags field is not used.
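Since a negative res conventionally carries a negated errno code, one way to make this explicit is to fold res into a Result right after extraction. A small sketch (the function name is mine):

```rust
// Interprets a CQE result: negative values are negated errno codes,
// non-negative values are operation-specific (bytes read/written, a new fd, ...).
fn interpret(res: i32) -> Result<u32, i32> {
    if res < 0 {
        Err(-res) // e.g. 2 == ENOENT, 9 == EBADF
    } else {
        Ok(res as u32)
    }
}

fn main() {
    assert_eq!(interpret(4096), Ok(4096)); // a read that returned 4096 bytes
    assert_eq!(interpret(-2), Err(2));     // -ENOENT: file not found
    assert_eq!(interpret(0), Ok(0));       // e.g. end of file on a read
}
```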

Let’s discuss the following implementation of checking the completion queue:

pub enum IORingComplete {
    Succeeded(IORingCompleteEntry),
    UnexpectedEmpty(usize),
    CompletionFailed(isize),
}

pub struct IORingCompleteEntry {
    pub res: i32,
    pub flags: u32,
    pub user_data: u64,
}

impl IORing {
    fn extract(&self) -> Option<IORingCompleteEntry> {
        let ring_mask = unsafe { read_volatile(self.cq_ring_mask) };
        let cq_head = unsafe { read_volatile(self.cq_head) };
        let cq_tail = unsafe { read_volatile(self.cq_tail) };

        if cq_head == cq_tail {
            return None;
        }

        let index = cq_head & ring_mask;
        let entry = unsafe { self.cq_cqes.offset(index as isize) };
        let entry = IORingCompleteEntry {
            res: unsafe { (*entry).res },
            flags: unsafe { (*entry).flags },
            user_data: unsafe { (*entry).user_data },
        };

        unsafe { write_volatile(self.cq_head, cq_head + 1) };
        Some(entry)
    }

    pub fn complete(&self) -> IORingComplete {
        if let Some(entry) = self.extract() {
            return IORingComplete::Succeeded(entry);
        }

        let to_submit = 0;
        let min_complete = 1;
        let flags = IORing::IORING_ENTER_GETEVENTS;

        let count = match sys_io_uring_enter(self.fd, to_submit, min_complete, flags, null(), 0) {
            value if value < 0 => return IORingComplete::CompletionFailed(value),
            value => value as usize,
        };

        if let Some(entry) = self.extract() {
            IORingComplete::Succeeded(entry)
        } else {
            IORingComplete::UnexpectedEmpty(count)
        }
    }
}

The extract function carefully checks whether any entry has reached the queue. If the tail equals the head, we conclude that nothing was added. In that case, we wait for one event using the io_uring_enter system call, passing a flag that blocks the call until at least one event is ready to be returned. Then we make another extraction attempt.

Interestingly, I noticed a rare situation (about 1 case per 1M calls) where we receive information that something is ready, but upon checking the head and tail, nothing can be extracted. In this case, we report it to the caller.

We have now covered the three most important functions: I/O Ring initialization, submission of new entries, and checking completions. The one remaining function shuts down the I/O Ring. It won't be called often; the I/O Ring buffer is meant to live as long as the event loop does. But eventually, at the end of the loop's life, we can gracefully close it:

pub enum IORingShutdown {
    Succeeded(),
    Failed(),
}

impl IORing {
    pub fn shutdown(self) -> IORingShutdown {
        let mut failed = false;

        failed = failed || 0 != sys_munmap(self.sq_ptr, self.sq_ptr_len);
        failed = failed || 0 != sys_munmap(self.sq_sqes as *mut u8, self.sq_sqes_len);
        failed = failed || 0 != sys_munmap(self.cq_ptr, self.cq_ptr_len);
        failed = failed || 0 > sys_close(self.fd);

        if failed {
            IORingShutdown::Failed()
        } else {
            IORingShutdown::Succeeded()
        }
    }
}

It’s time for a demo of the second app. This time, we’re going to build a cat-like application to print the content of an arbitrary file passed as a command-line argument. We will print it to the standard output. The very beginning resembles our initial “Hello, World!” application:

pub enum CatCommandExecute {
    Succeeded(),
    Failed(&'static [u8]),
}

fn fail(msg: &'static [u8]) -> CatCommandExecute {
    CatCommandExecute::Failed(msg)
}

impl CatCommand<'_> {
    const IORING_INVALID_DESCRIPTOR: &'static [u8] = b"I/O Ring Init failed: Invalid Descriptor.\n";
    const IORING_SETUP_FAILED: &'static [u8] = b"I/O Ring init failed: Setup Failed.\n";
    const IORING_MAPPING_FAILED: &'static [u8] = b"I/O Ring init failed: Mapping Failed.\n";
    const IORING_SUBMISSION_FAILED: &'static [u8] = b"I/O Ring entry submission failed.\n";
    const IORING_SUBMISSION_MISMATCHED: &'static [u8] = b"I/O Ring entry submission mismatch.\n";
    const IORING_COMPLETION_FAILED: &'static [u8] = b"I/O Ring entry completion failed.\n";
    const IORING_UNKNOWN_USER_DATA: &'static [u8] = b"I/O Ring returned unknown user data.\n";
    const IORING_SHUTDOWN_FAILED: &'static [u8] = b"I/O Ring shutdown failed.\n";

    const MEMORY_ALLOCATION_FAILED: &'static [u8] = b"Cannot allocate memory.\n";
    const MEMORY_DEALLOCATION_FAILED: &'static [u8] = b"Cannot release memory.\n";
    const MEMORY_SLICING_FAILED: &'static [u8] = b"Cannot slice memory.\n";

    const FILE_OPENING_FAILED: &'static [u8] = b"Cannot open source file.\n";
    const FILE_READING_FAILED: &'static [u8] = b"Cannot read source file.\n";
    const FILE_WRITING_FAILED: &'static [u8] = b"Cannot write target file.\n";
    const FILE_CLOSING_FAILED: &'static [u8] = b"Cannot close source file.\n";

    const APP_INVALID_TOKEN_STATE: &'static [u8] = b"Invalid token state.\n";
}

impl CatCommand<'_> {
    pub fn execute(&self) -> CatCommandExecute {
        let mut ring = match IORing::init(32) {
            IORingInit::Succeeded(value) => value,
            IORingInit::InvalidDescriptor(_) => return fail(CatCommand::IORING_INVALID_DESCRIPTOR),
            IORingInit::SetupFailed(_) => return fail(CatCommand::IORING_SETUP_FAILED),
            IORingInit::MappingFailed(_, _) => return fail(CatCommand::IORING_MAPPING_FAILED),
        };

        let mut buffer = match mem_alloc(32 * 4096) {
            MemoryAllocation::Failed(_) => return fail(CatCommand::MEMORY_ALLOCATION_FAILED),
            MemoryAllocation::Succeeded(value) => value,
        };

        if let Err(msg) = copy(&mut ring, &mut buffer, self.src, 1) {
            return fail(msg);
        }

        if let IORingShutdown::Failed() = ring.shutdown() {
            return fail(CatCommand::IORING_SHUTDOWN_FAILED);
        }

        if let MemoryDeallocation::Failed(_) = mem_free(buffer) {
            return fail(CatCommand::MEMORY_DEALLOCATION_FAILED);
        }

        CatCommandExecute::Succeeded()
    }
}

In this snippet, we additionally allocate 128 KiB (32 × 4096 bytes) for the buffer and pass control to the copy function, which is responsible for looping through all completions. The copy function also maintains the various states:

enum TokenState {
    Idle(),
    Opening(),
    Reading(u32, usize),
    Read(u32, usize, bool),
    Writing(u32, usize, usize),
    Closing(),
}

fn submit<T>(ring: &mut IORing, op: IORingSubmitEntry<T>) -> Result<(), &'static [u8]>
where
    T: IORingSubmitBuffer,
{
    match ring.submit([op]) {
        IORingSubmit::SubmissionFailed(_) => Err(CatCommand::IORING_SUBMISSION_FAILED),
        IORingSubmit::SubmissionMismatched(_) => Err(CatCommand::IORING_SUBMISSION_MISMATCHED),
        IORingSubmit::Succeeded(_) => Ok(()),
    }
}

fn copy(ring: &mut IORing, buf: &MemoryAddress, src: &CStr, dst: u32) -> Result<(), &'static [u8]> {
    let mut tokens = [TokenState::Idle(), TokenState::Idle()];

    submit(ring, IORingSubmitEntry::open_at(src, 0))?;
    tokens[0] = TokenState::Opening();

    loop {
        let entry = loop {
            match ring.complete() {
                IORingComplete::UnexpectedEmpty(_) => continue,
                IORingComplete::Succeeded(entry) => break entry,
                IORingComplete::CompletionFailed(_) => return Err(CatCommand::IORING_COMPLETION_FAILED),
            }
        };

        let token = match tokens.get(entry.user_data as usize) {
            None => return Err(CatCommand::IORING_UNKNOWN_USER_DATA),
            Some(value) => value,
        };

        match token {
            TokenState::Opening() => {
                let fd: u32 = match entry.res {
                    value if value < 0 => return Err(CatCommand::FILE_OPENING_FAILED),
                    value => match value.try_into() {
                        Err(_) => return Err(CatCommand::IORING_INVALID_DESCRIPTOR),
                        Ok(value) => value,
                    },
                };

                submit(ring, IORingSubmitEntry::read(fd, buf, 0, 0))?;
                tokens[0] = TokenState::Reading(fd, 0);
            }
            TokenState::Reading(fd, len) => {
                let read = match entry.res {
                    value if value < 0 => return Err(CatCommand::FILE_READING_FAILED),
                    value => value as usize,
                };

                let buf = match buf.between(0, read) {
                    MemorySlicing::Succeeded(data) => data,
                    _ => return Err(CatCommand::MEMORY_SLICING_FAILED),
                };

                submit(ring, IORingSubmitEntry::write(dst, &buf, 0, 1))?;
                tokens[0] = TokenState::Read(*fd, *len + read, read == 0);
                tokens[1] = TokenState::Writing(dst, read, 0);
            }
            TokenState::Writing(dst_fd, len, off) => {
                let written = match entry.res {
                    value if value < 0 => return Err(CatCommand::FILE_WRITING_FAILED),
                    value => value as usize,
                };

                if *len == *off + written {
                    match tokens[0] {
                        TokenState::Read(src_fd, read, completed) if !completed => {
                            submit(ring, IORingSubmitEntry::read(src_fd, buf, read as u64, 0))?;
                            tokens[1] = TokenState::Idle();
                            tokens[0] = TokenState::Reading(src_fd, read);
                        }
                        TokenState::Read(src_fd, _, _) => {
                            submit(ring, IORingSubmitEntry::close(src_fd, 0))?;
                            tokens[1] = TokenState::Idle();
                            tokens[0] = TokenState::Closing();
                        }
                        _ => return Err(CatCommand::APP_INVALID_TOKEN_STATE),
                    }
                } else {
                    let buf = match buf.between(*off + written, *len) {
                        MemorySlicing::Succeeded(data) => data,
                        _ => return Err(CatCommand::MEMORY_SLICING_FAILED),
                    };

                    submit(ring, IORingSubmitEntry::write(*dst_fd, &buf, 0, 1))?;
                    tokens[1] = TokenState::Writing(*dst_fd, *len, *off + written);
                }
            }
            TokenState::Closing() => {
                return match entry.res {
                    value if value < 0 => Err(CatCommand::FILE_CLOSING_FAILED),
                    _ => Ok(()),
                }
            }
            _ => return Err(CatCommand::APP_INVALID_TOKEN_STATE),
        }
    }
}

You can see that the code is quite clumsy, and one can easily get lost in it. That's the drawback of dealing directly with an event loop without any abstraction (I don't even mean async/await). Here, I used only two file descriptors, but imagine writing a torrent client that operates on hundreds of network sockets and potentially thousands of files.

Nevertheless, it’s only a demonstration proving that I/O Ring works fully asynchronously, even with opening and closing a file.

Do you expect any benchmark results? I did try to compare it with the regular cat command, where the input file was 22GB, transferred from my host Windows machine. Here is what I got:

# I/O Ring
real 2m9.968s
user 0m0.170s
sys 0m10.327s

# regular cat
real 2m31.557s
user 0m0.103s
sys 0m4.365s

You can see that the time spent in the kernel is significantly higher in the I/O Ring version, indicating that the value added in such a simple application is questionable. I also compared the number of system calls in both versions. Both of them made exactly the same number of system calls, around 350k. What’s the conclusion? The system calls related to I/O Ring are more expensive compared to regular read/write system calls. Or is it just an oversimplification / confirmation bias?

The world of truly asynchronous I/O is already open. Now, in Linux, we can do almost everything asynchronously with sockets and files. Imagine being able to write an I/O app using only a single thread, which can handle thousands of operations without any issue, as long as your I/O hardware can cope with it and you are not mixing in CPU-bound operations.

If you are interested in the implementation details, don’t hesitate to clone my examples and play with them: https://github.com/amacal/learning-rust/blob/io-uring/src/hello.rs

Written by Adrian Macal

Software & Data Engineer by day, low-level coder by night.
