Sharing data & ownership (part 2)

In Chapter 2.3 we looked at ownership of data and how data can be borrowed by functions. In this chapter, we'll explore how data can be shared between threads.

Let's say we're implementing a scaffold to look up a user in a database asynchronously and add it to a list of users. With what we've learned so far, you could come up with something like this:

use tokio::spawn;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: &mut Vec<User>) {
    users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let mut users: Vec<User> = vec![];

    let lookup_users = spawn(async move {
        lookup_user("bob", &mut users).await;
        lookup_user("tom", &mut users).await;
    });

    lookup_users.await.expect("failed to lookup users");
    println!("Users: {users:?}");
}

Unfortunately this does not compile:

14 |       let mut users: Vec<User> = vec![];
   |           --------- move occurs because `users` has type `std::vec::Vec<User>`, which does not implement the `Copy` trait
   ...
22 |       println!("Users: {users:?}");
   |                               ^^^^^ value borrowed here after move

Apparently we cannot use users in main() after it has been moved into the async task. We need a way to give the asynchronous task access to users while main() keeps access to it as well...

Well, maybe you paid attention to the 'advanced' example in Chapter 2.3, did some reading online on Rc and RefCell (like RefCell and the Interior Mutability Pattern), and that led you to adapt the code like this:

use std::cell::RefCell;
use std::rc::Rc;
use tokio::spawn;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Rc<RefCell<Vec<User>>>) {
    users.borrow_mut().push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Rc::new(RefCell::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    println!("Users: {:?}", *users.borrow());
}

Another compiler error:

error[E0277]: `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>` cannot be sent between threads safely
   --> src/main.rs:19:24
    |
19  |     let lookup_users = spawn(async move {
    |                        ^^^^^ `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>` cannot be sent between threads safely
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.20/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>`
    = note: required because it appears within the type `[static generator@src/main.rs:19:41: 22:6 users_for_task:std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>> _]`
    = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:19:41: 22:6 users_for_task:std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>> _]>`
    = note: required because it appears within the type `impl std::future::Future`

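So we cannot use Rc in a task that may run on another thread: 'the trait std::marker::Send is not implemented for std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>'.

Remember that the previous chapter mentioned that, when a future created with async move is spawned, the Rust compiler checks whether the variables it captures can be sent between threads safely. The error shows where this requirement comes from: spawn requires T: Future + Send + 'static. This is one of those cases where it is not safe to do so. Rc is not thread-safe: it updates its reference count without synchronization, so it is neither Send nor Sync. RefCell isn't thread-safe either: its runtime borrow tracking is not synchronized, so a RefCell cannot be shared between threads (it is not Sync).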

Let's review what an Rc is by reading the chapter in the Rust book: Rc, the Reference Counted Smart Pointer

This may also be a good time to read a bit about the Send and Sync traits: Send and Sync.

Luckily for us, Rust has thread-safe replacements for Rc and RefCell: Arc and Mutex.

Arc stands for 'Atomically Reference Counted'. Like Rc, it is a reference-counted pointer to an object, but it updates its reference count with atomic operations, so it can be shared between threads.

Wrapping an object in an Arc allows us to share that object across threads. Unfortunately for us, an Arc only gives shared, read-only access to the object it wraps.
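To see what 'read-only' means in practice, here is a minimal sketch. It assumes plain std::thread instead of tokio tasks to stay dependency-free, and the helper worker_counts is hypothetical. Each thread gets its own Arc clone and can read the shared list, but it could not push to it.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: spawns two threads that only read the shared list.
fn worker_counts() -> Vec<usize> {
    // Wrapping the list in an Arc lets several threads own a handle to it.
    let users = Arc::new(vec!["bob".to_string(), "tom".to_string()]);

    let mut handles = Vec::new();
    for worker in 0..2 {
        // Each thread receives its own clone of the Arc (a new handle, same Vec).
        let users = Arc::clone(&users);
        handles.push(thread::spawn(move || {
            // Reading through the Arc works...
            let count = users.len();
            // ...but `users.push(...)` would not compile here: an Arc only
            // hands out shared (&) access to the value it wraps.
            println!("worker {worker} sees {count} users");
            count
        }));
    }

    // Join the threads in spawn order and collect what each one saw.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", worker_counts());
}
```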

The goal of the exercise is to add the user that was retrieved from the database to the shared list of users.

To safely change data that is shared between threads, we need to wrap it in a Mutex. Mutex stands for mutual exclusion: a Mutex guards the data it wraps by allowing only one thread at a time to access it.
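A minimal single-threaded sketch of that exclusivity, assuming std::sync::Mutex (the helper exclusive_access is hypothetical): while one guard returned by lock() is alive, try_lock() reports that the data is unavailable instead of waiting for it.

```rust
use std::sync::Mutex;

// Hypothetical helper: returns (lockable while guarded?, lockable after release?).
fn exclusive_access() -> (bool, bool) {
    let users = Mutex::new(vec!["bob".to_string()]);

    // lock() returns a guard; through it we may mutate the protected Vec.
    let mut guard = users.lock().unwrap();
    guard.push("tom".to_string());

    // While `guard` is alive, any other attempt to lock fails:
    // try_lock() returns an error immediately instead of blocking.
    let while_locked = users.try_lock().is_ok();

    // Dropping the guard releases the lock.
    drop(guard);
    let after_release = users.try_lock().is_ok();

    (while_locked, after_release)
}

fn main() {
    let (while_locked, after_release) = exclusive_access();
    // prints "while locked: false, after release: true"
    println!("while locked: {while_locked}, after release: {after_release}");
}
```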

With these tools in hand, let's rewrite the example one last time:

use std::sync::Arc;
use tokio::spawn;
use tokio::sync::Mutex;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<Mutex<Vec<User>>>) {
    let mut editable_users = users.lock().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(Mutex::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    let users_with_access = users.lock().await;
    println!("Users: {users_with_access:?}");
}

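This example uses tokio::sync::Mutex, whose lock() method is async: instead of blocking the thread while it waits for the lock, the task yields to the runtime. Prefer it when a lock guard has to be held across an .await point; a std::sync::MutexGuard is not Send, so a spawned task that holds one across an .await won't compile. For short critical sections that never .await while holding the lock, the tokio documentation notes that the standard library Mutex is fine and often preferred.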

As pointed out previously, when cloning an Rc or Arc, we are cloning the pointer to the data (and incrementing its reference count), not the data itself.

In the example above, we create a clone of users and store it in users_for_task. We need this clone because we need access to users inside the lookup_users task, but also afterwards in main(), when we print the user list. Without the clone, users would be moved into the task (because of async move), and we could not use it later on.
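A quick way to convince yourself that cloning an Arc does not copy the data is to count the handles. This sketch uses std::sync::Arc around a plain Vec of strings (the helper share_list is hypothetical): Arc::strong_count reports how many handles exist, and Arc::ptr_eq confirms that both handles point at the same Vec. Writing Arc::clone(&users) instead of users.clone() is a common convention that makes the cheap pointer clone explicit; both do the same thing.

```rust
use std::sync::Arc;

// Hypothetical helper: returns (handles after clone, same allocation?, handles after drop).
fn share_list() -> (usize, bool, usize) {
    let users = Arc::new(vec!["bob".to_string(), "tom".to_string()]);

    // Copies the pointer and bumps the reference count; the Vec is not copied.
    let users_for_task = Arc::clone(&users);
    let count_after_clone = Arc::strong_count(&users);
    let same_allocation = Arc::ptr_eq(&users, &users_for_task);

    // Dropping one handle only decrements the count; the Vec lives on.
    drop(users_for_task);
    let count_after_drop = Arc::strong_count(&users);

    (count_after_clone, same_allocation, count_after_drop)
}

fn main() {
    // prints "(2, true, 1)"
    println!("{:?}", share_list());
}
```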

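For completeness, here's the broken example without the initial clone step. Calling users.clone() inside the task doesn't help: async move has already moved users itself into the task, so main() can no longer use it.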

use std::sync::Arc;
use tokio::spawn;
use tokio::sync::Mutex;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<Mutex<Vec<User>>>) {
    let mut editable_users = users.lock().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(Mutex::new(vec![]));

    let lookup_users = spawn(async move {
        lookup_user("bob", users.clone()).await;
        lookup_user("tom", users.clone()).await;
    });

    lookup_users.await.expect("failed to lookup users");
    let users_with_access = users.lock().await;
    println!("Users: {users_with_access:?}");
}
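The same clone-before-move pattern applies outside tokio. Here is a sketch of the working version that assumes std::thread::spawn with a move closure instead of spawn(async move { ... }), and std::sync::Mutex instead of the tokio one; both substitutions only keep the example dependency-free, and lookup_users is a hypothetical helper.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct User {
    id: String,
}

// Hypothetical helper: fills the shared list on another thread, then reads it here.
fn lookup_users() -> Vec<String> {
    let users = Arc::new(Mutex::new(Vec::<User>::new()));

    // Clone *before* the move: the closure takes ownership of the clone,
    // so `users` itself stays usable in this function.
    let users_for_thread = Arc::clone(&users);
    let handle = thread::spawn(move || {
        for id in ["bob", "tom"] {
            // The guard is a temporary, so the lock is released after each push.
            users_for_thread.lock().unwrap().push(User { id: id.to_string() });
        }
    });
    handle.join().expect("failed to lookup users");

    // Bind the result to a local so the temporary guard is dropped before `users`.
    let ids = users.lock().unwrap().iter().map(|user| user.id.clone()).collect();
    ids
}

fn main() {
    // prints: Users: ["bob", "tom"]
    println!("Users: {:?}", lookup_users());
}
```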

In our example we use a Mutex to protect the user list from being modified by several threads at the same time. Often, though, data is read frequently but modified only rarely. In that case an RwLock (reader-writer lock) might be a better alternative to a Mutex. An RwLock gives the same kind of protection as a Mutex, but distinguishes between read and write operations: it allows many readers at the same time, while a writer gets exclusive access (no other readers or writers).
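Here is a small sketch of 'many readers at once', assuming std::sync::RwLock and std threads (the helper concurrent_reads and the count of three readers are hypothetical). Each reader holds its read lock while waiting at a std::sync::Barrier for the other readers, which only works because read locks can be held simultaneously; with a Mutex in place of the RwLock, the threads would deadlock.

```rust
use std::sync::{Arc, Barrier, RwLock};
use std::thread;

// Hypothetical helper: returns what each reader saw and the final list length.
fn concurrent_reads() -> (Vec<usize>, usize) {
    let users = Arc::new(RwLock::new(vec!["bob".to_string(), "tom".to_string()]));
    let readers = 3;
    // Each reader blocks in wait() until all `readers` threads have arrived.
    let barrier = Arc::new(Barrier::new(readers));

    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let users = Arc::clone(&users);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let guard = users.read().unwrap();
                // At this point all readers hold a read lock at the same time.
                barrier.wait();
                guard.len()
            })
        })
        .collect();

    let counts: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // Once the readers are done, a writer gets exclusive access.
    users.write().unwrap().push("alice".to_string());
    let final_len = users.read().unwrap().len();

    (counts, final_len)
}

fn main() {
    // prints "([2, 2, 2], 3)"
    println!("{:?}", concurrent_reads());
}
```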

Let's adjust our example to use an RwLock.

use std::sync::Arc;
use tokio::spawn;
use tokio::sync::RwLock;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<RwLock<Vec<User>>>) {
    let mut editable_users = users.write().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(RwLock::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    let users_with_access = users.read().await;
    println!("Users: {:?}", users_with_access.as_slice());
}

Notice the write() and read() methods: write() acquires an exclusive write lock, while read() acquires a shared read lock.

To summarize:

Wrap an object in an Arc if you want to share it between threads. If the object also has to be mutated, put a Mutex inside the Arc (Arc<Mutex<T>>). If you have many read operations and only a few write operations, consider an RwLock instead of a Mutex.

Scope of variables

We've seen how we can use a Mutex to protect data shared between threads. What you may not have noticed is that we never unlocked it explicitly. lock() returns a guard (a MutexGuard), and when that guard goes out of scope it is dropped, which releases the lock automatically.

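This is a very powerful pattern, and it's called RAII (Resource Acquisition Is Initialization): a resource is tied to the lifetime of a value and is cleaned up when that value is dropped. Rust relies on it heavily, through the Drop trait, to ensure that resources such as memory, files and locks are released as soon as they are no longer needed.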
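A minimal sketch of RAII, using a hypothetical Resource type that implements Drop and records in a shared log when it is released, so the cleanup at each closing brace becomes visible. The names _inner and _outer matter: binding a newly created value to the bare pattern _ would drop it at the end of that same statement.

```rust
use std::cell::RefCell;

// Hypothetical resource that records in a shared log when it is released.
struct Resource<'a> {
    name: &'static str,
    log: &'a RefCell<Vec<String>>,
}

impl Drop for Resource<'_> {
    // Runs automatically when a Resource value goes out of scope.
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("released {}", self.name));
    }
}

fn run(log: &RefCell<Vec<String>>) {
    let _outer = Resource { name: "outer", log };
    {
        let _inner = Resource { name: "inner", log };
        log.borrow_mut().push("end of inner block".to_string());
    } // `_inner` is dropped here
    log.borrow_mut().push("end of function".to_string());
} // `_outer` is dropped here

// Hypothetical helper: returns the events in the order they happened.
fn drop_log() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    run(&log);
    log.into_inner()
}

fn main() {
    // prints, one per line: end of inner block, released inner,
    // end of function, released outer
    for event in drop_log() {
        println!("{event}");
    }
}
```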

All variables in Rust have a scope, and when a variable goes out of scope, the value it owns is dropped (unless it was moved elsewhere first). This is why we can use Mutex and RwLock in Rust without having to worry about unlocking them: the lock is released automatically when the guard returned by lock(), read() or write() goes out of scope.

So how does scope work in Rust? And what are ways to limit the scope of variables?

Scope in Rust is defined by curly braces {}. A pair of curly braces forms a block, and variables declared inside a block are only available inside that block. Blocks can be part of a function definition, a loop, an if statement, or stand on their own. Here are some examples:

fn main() {
    let x = 42;

    {
        let y = 24;
        println!("x: {}, y: {}", x, y);
    }

    // y is not available here
    println!("x: {}", x);
}

Loops also have their own scope:

fn main() {
    let x = 42;

    for i in 0..3 {
        let y = i * 2;
        println!("x: {}, y: {}", x, y);
    }

    // y is not available here
    println!("x: {}", x);
}
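Because a loop body is a block, anything declared in it is dropped at the end of every iteration. A small sketch with a std::sync::Mutex (count_in_loop is a hypothetical helper) shows why that matters for locks: each iteration can lock the mutex again because the previous iteration's guard is already gone.

```rust
use std::sync::Mutex;

// Hypothetical helper: increments a counter three times under a lock.
fn count_in_loop() -> i32 {
    let counter = Mutex::new(0);

    for _ in 0..3 {
        // A fresh guard per iteration; it is dropped at the closing brace
        // of the loop body, which releases the lock.
        let mut guard = counter.lock().unwrap();
        *guard += 1;
    }

    // Bind to a local so the temporary guard is dropped before `counter`.
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    // prints "total: 3"
    println!("total: {}", count_in_loop());
}
```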

Scope and Locks

When using a Mutex or RwLock, it's important to keep the scope of the lock guard as small as possible, so that the lock is released as soon as possible. While the guard is alive, every other thread that wants to access the data has to wait.

Here's an example of how to limit the scope of a Mutex guard:

use std::sync::{Arc, Mutex};

fn main() {
    let data = Arc::new(Mutex::new(0));

    {
        let mut locked_data = data.lock().unwrap();
        *locked_data += 1;
        // locked_data goes out of scope here, which releases the lock
    }

    println!("Data: {:?}", data);
}

Alternatively, you could extract the content of the {} block into a function. The guard then goes out of scope, and the lock is released, when the function returns:

use std::sync::{Arc, Mutex};

fn main() {
    let data = Arc::new(Mutex::new(0));
    increment_data(&data);
    println!("Data: {:?}", data);
}

fn increment_data(data: &Arc<Mutex<i32>>) {
    let mut locked_data = data.lock().unwrap();
    *locked_data += 1;
    // locked_data goes out of scope here, which releases the lock
}
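Two more ways to keep a guard short-lived, sketched with hypothetical helpers increment_then_read and increment_in_one_statement. First, you can end a guard's life explicitly by passing it to drop (std::mem::drop, which is in the prelude). Second, if you only need the lock for a single operation, you don't need a named guard at all: the guard returned by lock() is then a temporary that is dropped at the end of the statement. One caution: in a match scrutinee, such a temporary guard lives until the end of the whole match.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper: releases the guard early with drop(), then locks again.
fn increment_then_read(data: &Arc<Mutex<i32>>) -> i32 {
    let mut locked_data = data.lock().unwrap();
    *locked_data += 1;
    // Release the lock now instead of at the end of the function. Without
    // this, the lock() below would never return: std documents that locking
    // a Mutex the current thread already holds may deadlock or panic.
    drop(locked_data);

    *data.lock().unwrap()
}

// Hypothetical helper: no named guard at all.
fn increment_in_one_statement(data: &Arc<Mutex<i32>>) {
    // The guard is a temporary, dropped (and the lock released) at the end
    // of this statement.
    *data.lock().unwrap() += 1;
}

fn main() {
    let data = Arc::new(Mutex::new(0));
    increment_in_one_statement(&data);
    // prints "Data: 2"
    println!("Data: {}", increment_then_read(&data));
}
```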

Reference material