Asynchronous programming

Since you are now familiar with the basic Rust concepts, let's ease our way into some asynchronous programming.

The Rust standard library provides a way to execute tasks on different threads. In this book we'll focus on the relatively new async/.await standard. We use tokio as our asynchronous runtime. There are other good asynchronous runtimes for Rust, like: async_std, but because of my/our practical experience with tokio, this is the one we'll use.

To get a quick introduction on the topic, I'd recommend a quick read through this blog post: Async-await on stable Rust!

To get started, we need to pull ths tokio crate into our Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full"] }

You can 'tune' your application by removing unneeded tokio features.

use std::time::Duration;
use tokio::spawn;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let async_task = spawn(async {
        println!("Async task started.");
        sleep(Duration::from_secs(1)).await;
        println!("Async task done.");
    });

    println!("Launching task...");
    async_task.await.expect("async task failed");
    println!("Ready!");
}

This example is best experienced on your local machine, where you can see the output in real-time.

Let's explore this example in a bit more detail.

The async statement marks a function for asynchronous execution. These functions return a Future. These Futures are not immediately scheduled. Actually, if a Future is never await-ed, it may never be executed.

Keep your eye out for any compiler warnings, or Clippy hints, while writing async code. Especially for things like:

warning: unused something that must be used ... futures do nothing unless you .await or poll them

The tokio::spawn(async {}) commands wraps an asynchronous task. You can return values from such a function in the normal way. The result can be captured after the .await completes.

Every time an .await is encountered in the code, the scheduler will temporarily pause the task and see if there is other work to be executed.

Let's extend our example to run a bunch of parallel tasks and await the results.

use std::time::Duration;
use tokio::spawn;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let mut tasks = vec![];
    for id in 0..5 {
        let t = spawn(async move {
            println!("Async task {} started.", id);
            sleep(Duration::from_millis((5 - id) * 100)).await;
            println!("Async task {} done.", id);
            let result = id * id; // silly calculation...
            (id, result)
        });

        tasks.push(t);
    }

    println!("Launched {} tasks...", tasks.len());
    for task in tasks {
        let (id, result) = task.await.expect("task failed");
        println!("Task {} completed with result: {}", id, result);
    }
    println!("Ready!");
}

What has changed compared to the previous example?

Notice the move statement, in the async block. This is needed if you want to use external variables, in this case id, inside the async block. The compiler will check if the variable can be sent across threads safely. More on that later.

We are returning the id and result in a tuple, that we deconstruct after await-ing the result of the async task.

Using a JoinSet

In the previous example, tasks complete in reverse order. If you observe the output from the main thread, though, you notice that nothing is printed until task '0' finishes. This is because we are iterating through the tasks in a sequence that does not match the order in which tasks complete. Typically, you want to act as soon as a task finishes, irrespective of the state of other tasks that are running in parallel.

This can be accomplished with the tokio::task::JoinSet. We have changed the example such that it uses the spawn method to launch new tasks, and then we use join_next() method to wait for the next task to complete. Once all tasks are completed, the join_next() method will return None. This is where we break the loop.

use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let mut tasks = JoinSet::new();
    for id in 0..5 {
        tasks.spawn(async move {
            println!("Async task {} started.", id);
            sleep(Duration::from_millis((5 - id) * 100)).await;
            println!("Async task {} done.", id);
            let result = id * id; // silly calculation...
            (id, result)
        });
    }

    println!("Launched {} tasks...", tasks.len());
    while let Some(task) = tasks.join_next().await {
        let (id, result) = task.expect("task failed");
        println!("Task {} completed with result: {}", id, result);
    }
    println!("Ready!");
}

Run both examples and compare the output. You will notice that the second example prints the results in the order in which the tasks complete.

Reference material