Learning Rust: Bare Threading
Do you know how a thread in Linux is spawned? What about using raw pipes to schedule work on a CPU? Can we transfer closures between threads?
Concurrency and multi-threading are essential in system programming. Concurrency refers to the ability to manage multiple I/O-bound tasks simultaneously, giving the impression that they are progressing at the same time, even if they are executed within a single thread. Multi-threading involves creating multiple threads to handle CPU-bound tasks, achieving true parallelism if the CPU has multiple cores. Both terms are sometimes confused because they both deal with the simultaneous execution of tasks.
One way of achieving concurrency is an event loop. I recently played with io_uring, available in Linux, and created a working version of an async/await runtime in a no-std environment. It runs in the main thread and creates the impression that tasks execute in parallel. Let’s consider this example:
pub struct HelloCommand {
pub msg: &'static [u8],
}
impl HelloCommand {
pub async fn execute(self) -> Option<&'static [u8]> {
let stdout = open_stdout();
let written = match write_stdout(&stdout, self.msg).await {
StdOutWriteResult::Succeeded(_, written) => written,
StdOutWriteResult::OperationFailed(_, _) => return Some(APP_STDOUT_FAILED),
StdOutWriteResult::InternallyFailed() => return Some(APP_INTERNALLY_FAILED),
};
if written as usize != self.msg.len() {
return Some(APP_STDOUT_INCOMPLETE);
}
None
}
}
In this example, we print a message to the standard output. The key point worth noting is that it runs as an async function and doesn’t block. Behind the scenes, it uses the I/O Ring runtime.
We already know how to write to standard output, so we can easily write to a pipe. Many Linux components are exposed as file descriptors and therefore share the same mechanism. But what is a pipe, you may ask.
A pipe is a magic buffered tunnel that allows messages to be passed in only one direction. Specifically, there are writer and reader endpoints, each with its own file descriptor. We can create them using the pipe2 system call. It accepts a pointer to an array of two 32-bit integers, which receives the created descriptors. It also takes optional flags:
pub fn sys_pipe2(pipefd: *mut u32, flags: u32) -> isize {
unsafe {
let ret: isize;
asm!(
"syscall",
in("rax") 293,
in("rdi") pipefd,
in("rsi") flags,
lateout("rcx") _,
lateout("r11") _,
lateout("rax") ret,
options(nostack)
);
ret
}
}
The system call delivers a pipe as two file descriptors. The first one is the reader and the second one is the writer. When we write a message into a pipe, the content is buffered by the kernel. By default the buffer holds 16 pages (65,536 bytes with 4 KiB pages); the maximum size an unprivileged process may request through fcntl(F_SETPIPE_SZ) is exposed as a kernel setting:
# cat /proc/sys/fs/pipe-max-size
1048576
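To make the roles of both descriptors tangible outside of my runtime, here is a minimal sketch using the standard library. It is not the code used in this article: instead of the raw pipe2 system call it declares libc's pipe wrapper in an unsafe extern block (which needs Rust 1.82 or newer), and the helper names create_pipe and roundtrip are made up for the illustration.

```rust
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::FromRawFd;

// libc's wrapper around the pipe system call; std already links against libc on Linux.
unsafe extern "C" {
    fn pipe(fds: *mut i32) -> i32;
}

/// Creates a pipe and returns (reader, writer); wrapping in `File` closes them on drop.
fn create_pipe() -> io::Result<(File, File)> {
    let mut fds = [0i32; 2];
    if unsafe { pipe(fds.as_mut_ptr()) } != 0 {
        return Err(io::Error::last_os_error());
    }
    // fds[0] is the read end, fds[1] is the write end
    Ok(unsafe { (File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])) })
}

/// Writes `msg` into a fresh pipe and reads it back from the other end.
/// The message is assumed to be smaller than the pipe buffer, otherwise the write would block.
fn roundtrip(msg: &[u8]) -> io::Result<Vec<u8>> {
    let (mut reader, mut writer) = create_pipe()?;
    writer.write_all(msg)?;
    // closing the write end lets the reader observe end-of-file
    drop(writer);
    let mut received = Vec::new();
    reader.read_to_end(&mut received)?;
    Ok(received)
}

fn main() {
    let echoed = roundtrip(b"Hello, World!\n").expect("pipe round trip failed");
    print!("{}", String::from_utf8_lossy(&echoed));
}
```

The kernel buffers the bytes between write_all and read_to_end, which is why a single thread can play both roles here.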
With all those facts in mind, we could write a Hello World application, which writes a message to a pipe, reads it, and finally prints it to standard output:
pub struct PipeCommand {
pub msg: &'static [u8],
}
impl PipeCommand {
pub async fn execute(self) -> Option<&'static [u8]> {
let stdout = open_stdout();
let (reader, writer) = match create_pipe() {
CreatePipe::Succeeded((reader, writer)) => (reader, writer),
_ => return Some(APP_PIPE_CREATING_FAILED),
};
let reader = spawn(async move {
let buffer = match mem_alloc(1 * 4096) {
MemoryAllocation::Failed(_) => return Some(APP_MEMORY_ALLOC_FAILED),
MemoryAllocation::Succeeded(value) => value.droplet(),
};
let (buffer, cnt) = match read_pipe(&reader, buffer).await {
PipeReadResult::Succeeded(buffer, cnt) => (buffer, cnt),
_ => return Some(APP_PIPE_READING_FAILED),
};
let slice = match buffer.between(0, cnt as usize) {
HeapSlicing::Succeeded(value) => value,
_ => return Some(APP_MEMORY_SLICE_FAILED),
};
let written = match write_stdout(&stdout, &slice).await {
StdOutWriteResult::Succeeded(_, written) => written,
_ => return Some(APP_STDOUT_FAILED),
};
if written as usize != self.msg.len() {
return Some(APP_STDOUT_INCOMPLETE);
}
match close_pipe(reader).await {
PipeCloseResult::Succeeded() => None,
_ => Some(APP_PIPE_CLOSING_FAILED),
}
});
let writer = spawn(async move {
match write_pipe(&writer, self.msg).await {
PipeWriteResult::Succeeded(_, _) => (),
_ => return Some(APP_PIPE_WRITING_FAILED),
}
match close_pipe(writer).await {
PipeCloseResult::Succeeded() => None,
_ => Some(APP_PIPE_CLOSING_FAILED),
}
});
match reader.await {
SpawnResult::Succeeded() => (),
_ => return Some(APP_IO_SPAWNING_FAILED),
}
match writer.await {
SpawnResult::Succeeded() => (),
_ => return Some(APP_IO_SPAWNING_FAILED),
}
None
}
}
The listing may be a bit confusing because it relies entirely on my own async/await runtime. It demonstrates that pipes work well with I/O Ring.
Pipes are great for simplified IPC without relying on any synchronization primitives. Once we have threads, we can pass messages between the main thread and a worker thread. A thread in Linux is an interesting thing. We don’t create a thread; we clone our own process and instruct the kernel that we will share virtual memory, file descriptors, signal handlers, and the thread group.
The most interesting part of spawning a new thread is where the child thread starts. We don’t pass a function pointer, as high-level libraries let us do. The new thread continues exactly where we finished calling the system call to clone ourselves. What distinguishes the parent thread from the child thread? The stack and the system call result. We are responsible for creating a stack before calling the system call, and the system call will return a positive number (the new thread’s ID) in the parent and zero in the child thread.
The idea of cloning and then using an if statement to distinguish threads is a bit odd and, unfortunately, the commonly recommended one. It may cause issues when the code touches variables available only on the parent stack, because the compiler does not know that the code after the system call also runs on a fresh, empty stack. Fortunately, there is a great blog post by Chris Wellons. He describes a nice trick that avoids branching in user code: the stack is prepared in such a way that the processor automatically jumps to the correct function.
Let me explain it deeper. Imagine both stacks just after cloning:
+-----------------------------+ +-----------------------------+
| Parent | | Child |
0x1000 +-----------------------------+ +-----------------------------+ 0xa000
| | | |
| | | |
RSP | | | |
0x1d00 |-----------------------------| | |
| return: 0xb700 | | |
0x1e00 |-----------------------------| | |
| return: 0xb980 | | |
| variable: val3 | | |
| variable: val4 | | |
0x1f00 |-----------------------------| | |
| return: 0xb120 | | |
| variable: val1 | | |
| variable: val2 | | | RSP
0x2000 +-----------------------------+ +-----------------------------+ 0xb000
Remembering that the stack grows downwards, we can observe that the current parent RSP register points to 0x1d00. The stack holds all local variables and, at the top, the address (0xb700) where execution continues once the RET instruction is executed. The RET instruction pops a value from the stack and places it in the RIP register. On the child side, we have nothing, so the child code cannot reference any variable or return to the caller.
Do you think we could tamper with it? Yes, we can. What if the top of the newly created stack contained a return address, in our case the address of the function that starts the thread? Then, when the code executes RET, the code behind that address runs. The trick works only if both stacks are arranged in such a way that, after the system call, both threads execute exactly the same instructions and each RET pops the correct address.
It might look like the following memory map:
+-----------------------------+ +-----------------------------+
| Parent | | Child |
0x1000 +-----------------------------+ +-----------------------------+ 0xa000
| | | |
| | | |
RSP | | | |
0x1d00 |-----------------------------| | |
| return: 0xb700 | | |
0x1e00 |-----------------------------| | |
| return: 0xb980 | | |
| variable: val3 | | |
| variable: val4 | | |
0x1f00 |-----------------------------| | |
| return: 0xb120 | | | RSP
| variable: val1 | |-----------------------------| 0xaf80
| variable: val2 | | return: 0xb800 |
0x2000 +-----------------------------+ +-----------------------------+ 0xb000
Imagine that we are still in the parent call and our stack pointer is 0x1d00. The stack says that executing the RET instruction will continue at 0xb700 (and increase RSP). If the code we are currently executing clones the process as a thread, passing 0xaf80 as the new RSP for the child, the child will encounter the same RET instruction, but it will continue at 0xb800 because that is what its stack points to. It’s beautiful.
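The effect can be replayed with a toy model. The sketch below is only an illustration: the Stack type and the helper names are hypothetical, stacks are plain vectors of 8-byte words, and the addresses are the ones from the diagram. The same "instruction" runs on both stacks and lands in different places.

```rust
/// A toy stack of 8-byte words; the last element is the word RSP points at.
struct Stack {
    slots: Vec<u64>,
}

impl Stack {
    /// Models RET: pops the top word, which becomes the next RIP.
    fn ret(&mut self) -> Option<u64> {
        self.slots.pop()
    }
}

/// The shared code path: both threads execute the very same RET.
fn resume(mut stack: Stack) -> Option<u64> {
    stack.ret()
}

/// Return addresses of the parent's frames, innermost frame last.
fn parent_stack() -> Stack {
    Stack { slots: vec![0xb120, 0xb980, 0xb700] }
}

/// The only word prepared in advance for the child: its entry point.
fn child_stack() -> Stack {
    Stack { slots: vec![0xb800] }
}

fn main() {
    if let Some(rip) = resume(parent_stack()) {
        println!("parent continues at {:#x}", rip);
    }
    if let Some(rip) = resume(child_stack()) {
        println!("child continues at {:#x}", rip);
    }
}
```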
We can go the extra mile and prepare the stack further. I want the code behind 0xb800 to receive a function argument: a pointer to a struct containing worker arguments. I would like to place the struct at the bottom of the stack (its highest addresses) and pass a pointer to it in the RDI register, which carries the first argument in the System V ABI. The final memory layout might look like this one:
+-----------------------------+ +-----------------------------+
| Parent | | Child |
0x1000 +-----------------------------+ +-----------------------------+ 0xa000
| | | |
RSP | | | |
0x1c00 |-----------------------------| | |
| current RDI value | | | RSP
0x1d00 |-----------------------------| |-----------------------------| 0xad00
| return: 0xb700 | | 0xae80 |
0x1e00 |-----------------------------| |-----------------------------| 0xae00
| return: 0xb980 | | return: 0xb800 |
| variable: val3 | |-----------------------------| 0xae80
| variable: val4 | | struct: |
0x1f00 |-----------------------------| | field: val1 |
| return: 0xb120 | | field: val2 |
| variable: val1 | | field: val3 |
| variable: val2 | | field: val4 |
0x2000 +-----------------------------+ +-----------------------------+ 0xb000
Notice that the top of each stack now contains a value to be popped into the RDI register just before returning. In the parent thread this restores the previous, unchanged value of RDI. In the child thread it loads 0xae80, a pointer to the struct, which the function located at 0xb800 sees as its first parameter.
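Before touching assembly, the layout can be rehearsed in safe Rust. This is a sketch under simplifying assumptions: memory is a vector of 8-byte words starting at a made-up base address, the Args struct and its fields are hypothetical, and pop_rdi_ret merely replays what the two instructions would do.

```rust
const WORD: usize = 8;

/// Hypothetical worker arguments, two words long.
#[repr(C)]
#[derive(Clone, Copy)]
struct Args {
    incoming: u64,
    outgoing: u64,
}

/// Lays out the child stack inside `memory` (lowest address first) and returns
/// the index of the word the child's RSP should point at.
/// From the end of the buffer towards lower addresses:
///   [args struct] [entry address] [pointer to args]  <- RSP
fn prepare_stack(memory: &mut [u64], base: u64, entry: u64, args: Args) -> usize {
    let words = std::mem::size_of::<Args>() / WORD;
    let args_at = memory.len() - words;
    memory[args_at] = args.incoming;
    memory[args_at + 1] = args.outgoing;
    // consumed by RET
    memory[args_at - 1] = entry;
    // consumed by POP RDI: the address of the struct
    memory[args_at - 2] = base + (args_at * WORD) as u64;
    args_at - 2
}

/// Replays `pop rdi; ret` and reports (RDI, RIP, index RSP ends up at).
fn pop_rdi_ret(memory: &[u64], rsp: usize) -> (u64, u64, usize) {
    (memory[rsp], memory[rsp + 1], rsp + 2)
}

fn main() {
    let base = 0xa000u64;
    let mut memory = vec![0u64; 0x1000 / WORD];
    let args = Args { incoming: 3, outgoing: 6 };
    let rsp = prepare_stack(&mut memory, base, 0xb800, args);
    let (rdi, rip, rsp) = pop_rdi_ret(&memory, rsp);
    println!("rdi={:#x} rip={:#x} rsp={:#x}", rdi, rip, base + (rsp * WORD) as u64);
}
```

After both instructions RSP points at the struct itself, so the entry function starts with a stack that grows downwards from just below its own arguments.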
Now, we can write Rust and assembly code for it:
arch::global_asm!(
"
.global _start_thread;
_start_thread:
push rdi; // flags
sub rsi, 16; // stack
mov [rsi], rcx; // seed
mov [rsi + 8], rdx; // func
mov rax, 56;
syscall;
pop rdi;
ret
"
);
#[repr(C)]
struct WorkerArgs {
...
}
extern "C" {
fn _start_thread(flags: u64, stack: *mut (), func: extern "C" fn(&WorkerArgs) -> !, seed: u64) -> isize;
}
unsafe fn start_thread(heap: &Heap, func: extern "C" fn(&WorkerArgs) -> !, args: WorkerArgs) -> isize {
// preparing flags to clone as thread
let flags = CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD;
// pointing to the end of the created stack
let size = mem::size_of::<WorkerArgs>();
let stack = (heap.ptr as *mut u8).add(heap.len - size);
// copy worker args on new stack
*(stack as *mut WorkerArgs) = args;
// we don't care about handling negative results here
_start_thread(flags, stack as *mut (), func, stack as u64)
}
Remembering the System V ABI, we know that all four parameters will be placed in the RDI, RSI, RDX, and RCX registers. Thus, we can magically rearrange the memory. The parent thread will return to the caller of the start_thread function, and the child thread will start the passed function and will never return.
The third component in our equation is Rust’s closures. When you work with them in Rust, you may have the impression they are just like function pointers. But actually, closures are more like structures with a single function delivered behind a trait. If we reflect on them, we can imagine they have state, similar to Rust’s futures. The state is automatically captured by the compiler from the surrounding environment, sometimes as references and sometimes as values. What does it mean for us? Exactly like futures, we can try to copy them to the heap. Imagine a heap containing the following data:
#[repr(C)]
struct CallableHeader {
data: [usize; 4],
call: fn(&Heap) -> Option<&'static [u8]>,
}
#[repr(C)]
struct CallableArgs<F, R, E>
where
F: FnOnce() -> Result<R, E>,
{
header: CallableHeader,
target: Option<F>,
result: Option<Result<R, E>>,
}
The header contains an arbitrary 32 bytes and a pointer to a function. It always sits at the beginning of the heap allocation. It is followed by the captured closure and by a slot where the closure’s result is stored at the end of its execution. Notice that the header has a fixed size and does not depend on the type of the closure or the size of its returned value.
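The claim that a closure is a compiler-generated struct carrying its captured state can be checked with the standard library. The sketch below is only an experiment; the helper state_size is made up, and the exact sizes are an implementation detail of the compiler rather than a language guarantee.

```rust
use std::mem::size_of_val;

/// Returns the size in bytes of the closure (or any other value) passed in.
fn state_size<F>(f: &F) -> usize {
    size_of_val(f)
}

fn main() {
    let counter: u64 = 41;
    let text = String::from("captured by reference");

    // captures nothing, so there is no state to store
    let stateless = || 1u64;
    // owns a copy of `counter`
    let by_value = move || counter + 1;
    // holds a reference to `text`
    let by_ref = || text.len();

    println!("stateless: {} bytes", state_size(&stateless));
    println!("by value:  {} bytes", state_size(&by_value));
    println!("by ref:    {} bytes", state_size(&by_ref));

    // the state travels with the closure when it is moved to the heap
    let boxed: Box<dyn FnOnce() -> u64> = Box::new(by_value);
    println!("result: {}", boxed());
}
```

Because the closure is just data plus one function, copying those bytes into our own heap allocation is enough to carry it to another thread.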
The data on the heap may be wrapped with the following struct:
pub struct CallableTarget {
target: Heap,
call: fn(&Heap) -> Option<&'static [u8]>,
}
pub enum CallableTargetAllocate {
Succeeded(CallableTarget),
AllocationFailed(isize),
}
impl CallableTarget {
pub fn allocate<F, R, E>(target: F) -> CallableTargetAllocate
where
F: FnOnce() -> Result<R, E> + Send,
{
fn call<F, R, E>(target: &Heap) -> Option<&'static [u8]>
where
F: FnOnce() -> Result<R, E>,
{
let mut args: View<CallableArgs<F, R, E>> = target.view();
let result: Option<&[u8]> = args.call();
result
}
let len = mem::size_of::<CallableArgs<F, R, E>>();
trace1(b"allocating callable; size=%d\n", len);
let ((ptr, len), mut data) = match mem_alloc(len) {
MemoryAllocation::Succeeded(heap) => (heap.as_ptr(), heap.boxed::<CallableArgs<F, R, E>>()),
MemoryAllocation::Failed(err) => return CallableTargetAllocate::AllocationFailed(err),
};
data.result = None;
data.target = Some(target);
data.header = CallableHeader {
data: [ptr, len, 0, 0],
call: call::<F, R, E>,
};
CallableTargetAllocate::Succeeded(Self {
target: data.into(),
call: call::<F, R, E>,
})
}
}
impl<F, R, E> CallableArgs<F, R, E>
where
F: FnOnce() -> Result<R, E>,
{
pub fn call(&mut self) -> Option<&'static [u8]> {
self.result = match self.target.take() {
None => return Some(b"calling callable; failed"),
Some(target) => Some(target()),
};
None
}
}
The struct offers the allocation of a new callable, which allocates enough memory for both structs and moves the closure to the heap. It erases all the types but preserves a pointer to a function that remembers all generic types. We can also call the callable and fetch the collected result:
impl CallableTarget {
pub fn call(&mut self) -> Option<&'static [u8]> {
trace2(
b"dispatching callable; target=%x, size=%d\n",
self.target.ptr,
self.target.len,
);
(self.call)(&mut self.target)
}
pub fn result<F, R, E>(self) -> Option<Result<R, E>>
where
F: FnOnce() -> Result<R, E>,
{
let value = self.target.view::<CallableArgs<F, R, E>>().result.take();
self.release();
value
}
pub fn release(mut self) {
trace1(b"releasing callable; addr=%x\n", self.target.ptr);
mem_free(&mut self.target);
}
}
So far we have learnt that we have pipes that allow us to send bytes. We saw that it may work within the same thread and with I/O Ring, but pipes will also work with multiple threads. We have learnt how to spawn a new thread using a system call. We managed to avoid branching by writing smart assembly code. Finally, we built a struct which takes a callable, erases its types, but still allows us to call it. We are well prepared to connect all of them to build an engine that runs closures on available threads.
Let’s execute a callable task in a worker thread. We will start with the following structures:
pub struct Worker {
incoming: u32,
outgoing: u32,
}
#[repr(C)]
struct WorkerArgs {
stack_ptr: usize,
stack_len: usize,
incoming: u32,
outgoing: u32,
}
A worker contains file descriptors for two pipes: one to send a payload to the thread and one to receive a payload from it. Since a pipe carries data in one direction only, two pipes give us bi-directional communication. The thread receives the opposite ends of both pipes through its arguments. The worker thread is also responsible for its own stack, which has to be released at the end of the worker’s life. How do we create a worker? Let’s analyze it:
pub enum WorkerStart {
Succeeded(Worker),
StartFailed(isize),
StackFailed(isize),
PipesFailed(isize),
}
impl Worker {
pub fn start() -> WorkerStart {
let mut pipefd = [0; 4];
let ptr = pipefd.as_mut_ptr();
fn release_pipes(result: isize, pipefd: [u32; 4]) -> WorkerStart {
for fd in pipefd {
if fd > 0 {
sys_close(fd);
}
}
WorkerStart::PipesFailed(result)
}
match sys_pipe2(unsafe { ptr.add(0) }, O_DIRECT) {
result if result < 0 => return release_pipes(result, pipefd),
_ => (),
}
match sys_pipe2(unsafe { ptr.add(2) }, O_DIRECT) {
result if result < 0 => return release_pipes(result, pipefd),
_ => (),
}
// we need to have a stack
let mut heap = match mem_alloc(4096) {
MemoryAllocation::Succeeded(heap) => heap,
MemoryAllocation::Failed(err) => {
release_pipes(err, pipefd);
return WorkerStart::StackFailed(err);
}
};
// args will be passed directly to newly created thread
// and must contain incoming and outgoing pipes
let args = WorkerArgs {
stack_ptr: heap.ptr,
stack_len: heap.len,
incoming: pipefd[0],
outgoing: pipefd[3],
};
// now we can start a thread
let tid = match unsafe { start_thread(&heap, worker_callback, args) } {
result if result > 0 => result as u32,
result => {
mem_free(&mut heap);
release_pipes(result, pipefd);
return WorkerStart::StartFailed(result);
}
};
trace4(
b"worker spawned; tid=%d, heap=%x, in=%d, out=%d\n",
tid,
heap.ptr,
pipefd[2],
pipefd[1],
);
let worker = Worker {
incoming: pipefd[2],
outgoing: pipefd[1],
};
WorkerStart::Succeeded(worker)
}
pub fn release(self) {
sys_close(self.incoming);
sys_close(self.outgoing);
}
}
The code creates two pairs of pipes, a stack of 4096 bytes, and worker arguments. Then a magic function is called to rearrange the stack, copy the args, and call the kernel. If everything works as expected, the following function is the entry point of the thread:
extern "C" fn worker_callback(args: &WorkerArgs) -> ! {
...
let res = sys_close(args.incoming);
trace2(b"terminating thread; in=%d, res=%d\n", args.incoming, res);
let res = sys_close(args.outgoing);
trace2(b"terminating thread; out=%d, res=%d\n", args.outgoing, res);
// releasing stack memory and exiting current thread
trace2(b"terminating thread; heap=%x, len=%d\n", args.stack_ptr, args.stack_len);
unsafe { _stop_thread(args.stack_ptr, args.stack_len) }
}
What happens here? We are just closing both pipes without inspecting the results; we could not react anyway if any of them were negative. We also free the allocated stack memory and exit the thread. How? Look at the following assembly snippet:
arch::global_asm!(
"
.global _stop_thread;
_stop_thread:
mov rax, 11;
syscall;
mov rax, 60;
syscall;
"
);
extern "C" {
fn _stop_thread(stack_ptr: usize, stack_len: usize) -> !;
}
The code is responsible for freeing memory using the munmap (11) system call, followed by the exit (60) system call. It’s done in assembly to avoid any accidental access to the stack once the memory has been freed.
What could we place in the worker thread? We won’t put any async code, because it would complicate everything. What about a simple loop waiting for callable tasks to be executed? Let’s see this idea in the following snippet:
extern "C" fn worker_callback(args: &WorkerArgs) -> ! {
let mut buffer: [usize; 2] = [0; 2];
let ptr = buffer.as_mut_ptr() as *mut ();
loop {
// read 16-bytes from the main thread
let received = sys_read(args.incoming, ptr, 16);
trace2(b"worker received bytes; fd=%d, size=%d\n", args.incoming, received);
if received != 16 {
break;
}
trace2(b"worker received bytes; addr=%x, size=%d\n", buffer[0], buffer[1]);
let heap = Heap::at(buffer[0], buffer[1]);
let mut target: CallableTarget = CallableTarget::from(heap);
// calling the function behind the heap
match target.call() {
None => trace0(b"worker called target; successfully\n"),
Some(err) => trace1(b"worker called target; %s\n", err),
}
// reporting one byte
let res = sys_write(args.outgoing, ptr, 1);
trace1(b"worker completed; res=%d\n", res);
}
let res = sys_close(args.incoming);
trace2(b"terminating thread; in=%d, res=%d\n", args.incoming, res);
let res = sys_close(args.outgoing);
trace2(b"terminating thread; out=%d, res=%d\n", args.outgoing, res);
// releasing stack memory and exiting current thread
trace2(b"terminating thread; heap=%x, len=%d\n", args.stack_ptr, args.stack_len);
unsafe { _stop_thread(args.stack_ptr, args.stack_len) }
}
The above snippet enters an almost infinite loop and waits for 16 bytes of data. It expects them to be a pair of a heap pointer and a heap length, from which a callable struct can be reconstructed. Then we simply call the callable and write one byte back to the outgoing pipe. The byte is used only as a signal that the work is done. The worker neither frees the callable object nor interprets the result. But how are we able to reconstruct a callable from just a pointer? Let’s examine it:
#[repr(C)]
struct CallableHeader {
data: [usize; 4],
call: fn(&Heap) -> Option<&'static [u8]>,
}
#[repr(C)]
struct CallableArgs<F, R, E>
where
F: FnOnce() -> Result<R, E>,
{
header: CallableHeader,
target: Option<F>,
result: Option<Result<R, E>>,
}
impl CallableTarget {
fn new(target: Heap, call: fn(&Heap) -> Option<&'static [u8]>) -> Self {
Self { target, call }
}
pub fn from(heap: Heap) -> Self {
let header: View<CallableHeader> = heap.view();
let target: CallableTarget = CallableTarget::new(heap, header.call);
target
}
}
Do you remember that the header is a fixed size and it’s at the beginning of our heap? We can just read it to extract a call field, which together with the heap represents a callable target.
We learned how to build a worker with an infinite loop accepting any closure wrapped in a callable target object. Now, it’s time to send some work to it. Do you remember that our worker structure contains two file descriptors? They are meant to communicate with the loop we constructed in the previous paragraph.
pub enum WorkerExecute {
Succeeded(IORingSubmitEntry<*const u8>),
OutgoingPipeFailed(isize)
}
impl Worker {
pub fn execute(&mut self, callable: &CallableTarget) -> WorkerExecute {
let ((ptr, _), len) = (callable.as_ptr(), 16);
// we expect here to not have any blocking operation because worker waits for it
trace2(b"worker sends bytes; ptr=%x, len=%d\n", ptr, len);
let res = sys_write(self.outgoing, ptr as *mut (), len);
// we send exactly 16 bytes, containing (ptr, len) of the heap
trace3(b"worker sends bytes; fd=%d, size=%d, res=%d\n", self.outgoing, len, res);
if res != len as isize {
return WorkerExecute::OutgoingPipeFailed(res);
}
// asynchronous operation has to be returned referencing callable's header
WorkerExecute::Succeeded(IORingSubmitEntry::read(self.incoming, (ptr + 16) as *const u8, 1, 0))
}
}
Initially, we get a pointer to the heap of our callable, then we send the first 16 bytes to the outgoing pipe. But why 16 bytes? You probably remember that this number is also used when reading from the other end of the pipe. The other end of the pipe expects that the 16 bytes are a pair of a pointer and a length of the heap. Where are those values set? It’s done during the allocation of a callable:
impl CallableTarget {
pub fn allocate<F, R, E>(target: F) -> CallableTargetAllocate
where
F: FnOnce() -> Result<R, E> + Send,
{
...
let ((ptr, len), mut data) = match mem_alloc(len) {
MemoryAllocation::Succeeded(heap) => (heap.as_ptr(), heap.boxed::<CallableArgs<F, R, E>>()),
MemoryAllocation::Failed(err) => return CallableTargetAllocate::AllocationFailed(err),
};
data.result = None;
data.target = Some(target);
data.header = CallableHeader {
data: [ptr, len, 0, 0],
call: call::<F, R, E>,
};
CallableTargetAllocate::Succeeded(Self {
target: data.into(),
call: call::<F, R, E>,
})
}
}
The operation returned at the end of scheduling is an async read into a spare slot of the callable’s header. We need to read just one byte, which is not going to be interpreted; it is only a signal that wakes up the I/O Ring. This avoids using any synchronization primitives. How? I/O Ring will notify us when the operation is completed.
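The whole handshake fits in a small standalone program. The sketch below is a simplification with several stand-ins: std::thread replaces the raw clone call, blocking reads replace I/O Ring, the message is a single 8-byte pointer instead of a (ptr, len) pair, and the Task struct and all function names are hypothetical. It also assumes a 64-bit target and relies on the pipe to order the worker's writes before the main thread's reads.

```rust
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::FromRawFd;
use std::thread;

// libc's wrapper around the pipe system call (unsafe extern needs Rust 1.82 or newer)
unsafe extern "C" {
    fn pipe(fds: *mut i32) -> i32;
}

fn create_pipe() -> io::Result<(File, File)> {
    let mut fds = [0i32; 2];
    if unsafe { pipe(fds.as_mut_ptr()) } != 0 {
        return Err(io::Error::last_os_error());
    }
    Ok(unsafe { (File::from_raw_fd(fds[0]), File::from_raw_fd(fds[1])) })
}

/// Heap allocation shared by address: the job and a slot for its result.
struct Task {
    job: Option<Box<dyn FnOnce() -> u64 + Send>>,
    result: Option<u64>,
}

/// Worker loop: read a pointer, run the job behind it, answer with one byte.
fn worker(mut incoming: File, mut outgoing: File) {
    let mut buffer = [0u8; 8];
    // read_exact fails once the main thread closes its end of the pipe
    while incoming.read_exact(&mut buffer).is_ok() {
        let ptr = u64::from_ne_bytes(buffer) as usize as *mut Task;
        // the main thread does not touch the task until it receives the signal byte
        let task = unsafe { &mut *ptr };
        if let Some(job) = task.job.take() {
            task.result = Some(job());
        }
        if outgoing.write_all(&[0]).is_err() {
            break;
        }
    }
}

fn run_on_worker<F>(job: F) -> io::Result<Option<u64>>
where
    F: FnOnce() -> u64 + Send + 'static,
{
    let (worker_in, mut main_out) = create_pipe()?; // main -> worker
    let (mut main_in, worker_out) = create_pipe()?; // worker -> main
    let handle = thread::spawn(move || worker(worker_in, worker_out));

    let task = Box::into_raw(Box::new(Task { job: Some(Box::new(job)), result: None }));
    main_out.write_all(&(task as usize as u64).to_ne_bytes())?;

    // one byte comes back when the job is done; its value is irrelevant
    let mut signal = [0u8; 1];
    main_in.read_exact(&mut signal)?;
    let task = unsafe { Box::from_raw(task) };

    drop(main_out); // lets the worker leave its loop
    handle.join().expect("worker panicked");
    Ok(task.result)
}

fn main() {
    println!("{:?}", run_on_worker(|| 6 * 7));
}
```

In the real runtime the blocking read_exact on the main side is the part replaced by an I/O Ring read, so the event loop keeps running while the worker is busy.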
Now that we know how to manage a single worker, let’s wrap several of them in a pool struct. The pool will be responsible for managing multiple workers and ensuring that jobs are enqueued and dequeued in a non-blocking fashion. We could start as in the following listing:
const WORKERS_COUNT: usize = 8;
pub struct IORuntimePool {
workers_completers: [Option<u64>; WORKERS_COUNT],
workers_array: [Option<Worker>; WORKERS_COUNT],
workers_slots: [usize; WORKERS_COUNT],
workers_count: usize,
queue_incoming: u32,
queue_outgoing: u32,
queue_counter: usize,
}
The code assumes we are working with 8 threads. It also declares a queue with a counter and an internal pipe. Why a pipe? It will be used to avoid blocking operations when we cannot schedule work. This will happen if more CPU tasks are requested than there are available workers. Let’s see how we can allocate it:
pub enum IORuntimePoolAllocation {
Succeeded(Boxed<IORuntimePool>),
AllocationFailed(isize),
ThreadingFailed(isize),
QueueFailed(isize),
}
impl IORuntimePool {
pub fn allocate() -> IORuntimePoolAllocation {
let queue = match PipeChannel::create() {
Ok(value) => value,
Err(err) => return IORuntimePoolAllocation::QueueFailed(err),
};
let mut instance: Boxed<IORuntimePool> = match mem_alloc(mem::size_of::<IORuntimePool>()) {
MemoryAllocation::Succeeded(heap) => heap.boxed(),
MemoryAllocation::Failed(err) => return IORuntimePoolAllocation::AllocationFailed(err),
};
for i in 0..WORKERS_COUNT {
let worker = match Worker::start() {
WorkerStart::Succeeded(worker) => worker,
WorkerStart::StartFailed(err) => return IORuntimePoolAllocation::ThreadingFailed(err),
WorkerStart::PipesFailed(err) => return IORuntimePoolAllocation::ThreadingFailed(err),
WorkerStart::StackFailed(err) => return IORuntimePoolAllocation::AllocationFailed(err),
};
instance.workers_array[i] = Some(worker);
instance.workers_slots[i] = i;
}
let (incoming, outgoing) = queue.extract();
instance.queue_counter = 0;
instance.workers_count = 0;
instance.queue_incoming = incoming;
instance.queue_outgoing = outgoing;
IORuntimePoolAllocation::Succeeded(instance)
}
}
The function first creates a pipe, then allocates memory on the heap for itself, and finally starts N workers. Let’s skip its destruction and focus on how we could schedule some work. The scheduling algorithm will perform two stages. The first one will be acquiring a worker, and the second stage is to execute the work. Both stages may block, which we don’t like because they have to be executed in the event loop. We will use I/O Ring to avoid blocking.
Let’s consider the first case, when a worker is available and we call it to execute a callable. The function accepts an I/O Ring submitter, two already prepared completers, and a callable to be called. Completers are lightweight structures I introduced in my I/O Runtime to carry information about a scheduled, ongoing I/O operation. They are always passed as user data in each I/O Ring operation. The first completer is responsible for notifying that queuing a task is completed; the second one completes only once the callable has been executed.
pub enum IORuntimePoolExecute {
Queued(),
Executed(),
ScheduleFailed(),
ExecutionFailed(),
InternallyFailed(),
}
impl IORuntimePool {
pub fn execute(
&mut self,
submitter: &mut IORingSubmitter,
completers: [&IORingCompleterRef; 2],
callable: &CallableTarget,
) -> IORuntimePoolExecute {
// acquire worker
if let Some(slot) = self.workers_slots.get(self.workers_count) {
let worker = match self.workers_array.get_mut(*slot) {
Some(Some(worker)) => worker,
_ => return IORuntimePoolExecute::InternallyFailed(),
};
// confirm queuing
let op = IORingSubmitEntry::noop();
match submitter.submit(completers[0].encode(), [op]) {
IORingSubmit::Succeeded(_) => (),
_ => return IORuntimePoolExecute::ScheduleFailed(),
}
// prepare execute op
let op = match worker.execute(callable) {
WorkerExecute::Succeeded(op) => op,
_ => return IORuntimePoolExecute::InternallyFailed(),
};
// confirm execution
match submitter.submit(completers[1].encode(), [op]) {
IORingSubmit::Succeeded(_) => (),
_ => return IORuntimePoolExecute::ExecutionFailed(),
}
// update internal counter and correlate worker with completer
self.workers_count += 1;
self.workers_completers[*slot] = Some(completers[1].encode());
return IORuntimePoolExecute::Executed();
}
...
}
}
In the above code, after successfully acquiring a worker, we schedule a noop operation with the first completer because we don’t need to wait for a worker, but we still need to complete it. We can immediately schedule the actual work — the worker’s execute function returned as an I/O Ring operation, which we connect with the second completer.
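The worker acquisition relies on a slot array and a counter, which can be studied in isolation. The sketch below mirrors workers_slots and workers_count with a hypothetical Slots type; acquire follows the listing, while release is my assumption of how a matching release_worker could maintain the array.

```rust
const WORKERS_COUNT: usize = 4;

/// Tracks free workers without allocating:
/// `slots[..count]` are busy worker indices, `slots[count..]` are free ones.
struct Slots {
    slots: [usize; WORKERS_COUNT],
    count: usize,
}

impl Slots {
    fn new() -> Self {
        let mut slots = [0; WORKERS_COUNT];
        for (index, slot) in slots.iter_mut().enumerate() {
            *slot = index;
        }
        Self { slots, count: 0 }
    }

    /// Hands out the next free worker index, or None when all are busy.
    fn acquire(&mut self) -> Option<usize> {
        let slot = *self.slots.get(self.count)?;
        self.count += 1;
        Some(slot)
    }

    /// Moves a busy worker back to the free part; returns false if it was not busy.
    fn release(&mut self, worker: usize) -> bool {
        match self.slots[..self.count].iter().position(|&slot| slot == worker) {
            Some(at) => {
                self.count -= 1;
                // the last busy entry takes the freed position
                self.slots.swap(at, self.count);
                true
            }
            None => false,
        }
    }
}

fn main() {
    let mut slots = Slots::new();
    let first = slots.acquire();
    let second = slots.acquire();
    println!("acquired {:?} and {:?}", first, second);
    println!("released 0: {}", slots.release(0));
    println!("next free: {:?}", slots.acquire());
}
```

When acquire returns None, the pool has to fall back to queuing, which is the second path of the execute function.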
In the case of not having any available worker, we need to queue a callable using the internal pipe. It will be read later when a worker becomes ready:
impl IORuntimePool {
pub fn execute(
&mut self,
submitter: &mut IORingSubmitter,
completers: [&IORingCompleterRef; 2],
callable: &CallableTarget,
) -> IORuntimePoolExecute {
...
// append encoded completer to a callable header
let (ptr, len) = unsafe {
let (ptr, _) = callable.as_ptr();
let encoded = (ptr + 16) as *mut u64;
*encoded = completers[1].encode();
(ptr as *const u8, 24)
};
// notify when queuing happened
let op = IORingSubmitEntry::write(self.queue_outgoing, ptr, len, 0);
match submitter.submit(completers[0].encode(), [op]) {
IORingSubmit::Succeeded(_) => (),
_ => return IORuntimePoolExecute::ScheduleFailed(),
}
return IORuntimePoolExecute::Queued();
}
}
The code appends a completer to the header of the callable’s heap and uses the I/O Ring to notify us when the message has been written to the queue. The symmetric read happens as soon as we detect that a worker is available:
pub enum IORuntimePoolTrigger {
Succeeded(bool),
ExecutionFailed(),
InternallyFailed(),
}
impl IORuntimePool {
pub fn trigger(&mut self, submitter: &mut IORingSubmitter) -> IORuntimePoolTrigger {
if self.queue_counter == 0 {
return IORuntimePoolTrigger::Succeeded(false);
}
if let Some(slot) = self.workers_slots.get(self.workers_count) {
trace1(b"acquired worker; slot=%d\n", *slot);
// worker still theoretically may fail
let worker = match self.workers_array.get_mut(*slot) {
Some(Some(worker)) => worker,
_ => return IORuntimePoolTrigger::InternallyFailed(),
};
// buffer is needed to collect data from the pipe; usize elements keep it
// properly aligned for the usize and u64 reads performed while decoding
let mut buffer: [usize; 3] = [0; 3];
let ptr = buffer.as_mut_ptr() as *mut ();
// we expect to read ptr, len, encoded completer triple from a queue
let result = sys_read(self.queue_incoming, ptr, 24);
trace1(b"acquired callable; res=%d\n", result);
if result != 24 {
return IORuntimePoolTrigger::InternallyFailed();
} else {
self.queue_counter -= 1;
}
// decoding payload
let ptr = ptr as *const usize;
let len = unsafe { ptr.add(1) };
let encoded = unsafe { ptr.add(2) as *const u64 };
// rebuilding callable
let heap = unsafe { Heap::at(*ptr, *len) };
let callable: CallableTarget = CallableTarget::from(heap);
// then we try to follow known path
let op = match worker.execute(&callable) {
WorkerExecute::Succeeded(op) => op,
_ => return IORuntimePoolTrigger::InternallyFailed(),
};
// by registering it within I/O Ring
match submitter.submit(unsafe { *encoded }, [op]) {
IORingSubmit::Succeeded(_) => (),
_ => return IORuntimePoolTrigger::ExecutionFailed(),
}
// not forgetting about maintaining the state
self.workers_completers[*slot] = unsafe { Some(*encoded) };
self.workers_count += 1;
}
IORuntimePoolTrigger::Succeeded(true)
}
}
It reads 24 bytes from the pipe, decodes them as a callable and its completer, and sends the callable to a worker along the path we already covered.
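The 24-byte message can also be encoded and decoded without any pointer casts, which sidesteps alignment concerns altogether. The sketch below assumes a 64-bit target, so all three values are modeled as u64; the Queued struct and both functions are hypothetical names for the illustration.

```rust
/// The three values that travel through the queue pipe.
#[derive(Debug, PartialEq, Clone, Copy)]
struct Queued {
    ptr: u64,
    len: u64,
    completer: u64,
}

/// Packs the triple into the 24 bytes written to the pipe, in native byte order.
fn encode(item: Queued) -> [u8; 24] {
    let mut bytes = [0u8; 24];
    bytes[0..8].copy_from_slice(&item.ptr.to_ne_bytes());
    bytes[8..16].copy_from_slice(&item.len.to_ne_bytes());
    bytes[16..24].copy_from_slice(&item.completer.to_ne_bytes());
    bytes
}

/// Unpacks the triple; copying into a temporary makes alignment irrelevant.
fn decode(bytes: &[u8; 24]) -> Queued {
    let word = |at: usize| {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes[at..at + 8]);
        u64::from_ne_bytes(raw)
    };
    Queued {
        ptr: word(0),
        len: word(8),
        completer: word(16),
    }
}

fn main() {
    let item = Queued { ptr: 0x7f00_0000_1000, len: 4096, completer: 17 };
    let bytes = encode(item);
    println!("{:?}", decode(&bytes));
}
```

Native byte order is fine here because both ends of the pipe live in the same process.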
You may wonder whether reading from and writing to a pipe might split messages. It won’t. Pipes in Linux follow the POSIX.1 specification, which guarantees atomicity for small writes. Let’s see what man 7 pipe says about it.
PIPE_BUF
POSIX.1 says that writes of less than PIPE_BUF bytes must be atomic:
the output data is written to the pipe as a contiguous sequence.
Writes of more than PIPE_BUF bytes may be nonatomic: the kernel may
interleave the data with data written by other processes. POSIX.1
requires PIPE_BUF to be at least 512 bytes. (On Linux, PIPE_BUF is 4096
bytes.) The precise semantics depend on whether the file descriptor is
nonblocking (O_NONBLOCK), whether there are multiple writers to the
pipe, and on n, the number of bytes to be written:
O_NONBLOCK disabled, n <= PIPE_BUF
All n bytes are written atomically; write(2) may block if there
is not room for n bytes to be written immediately
O_NONBLOCK enabled, n <= PIPE_BUF
If there is room to write n bytes to the pipe, then write(2)
succeeds immediately, writing all n bytes; otherwise write(2)
fails, with errno set to EAGAIN.
O_NONBLOCK disabled, n > PIPE_BUF
The write is nonatomic: the data given to write(2) may be
interleaved with write(2)s by other process; the write(2) blocks
until n bytes have been written.
O_NONBLOCK enabled, n > PIPE_BUF
If the pipe is full, then write(2) fails, with errno set to
EAGAIN. Otherwise, from 1 to n bytes may be written (i.e., a
"partial write" may occur; the caller should check the return
value from write(2) to see how many bytes were actually
written), and these bytes may be interleaved with writes by other
processes.
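It means we don’t have to worry about split messages: on Linux PIPE_BUF equals a page (4096 bytes) and we send just 24 bytes at a time. Since every write is atomic and every message has the same length, each 24-byte read returns exactly one whole message.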
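The four cases from the manual page can be condensed into a small lookup. This is only a sketch of the documented rules, not code from the project: the enum `PipeWrite` and the function `classify_write` are names I made up for illustration, and the constant assumes the Linux value of PIPE_BUF.

```rust
/// PIPE_BUF on Linux, as quoted from the manual page (assumed, not queried).
const PIPE_BUF: usize = 4096;

/// What write(2) promises for a pipe, following the four cases in pipe(7).
#[derive(Debug, PartialEq)]
enum PipeWrite {
    /// All n bytes are written atomically; the call may block.
    AtomicMayBlock,
    /// All n bytes are written atomically, or the call fails with EAGAIN.
    AtomicOrEagain,
    /// Data may be interleaved; the call blocks until n bytes are written.
    InterleavedBlocksUntilDone,
    /// EAGAIN if the pipe is full, otherwise a partial write may occur.
    PartialOrEagain,
}

/// Maps the O_NONBLOCK flag and the write size to the documented behavior.
fn classify_write(nonblocking: bool, n: usize) -> PipeWrite {
    match (nonblocking, n <= PIPE_BUF) {
        (false, true) => PipeWrite::AtomicMayBlock,
        (true, true) => PipeWrite::AtomicOrEagain,
        (false, false) => PipeWrite::InterleavedBlocksUntilDone,
        (true, false) => PipeWrite::PartialOrEagain,
    }
}
```

Our 24-byte messages always land in one of the two atomic cases, and 170 of them fit into a single PIPE_BUF.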
Once the pool is working, we can use it within the I/O Runtime. We need to expose a few bindings through the context as follows:
impl IORingRuntime {
fn trigger(&mut self, completer: &IORingCompleterRef) {
// first release worker behind completer
self.pool.release_worker(completer);
// then trigger likely pending callable
self.pool.trigger(&mut self.submitter);
}
}
impl IORingRuntimeContext {
pub fn trigger(&mut self, completer: &IORingCompleterRef) {
unsafe { (*self.runtime).trigger(completer) }
}
}
pub enum IORingRuntimeExecute {
Queued(IORingCompleterRef, IORingCompleterRef),
Executed(IORingCompleterRef, IORingCompleterRef),
NotEnoughSlots(),
InternallyFailed(),
}
impl IORingRuntime {
fn execute(&mut self, task: &IORingTaskRef, callable: &CallableTarget) -> IORingRuntimeExecute {
let queued = match self.registry.append_completer(task.clone()) {
IORingRegistryAppend::Succeeded(completer) => completer,
IORingRegistryAppend::NotEnoughSlots() => return IORingRuntimeExecute::NotEnoughSlots(),
IORingRegistryAppend::InternallyFailed() => return IORingRuntimeExecute::InternallyFailed(),
};
let executed = match self.registry.append_completer(task.clone()) {
IORingRegistryAppend::Succeeded(completer) => completer,
IORingRegistryAppend::NotEnoughSlots() => return IORingRuntimeExecute::NotEnoughSlots(),
IORingRegistryAppend::InternallyFailed() => return IORingRuntimeExecute::InternallyFailed(),
};
match self.pool.execute(&mut self.submitter, [&queued, &executed], callable) {
IORuntimePoolExecute::Queued() => IORingRuntimeExecute::Queued(queued, executed),
IORuntimePoolExecute::Executed() => IORingRuntimeExecute::Executed(queued, executed),
IORuntimePoolExecute::ScheduleFailed() => IORingRuntimeExecute::InternallyFailed(),
IORuntimePoolExecute::ExecutionFailed() => IORingRuntimeExecute::InternallyFailed(),
IORuntimePoolExecute::InternallyFailed() => IORingRuntimeExecute::InternallyFailed(),
}
}
}
impl IORingRuntimeContext {
pub fn execute(&mut self, callable: &CallableTarget) -> IORingRuntimeExecute {
unsafe { (*self.runtime).execute(&self.task, callable) }
}
}
With these bindings in place, we can extend the I/O Ring Task Token:
impl IORingTaskToken {
pub fn execute(waker: &Waker, task: &CallableTarget) -> Option<(IORingTaskToken, IORingTaskToken)> {
match Self::context(waker).execute(task) {
IORingRuntimeExecute::Queued(queued, executed) => Some((
IORingTaskToken::from_queue(queued),
IORingTaskToken::from_execute(executed),
)),
IORingRuntimeExecute::Executed(queued, executed) => Some((
IORingTaskToken::from_op(queued),
IORingTaskToken::from_execute(executed),
)),
IORingRuntimeExecute::NotEnoughSlots() => None,
IORingRuntimeExecute::InternallyFailed() => None,
}
}
pub fn extract(self, waker: &Waker) -> IORingTaskTokenExtract {
let context = Self::context(waker);
let value = match context.extract(&self.completer) {
IORingRuntimeExtract::Succeeded(value) => value,
IORingRuntimeExtract::NotCompleted() => return IORingTaskTokenExtract::Failed(self),
IORingRuntimeExtract::NotFound() => return IORingTaskTokenExtract::Failed(self),
};
if let IORingTaskTokenKind::Queue = self.kind {
// enqueue sent callable
context.enqueue(&self.completer);
}
if let IORingTaskTokenKind::Execute = self.kind {
// trigger awaiting callable
context.trigger(&self.completer);
}
IORingTaskTokenExtract::Succeeded(value)
}
}
We always get two tokens, and both of them report completion. When the second one completes, it releases the worker and triggers the pool, so that a callable still waiting in the queue can be scheduled.
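The interplay between executing, queueing and triggering may be easier to follow on a toy model. The sketch below is a simplification under my own assumptions: `ToyPool`, `Scheduled` and `release_and_trigger` are hypothetical names, callables are plain numbers, and a `VecDeque` stands in for the pipe that the real pool uses as its queue.

```rust
use std::collections::VecDeque;

/// Outcome of asking the toy pool to run a callable.
#[derive(Debug, PartialEq)]
pub enum Scheduled {
    /// A worker was free; the callable started on this slot.
    Executed(usize),
    /// All workers were busy; the callable waits in the queue.
    Queued,
}

pub struct ToyPool {
    free_slots: Vec<usize>,
    pending: VecDeque<u32>,
}

impl ToyPool {
    pub fn new(workers: usize) -> Self {
        ToyPool {
            free_slots: (0..workers).rev().collect(),
            pending: VecDeque::new(),
        }
    }

    /// Starts the callable right away or puts it into the queue.
    pub fn execute(&mut self, callable: u32) -> Scheduled {
        match self.free_slots.pop() {
            Some(slot) => Scheduled::Executed(slot),
            None => {
                self.pending.push_back(callable);
                Scheduled::Queued
            }
        }
    }

    /// Releases a finished worker, then starts the oldest pending callable.
    /// Returns the slot and callable that were started, if any.
    pub fn release_and_trigger(&mut self, slot: usize) -> Option<(usize, u32)> {
        self.free_slots.push(slot);
        let callable = self.pending.pop_front()?;
        let slot = self.free_slots.pop()?;
        Some((slot, callable))
    }
}
```

With a single worker, the second callable is queued and only starts when the first one reports completion, which is the role of the second token.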
How can we benefit from it? Let’s imagine we managed to write a future that spawns a task on the CPU:
pub fn spawn_cpu<'a, F, R, E>(target: F) -> Option<SpawnCPU<'a, F, R, E>>
where
F: FnOnce() -> Result<R, E> + Unpin + Send + 'a,
R: Unpin + Send,
E: Unpin + Send,
{
let task = match CallableTarget::allocate(target) {
CallableTargetAllocate::Succeeded(target) => target,
CallableTargetAllocate::AllocationFailed(_) => return None,
};
Some(SpawnCPU {
queued: None,
executed: None,
phantom: PhantomData,
task: Some(task),
})
}
pub struct SpawnCPU<'a, F, R, E>
where
F: Unpin,
R: Unpin,
{
task: Option<CallableTarget>,
queued: Option<IORingTaskToken>,
executed: Option<IORingTaskToken>,
phantom: PhantomData<(&'a F, R, E)>,
}
which can later be polled using the obtained tokens:
pub enum SpawnCPUResult<R, E> {
Succeeded(Option<Result<R, E>>),
OperationFailed(),
InternallyFailed(),
}
impl<'a, F, R, E> Future for SpawnCPU<'a, F, R, E>
where
F: FnOnce() -> Result<R, E> + Unpin,
R: Unpin,
E: Unpin,
{
type Output = SpawnCPUResult<R, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
trace0(b"# polling spawn-cpu\n");
if let Some(token) = this.queued.take() {
trace0(b"# polling spawn-cpu; extracting queued\n");
let result = match token.extract(cx.waker()) {
IORingTaskTokenExtract::Succeeded(value) => Some(value),
IORingTaskTokenExtract::Failed(token) => {
this.queued = Some(token);
None
}
};
if let Some(result) = result {
return if result < 0 {
Poll::Ready(SpawnCPUResult::OperationFailed())
} else {
trace1(b"# polling spawn-cpu; stage=queued, res=%d\n", result);
Poll::Pending
};
}
}
if let Some(token) = this.executed.take() {
trace0(b"# polling spawn-cpu; extracting executed\n");
let result = match token.extract(cx.waker()) {
IORingTaskTokenExtract::Succeeded(value) => Some(value),
IORingTaskTokenExtract::Failed(token) => {
this.executed = Some(token);
return Poll::Pending;
}
};
if let Some(result) = result {
return if result < 0 {
Poll::Ready(SpawnCPUResult::OperationFailed())
} else {
trace1(b"# polling spawn-cpu; stage=executed, res=%d\n", result);
let result = match this.task.take() {
None => SpawnCPUResult::InternallyFailed(),
Some(task) => SpawnCPUResult::Succeeded(task.result::<F, R, E>()),
};
Poll::Ready(result)
};
}
}
if this.queued.is_some() || this.executed.is_some() {
return Poll::Pending;
}
let task = match &this.task {
Some(task) => task,
None => return Poll::Ready(SpawnCPUResult::InternallyFailed()),
};
match IORingTaskToken::execute(cx.waker(), task) {
Some((queued, executed)) => {
trace2(b"callable; scheduled, qid=%d, eid=%d\n", queued.cid(), executed.cid());
this.queued = Some(queued);
this.executed = Some(executed);
Poll::Pending
}
None => {
trace0(b"callable not scheduled\n");
Poll::Ready(SpawnCPUResult::OperationFailed())
}
}
}
}
impl<'a, F, R, E> Drop for SpawnCPU<'a, F, R, E>
where
F: Unpin,
R: Unpin,
{
fn drop(&mut self) {
if let Some(task) = self.task.take() {
let (ptr, len) = task.as_ptr();
trace2(b"callable; releasing task, heap=%x, size=%d\n", ptr, len);
task.release();
}
}
}
This future gives us the ability to schedule code directly on threads without leaving the single-threaded concurrency model of the async/await code. Let’s consider this example:
pub struct ThreadCommand {
pub ios: u32,
pub cpus: u32,
}
impl ThreadCommand {
pub async fn execute(self) -> Option<&'static [u8]> {
for j in 0..self.ios {
let task = spawn(async move {
for i in 0..self.cpus {
let value = match spawn_cpu(move || -> Result<u32, ()> { Ok(i + j) }) {
None => return Some(APP_CPU_SPAWNING_FAILED),
Some(task) => match task.await {
SpawnCPUResult::Succeeded(value) => value,
SpawnCPUResult::OperationFailed() => return Some(APP_INTERNALLY_FAILED),
SpawnCPUResult::InternallyFailed() => return Some(APP_INTERNALLY_FAILED),
},
};
if let Some(Ok(val)) = value {
trace3(b"completed %d %d %d\n", i, j, val);
}
}
None
});
match task.await {
SpawnResult::Succeeded() => (),
SpawnResult::OperationFailed() => return Some(APP_INTERNALLY_FAILED),
SpawnResult::InternallyFailed() => return Some(APP_INTERNALLY_FAILED),
}
}
None
}
}
The above snippet does nothing remarkable in terms of visible results. If both parameters are equal to 100, it computes 10,000 trivial sums, using 100 async tasks with 100 thread calls within each task. The most important observation is that it doesn’t block the main thread running the event loop, so the application stays responsive.
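For readers who know std better than bare system calls, the idea behind spawn_cpu can be approximated with standard library pieces. This is a rough analogue and not how the project works: `std_spawn_cpu`, `StdSpawnCpu` and `block_on` are hypothetical names, and the sketch relies on `Arc`, `Mutex` and thread parking, exactly the kind of synchronization primitives the no-std version avoids by using pipes and the I/O Ring.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared between the polling side and the worker thread.
struct Shared<R> {
    result: Option<R>,
    waker: Option<Waker>,
}

/// A future that runs a closure on a separate thread (hypothetical analogue).
pub struct StdSpawnCpu<F, R> {
    target: Option<F>,
    shared: Arc<Mutex<Shared<R>>>,
}

pub fn std_spawn_cpu<F, R>(target: F) -> StdSpawnCpu<F, R>
where
    F: FnOnce() -> R + Send + Unpin + 'static,
    R: Send + 'static,
{
    let shared = Shared { result: None, waker: None };
    StdSpawnCpu { target: Some(target), shared: Arc::new(Mutex::new(shared)) }
}

impl<F, R> Future for StdSpawnCpu<F, R>
where
    F: FnOnce() -> R + Send + Unpin + 'static,
    R: Send + 'static,
{
    type Output = R;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        let this = self.get_mut();
        {
            let mut shared = this.shared.lock().unwrap();
            if let Some(result) = shared.result.take() {
                return Poll::Ready(result);
            }
            // remember whom to wake once the closure finishes
            shared.waker = Some(cx.waker().clone());
        }
        // the first poll moves the closure to a new thread
        if let Some(target) = this.target.take() {
            let shared = Arc::clone(&this.shared);
            thread::spawn(move || {
                let result = target();
                let waker = {
                    let mut shared = shared.lock().unwrap();
                    shared.result = Some(result);
                    shared.waker.take()
                };
                if let Some(waker) = waker {
                    waker.wake();
                }
            });
        }
        Poll::Pending
    }
}

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// A tiny single-future executor standing in for the event loop.
pub fn block_on<T>(mut future: impl Future<Output = T> + Unpin) -> T {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match Pin::new(&mut future).poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling `block_on(std_spawn_cpu(|| (1..=10u64).sum::<u64>()))` runs the sum on another thread while the calling thread only waits to be woken up, which is the same division of labor as between the event loop and the workers.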
Let’s try to build something that may actually benefit from CPU and I/O at the same time. Do you know sha1sum? It’s a command-line tool that computes hashes of multiple files. It may be used in the following way:
vscode ➜ /workspaces/learning-rust (async-await-pipes) $ ls -la en*
-rw-r--r-- 1 vscode vscode 1013471918 Sep 2 2023 enwiki-20230901-pages-articles-multistream1.xml-p1p41242
-rw-r--r-- 1 vscode vscode 3854846248 Sep 6 2023 enwiki-20230901-pages-meta-history10.xml-p5136726p5137515.7z
-rw-r--r-- 1 vscode vscode 259750638 May 2 04:25 enwiki-20240501-pages-articles-multistream-index.txt.bz2
-rw-r--r-- 1 vscode vscode 23184900944 May 2 04:23 enwiki-20240501-pages-articles-multistream.xml.bz2
-rw-r--r-- 1 vscode vscode 23648605741 Jun 8 16:01 enwiki-20240601-pages-articles-multistream.xml.bz2
-rw-r--r-- 1 vscode vscode 41513214739 Jun 8 20:27 enwiki-20240601-pages-meta-current.xml.bz2
vscode ➜ /workspaces/learning-rust (async-await-pipes) $ time sha1sum en*
55375e89bccfb6851c9a60b933c6e1e458114ca3 enwiki-20230901-pages-articles-multistream1.xml-p1p41242
f020288be05a1e18da944b8df154ce84976f9b2a enwiki-20230901-pages-meta-history10.xml-p5136726p5137515.7z
134239721b29faa9264ecb7f59740444f640ec77 enwiki-20240501-pages-articles-multistream-index.txt.bz2
1412bbb2eb36d7d119b2f13a93ff9818db14cd3f enwiki-20240501-pages-articles-multistream.xml.bz2
54ee62ce7111daffd5be267ec863dc5f54360ed8 enwiki-20240601-pages-articles-multistream.xml.bz2
450dba4f17de7870cc088d800632e385fd6524a6 enwiki-20240601-pages-meta-current.xml.bz2
real 2m20.681s
user 1m54.684s
sys 0m25.853s
In my current directory, I had a few quite large Wikipedia files. The tool computes the hashes quickly, keeping one core at 100%. It reads file by file, even though my file system is not the bottleneck. We could theoretically do it much faster. Let’s sketch an alternative:
pub struct Sha1Command {
pub args: &'static ProcessArguments,
}
impl Sha1Command {
pub async fn execute(self) -> Option<&'static [u8]> {
for arg in 2..self.args.len() {
// a task will be spawned for each argument
let task = spawn(async move {
// an auto dropped memory for a buffer
let buffer: Droplet<Heap> = match mem_alloc(32 * 4096) {
MemoryAllocation::Succeeded(value) => value.droplet(),
MemoryAllocation::Failed(_) => return Some(APP_MEMORY_ALLOC_FAILED),
};
// a path of the file to hash
let path: ProcessArgument = match self.args.get(arg) {
None => return Some(APP_ARGS_FAILED),
Some(value) => value,
};
// a file descriptor for a file we opened
let file: FileDescriptor = match open_file(&path).await {
FileOpenResult::Succeeded(value) => value,
_ => return Some(APP_FILE_OPENING_FAILED),
};
let mut file_offset = 0;
let mut buffer_offset = 0;
let mut sha1 = Sha1::new();
loop {
while buffer_offset < buffer.len {
// slice the buffer so we can try to fill it to the end
let buffer: HeapSlice = match buffer.between(buffer_offset, buffer.len) {
HeapSlicing::Succeeded(value) => value,
_ => return Some(APP_MEMORY_SLICE_FAILED),
};
// and read bytes into sliced memory from a given file offset
let read = match read_file(&file, buffer, file_offset).await {
FileReadResult::Succeeded(_, read) => read as usize,
_ => return Some(APP_FILE_READING_FAILED),
};
// both counters have to be incremented
buffer_offset += read;
file_offset += read as u64;
// and in case of end of file we return what we managed to read
if read == 0 {
break;
}
}
// let's slice up to a 512-bit boundary, as sha1 requires
let slice = match buffer.between(0, buffer_offset / 64 * 64) {
HeapSlicing::Succeeded(val) => val,
_ => return Some(APP_MEMORY_SLICE_FAILED),
};
// to process it outside event loop
let task = spawn_cpu(move || -> Result<Sha1, ()> {
// just processing a slice and returning new self
Ok(sha1.update(slice.ptr() as *const u8, slice.len()))
});
// the cpu task has to be awaited
sha1 = match task {
None => return Some(APP_CPU_SPAWNING_FAILED),
Some(task) => match task.await {
SpawnCPUResult::Succeeded(Some(Ok(sha1))) => sha1,
_ => return Some(APP_CPU_SPAWNING_FAILED),
},
};
// and in case we didn't fill the entire buffer
// we may assume the file is completed
if buffer_offset < buffer.len {
break;
}
// otherwise start filling buffer from the beginning
buffer_offset = 0;
}
// the buffer may have remainder between 0 and 63 bytes
let slice: HeapSlice = match buffer.between(buffer_offset / 64 * 64, buffer_offset) {
HeapSlicing::Succeeded(slice) => slice,
_ => return Some(APP_MEMORY_SLICE_FAILED),
};
// which needs to be finalized
let task = move || -> Result<[u32; 5], ()> {
// returning final hash as [u32; 5]
Ok(sha1.finalize(slice.ptr() as *mut u8, slice.len(), file_offset))
};
// a cpu task has to be awaited
let hash: [u32; 5] = match spawn_cpu(task) {
None => return Some(APP_CPU_SPAWNING_FAILED),
Some(task) => match task.await {
SpawnCPUResult::Succeeded(Some(Ok(hash))) => hash,
_ => return Some(APP_CPU_SPAWNING_FAILED),
},
};
// a message like sha1sum output is constructed
let mut msg = [0; 160];
let len = format6(
&mut msg,
b"%x%x%x%x%x %s\n",
hash[0],
hash[1],
hash[2],
hash[3],
hash[4],
path.as_ptr(),
);
// to be printed asynchronously in the stdout
let stdout = open_stdout();
match write_stdout(&stdout, (msg, len)).await {
StdOutWriteResult::Succeeded(_, _) => (),
_ => return Some(APP_STDOUT_FAILED),
}
// and finally we close a file
match close_file(file).await {
FileCloseResult::Succeeded() => (),
_ => return Some(APP_FILE_CLOSING_FAILED),
}
None
});
// and task has to be awaited to be executed
match task.await {
SpawnResult::Succeeded() => (),
_ => return Some(APP_IO_SPAWNING_FAILED),
}
}
None
}
}
The code does exactly the same job: it opens all files expanded by bash as separate tasks, allocates a 128 KiB buffer for each of them, and outputs hashes in the same format. I implemented a poorly optimized SHA1 algorithm, which seems to be about twice as slow as sha1sum. How does it run? Let’s check it:
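The expression `buffer_offset / 64 * 64` in the listing rounds the filled length down to a multiple of the 64-byte SHA1 block. A minimal sketch of the same arithmetic on plain slices, with the hypothetical helper `split_at_block_boundary` standing in for the project’s Heap slicing:

```rust
/// SHA1 processes input in blocks of 512 bits, which is 64 bytes.
const SHA1_BLOCK: usize = 64;

/// Splits the filled part of a buffer into whole blocks and a remainder
/// of 0 to 63 bytes, which is left for the finalization step.
fn split_at_block_boundary(filled: &[u8]) -> (&[u8], &[u8]) {
    // integer division drops the incomplete block at the end
    let boundary = filled.len() / SHA1_BLOCK * SHA1_BLOCK;
    filled.split_at(boundary)
}
```

A completely filled 128 KiB buffer has no remainder, so only the last, partially filled buffer of a file leaves bytes for finalization.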
vscode ➜ /workspaces/learning-rust (async-await-pipes) $ time /tmp/cargo/x86_64-unknown-none/release/learning-rust sha1sum en*
134239721b29faa9264ecb7f59740444f640ec77 enwiki-20240501-pages-articles-multistream-index.txt.bz2
55375e89bccfb6851c9a60b933c6e1e458114ca3 enwiki-20230901-pages-articles-multistream1.xml-p1p41242
f020288be05a1e18da944b8df154ce84976f9b2a enwiki-20230901-pages-meta-history10.xml-p5136726p5137515.7z
1412bbb2eb36d7d119b2f13a93ff9818db14cd3f enwiki-20240501-pages-articles-multistream.xml.bz2
54ee62ce7111daffd5be267ec863dc5f54360ed8 enwiki-20240601-pages-articles-multistream.xml.bz2
450dba4f17de7870cc088d800632e385fd6524a6 enwiki-20240601-pages-meta-current.xml.bz2
real 2m4.307s
user 3m53.123s
sys 0m39.138s
We can see it produces the same hashes, just in a different order. The timing is a bit different as well. Time spent in user space is twice as large, which I attribute to my poor SHA1 implementation. The kernel time also increased, partially because of the I/O Ring overhead, though the heap allocation for each closure could contribute to it as well. Even so, the wall-clock time dropped from 2m20s to 2m4s, because the work no longer runs on a single core.
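One way to read both measurements is to divide the CPU time (user plus sys) by the wall-clock time, which gives the average number of busy cores. The sketch below does this arithmetic for the numbers printed by `time`; the helpers `parse_time` and `busy_cores` are hypothetical and only understand the `XmY.ZZZs` format shown above.

```rust
/// Parses a duration printed by `time`, such as "2m20.681s", into seconds.
fn parse_time(text: &str) -> Option<f64> {
    let (minutes, rest) = text.split_once('m')?;
    let seconds = rest.strip_suffix('s')?;
    Some(minutes.parse::<f64>().ok()? * 60.0 + seconds.parse::<f64>().ok()?)
}

/// Average number of busy cores: (user + sys) / real.
fn busy_cores(real: &str, user: &str, sys: &str) -> Option<f64> {
    Some((parse_time(user)? + parse_time(sys)?) / parse_time(real)?)
}
```

For sha1sum the ratio is about 1.0, one fully loaded core. For our version it is about 2.2, so on average a little more than two cores were busy.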
We ended our ambitious PoC with a working example, without leaving the no-std and no-main environment. We learned how to deal with pipes and how to clone ourselves to run a thread. Finally, we touched on closures and managed to move them between threads, so that CPU-intensive operations don’t block the event loop dedicated exclusively to I/O. With all these features, we didn’t use any synchronization primitives! Isn’t it a utopian concurrency model?
https://github.com/amacal/learning-rust/tree/async-await-pipes