Sharing data & ownership (part 2)
In chapter 2.3 we looked at ownership of data and how data can be borrowed by functions. In this chapter, we'll explore how data can be shared between threads.
Let's say we're implementing a scaffold to look up a user in a database asynchronously and add it to a list of users. With what we've learned so far, you could come up with something like this:
use tokio::spawn;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: &mut Vec<User>) {
    users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let mut users: Vec<User> = vec![];

    let lookup_users = spawn(async move {
        lookup_user("bob", &mut users).await;
        lookup_user("tom", &mut users).await;
    });

    lookup_users.await.expect("failed to lookup users");
    println!("Users: {users:?}");
}
Unfortunately this does not compile:
14 | let mut users: Vec<User> = vec![];
| --------- move occurs because `users` has type `std::vec::Vec<User>`, which does not implement the `Copy` trait
...
22 | println!("Users: {users:?}");
| ^^^^^ value borrowed here after move
Apparently we cannot use users in main after it has been moved into the async task. We need to find a way to get a reference to users that we can use inside the asynchronous task...
Well, maybe you paid attention to the 'advanced' example in chapter 2.3, did some reading online on Rc and RefCell, and came up with something like this:
use std::cell::RefCell;
use std::rc::Rc;
use tokio::spawn;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Rc<RefCell<Vec<User>>>) {
    users.borrow_mut().push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Rc::new(RefCell::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    println!("Users: {:?}", *users.borrow());
}
Another compiler error:
error[E0277]: `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>` cannot be sent between threads safely
--> src/main.rs:19:24
|
19 | let lookup_users = spawn(async move {
| ^^^^^ `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>` cannot be sent between threads safely
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.20/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>`
= note: required because it appears within the type `[static generator@src/main.rs:19:41: 22:6 users_for_task:std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>> _]`
= note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:19:41: 22:6 users_for_task:std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>> _]>`
= note: required because it appears within the type `impl std::future::Future`
So apparently we cannot use Rc when using threads: 'the trait std::marker::Send is not implemented for std::rc::Rc<std::cell::RefCell<std::vec::Vec<User>>>'.
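Remember that the previous chapter mentioned that when using async move, the Rust compiler checks whether the variables moved into the task can be sent between threads safely. Here that requirement comes from the Send bound on tokio::spawn shown in the error. This is one of the cases where it is not safe to do so. Rc is not thread-safe, because it updates its reference count without atomic operations. RefCell isn't thread-safe either, because its borrow tracking is not synchronized between threads.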
Let's review what an Rc is by reading the chapter on Rc in the Rust book.
This may also be a good time to read a bit about the Send and Sync traits.
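A common way to check whether a type satisfies a bound like the Send requirement of tokio::spawn is a generic function that only compiles for types implementing the trait. The helper names assert_send and assert_sync below are our own, not library functions. The User struct mirrors the one above. The commented-out lines are the ones we'd expect the compiler to reject, so this is a sketch for experimenting rather than a definitive test suite.

```rust
use std::cell::RefCell;

// Compiles only if T is Send: ownership of a T may move to another thread.
fn assert_send<T: Send>() {}
// Compiles only if T is Sync: a &T may be shared between threads.
fn assert_sync<T: Sync>() {}

#[allow(dead_code)]
struct User {
    id: String,
}

fn main() {
    // Plain owned data is both Send and Sync.
    assert_send::<Vec<User>>();
    assert_sync::<Vec<User>>();

    // A RefCell may be moved to another thread...
    assert_send::<RefCell<Vec<User>>>();
    // ...but not shared between threads (uncomment to see the error):
    // assert_sync::<RefCell<Vec<User>>>();

    // Rc is neither Send nor Sync, which is what tokio::spawn complained about:
    // assert_send::<std::rc::Rc<RefCell<Vec<User>>>>();

    println!("Vec<User> and RefCell<Vec<User>> passed the checks");
}
```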
Luckily for us, Rust has thread-safe replacements for Rc and RefCell: Arc and Mutex.
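Arc is a reference-counted pointer to an object that can be shared between threads. Arc stands for 'Atomically Reference Counted'. Unlike Rc, it updates its reference count with atomic operations, which makes it safe to use from multiple threads.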
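As a minimal sketch of sharing through Arc, several threads below read the same list, each through its own Arc clone. It uses std::thread instead of tokio tasks so that it has no dependencies. The function name total_seen_by_threads is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

/// Shares one list with `n` threads; each thread reads it through its own
/// Arc clone and reports the length it saw.
fn total_seen_by_threads(names: Vec<String>, n: usize) -> usize {
    let shared = Arc::new(names);

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let for_thread = Arc::clone(&shared);
            // `move` hands this clone to the new thread; the Vec itself is not copied.
            thread::spawn(move || for_thread.len())
        })
        .collect();

    handles
        .into_iter()
        .map(|h| h.join().expect("thread panicked"))
        .sum()
}

fn main() {
    let names = vec!["bob".to_string(), "tom".to_string()];
    // 3 threads each see the same 2 names
    println!("{}", total_seen_by_threads(names, 3));
}
```

Note that the threads only read through the Arc; trying to push into the Vec from a thread would not compile.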
Wrapping an object in an Arc allows us to share a reference to that object across threads. Unfortunately for us, that reference is read-only.
The goal of the exercise is to add the user that was retrieved from the database to the shared list of users.
To safely change data that is shared between threads, we need to wrap it in a Mutex. Mutex stands for mutual exclusion. This means that a Mutex safeguards access to the data that it wraps by only allowing access to a single thread at any moment in time.
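Here is a dependency-free sketch of Arc and Mutex working together. It uses std::sync::Mutex and std::thread rather than tokio. Each thread pushes one id into the shared list, and the Mutex guarantees that only one thread modifies the Vec at a time. The function name collect_ids is hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Pushes every id from its own thread into one shared list.
fn collect_ids(ids: &[&str]) -> Vec<String> {
    let users = Arc::new(Mutex::new(Vec::<String>::new()));

    let handles: Vec<_> = ids
        .iter()
        .map(|id| {
            let users = Arc::clone(&users);
            let id = id.to_string();
            thread::spawn(move || {
                // lock() blocks until no other thread holds the lock; the guard
                // is dropped (releasing the lock) at the end of this statement.
                users.lock().expect("mutex poisoned").push(id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("thread panicked");
    }

    // Threads may finish in any order, so sort for a predictable result.
    let mut result = users.lock().expect("mutex poisoned").clone();
    result.sort();
    result
}

fn main() {
    println!("{:?}", collect_ids(&["tom", "bob"]));
}
```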
With these tools in hand, let's rewrite the example once more:
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::Mutex;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<Mutex<Vec<User>>>) {
    let mut editable_users = users.lock().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(Mutex::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    let users_with_access = users.lock().await;
    println!("Users: {users_with_access:?}");
}
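This example uses tokio::sync::Mutex rather than the Mutex from the standard library. In an application that uses the tokio async runtime, the async Mutex is the one to reach for when a lock has to be held across an .await point. A std::sync::MutexGuard held across an .await makes the future non-Send, so tokio::spawn rejects it. For short critical sections that never span an .await, the tokio documentation notes that the standard library Mutex is often fine, and frequently preferred.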
As pointed out previously, when cloning an Rc or Arc, we are cloning the reference to the data, not the data itself.
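This can be checked with two functions from the standard library: Arc::strong_count, which reports how many Arc handles exist, and Arc::ptr_eq, which tells whether two handles point at the same allocation. The function name inspect_clone in this small sketch is hypothetical.

```rust
use std::sync::Arc;

/// Returns (reference count after cloning, whether both handles point at the
/// same Vec, reference count after dropping the clone).
fn inspect_clone() -> (usize, bool, usize) {
    let users = Arc::new(vec!["bob".to_string()]);
    let users_for_task = Arc::clone(&users); // equivalent to users.clone()

    let after_clone = Arc::strong_count(&users);
    // Both handles share one allocation: nothing was deep-copied.
    let same_vec = Arc::ptr_eq(&users, &users_for_task);

    drop(users_for_task); // dropping a clone only decrements the count
    (after_clone, same_vec, Arc::strong_count(&users))
}

fn main() {
    let (after_clone, same_vec, after_drop) = inspect_clone();
    // Prints: count after clone: 2, same Vec: true, count after drop: 1
    println!("count after clone: {after_clone}, same Vec: {same_vec}, count after drop: {after_drop}");
}
```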
In the above example we create a clone of users and store it in users_for_task. The reason for this clone step is that we need access to users inside the lookup_users task, but also afterwards in the main() function when we print the user list. If we didn't clone users, it would be moved into the task (due to async move) and we could not use it later on.
For completeness, here's the broken example without the initial clone step.
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::Mutex;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<Mutex<Vec<User>>>) {
    let mut editable_users = users.lock().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(Mutex::new(vec![]));

    let lookup_users = spawn(async move {
        lookup_user("bob", users.clone()).await;
        lookup_user("tom", users.clone()).await;
    });

    lookup_users.await.expect("failed to lookup users");
    // Does not compile: `users` was moved into the task by `async move`
    let users_with_access = users.lock().await;
    println!("Users: {users_with_access:?}");
}
In our example we use a Mutex to protect the user list from being modified by several threads at once. It often happens that we need to read data frequently, but modify it only rarely. In that case an RwLock might be a better alternative to a Mutex. An RwLock gives the same kind of protection as a Mutex, but distinguishes between read and write operations. It allows many read operations simultaneously, but only a single write operation, and no reads while that write is in progress.
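The standard library's std::sync::RwLock follows the same rules, which makes them easy to observe without tokio. In the sketch below, readers_overlap uses a Barrier so that every reader holds its read guard at the same moment. If read locks were exclusive, that function would deadlock instead of returning. writer_waits_for_readers uses try_write(), which returns an error instead of blocking, to show a writer being turned away while a reader is active. Both function names are hypothetical.

```rust
use std::sync::{Barrier, RwLock};
use std::thread;

/// Starts `n` reader threads that all hold a read guard at the same time and
/// returns the sum of the list lengths they saw.
fn readers_overlap(n: usize) -> usize {
    let users = RwLock::new(vec!["bob".to_string(), "tom".to_string()]);
    // No thread gets past wait() until all `n` threads have reached it.
    let barrier = Barrier::new(n);

    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..n {
            handles.push(s.spawn(|| {
                let guard = users.read().expect("lock poisoned");
                // Each reader still holds its guard here, so all `n` read
                // locks are held simultaneously.
                barrier.wait();
                guard.len()
            }));
        }
        handles
            .into_iter()
            .map(|h| h.join().expect("reader panicked"))
            .sum::<usize>()
    })
}

/// Returns (write refused while a reader is active, write allowed afterwards).
fn writer_waits_for_readers() -> (bool, bool) {
    let users = RwLock::new(vec!["bob".to_string()]);

    let reader = users.read().expect("lock poisoned");
    let blocked = users.try_write().is_err();
    drop(reader); // release the read lock

    let allowed = users.try_write().is_ok();
    (blocked, allowed)
}

fn main() {
    println!("names seen by 3 concurrent readers: {}", readers_overlap(3));
    println!("(write blocked, write allowed after): {:?}", writer_waits_for_readers());
}
```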
Let's adjust our example to use an RwLock.
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::RwLock;

#[derive(Debug)]
struct User {
    id: String,
}

async fn lookup_user(id: &str, users: Arc<RwLock<Vec<User>>>) {
    let mut editable_users = users.write().await;
    editable_users.push(User { id: id.to_string() })
}

#[tokio::main]
async fn main() {
    let users = Arc::new(RwLock::new(vec![]));
    let users_for_task = users.clone();

    let lookup_users = spawn(async move {
        lookup_user("bob", users_for_task.clone()).await;
        lookup_user("tom", users_for_task).await;
    });

    lookup_users.await.expect("failed to lookup users");
    let users_with_access = users.read().await;
    println!("Users: {:?}", users_with_access.as_slice());
}
Notice the read() vs. write() methods used to acquire the two different types of lock.
To summarize: wrap an object in an Arc if you want to share it between threads. If the object has to be mutable, also wrap it in a Mutex. If you have many read operations and only a few write operations, use an RwLock instead of a Mutex.
Scope of variables
We've seen how we can use a Mutex to protect shared data between threads. What you may not have noticed is that we never unlock it explicitly. lock() returns a guard (a MutexGuard), and that guard is dropped when it goes out of scope. Dropping the guard releases the lock automatically.
This is a very powerful pattern called RAII (Resource Acquisition Is Initialization): a resource is tied to the lifetime of a value. Rust uses it throughout to ensure that resources are cleaned up when they are no longer needed.
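RAII is easy to observe with a type of our own. In this sketch, Guard is a hypothetical type, not part of any library. It records when it is acquired and released, the release happening in its Drop implementation. That makes visible the order in which the two guards are dropped as their scopes end.

```rust
use std::cell::RefCell;

/// Hypothetical RAII guard: "acquires" on creation, "releases" in Drop.
struct Guard<'a> {
    name: &'static str,
    log: &'a RefCell<Vec<String>>,
}

impl<'a> Guard<'a> {
    fn acquire(name: &'static str, log: &'a RefCell<Vec<String>>) -> Self {
        log.borrow_mut().push(format!("acquire {name}"));
        Guard { name, log }
    }
}

impl Drop for Guard<'_> {
    // Runs automatically when the guard goes out of scope.
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("release {}", self.name));
    }
}

fn raii_order() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    {
        let _outer = Guard::acquire("outer", &log);
        {
            let _inner = Guard::acquire("inner", &log);
        } // _inner is dropped here
    } // _outer is dropped here
    log.into_inner()
}

fn main() {
    // Prints: ["acquire outer", "acquire inner", "release inner", "release outer"]
    println!("{:?}", raii_order());
}
```

A MutexGuard works the same way: its Drop implementation unlocks the Mutex.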
All variables in Rust have a scope. When a variable goes out of scope, the value it owns is dropped, unless it has been moved somewhere else first. This is the reason why we can use Mutex and RwLock in Rust without having to worry about unlocking them. The lock is automatically released when the guard returned by lock(), read() or write() goes out of scope.
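The release can be observed with the standard library's Mutex::try_lock, which returns an error instead of blocking when the lock is already taken. The function name in this minimal sketch is hypothetical.

```rust
use std::sync::Mutex;

/// Returns (lock busy while the guard lives, lock free after its scope ends).
fn lock_released_at_scope_end() -> (bool, bool) {
    let data = Mutex::new(0);
    let busy_inside;
    {
        let mut guard = data.lock().expect("mutex poisoned");
        *guard += 1;
        // try_lock() fails while `guard` is still alive.
        busy_inside = data.try_lock().is_err();
    } // guard goes out of scope here and the Mutex is unlocked

    let free_after = data.try_lock().is_ok();
    (busy_inside, free_after)
}

fn main() {
    // Prints: (true, true)
    println!("{:?}", lock_released_at_scope_end());
}
```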
So how does scope work in Rust? And what are ways to limit the scope of variables?
Scope in Rust is defined by curly braces {}. Variables declared inside the curly braces are only available inside those curly braces; such a braced section is called a block. Blocks can be part of a function definition, a loop, an if statement, or stand on their own. Here are some examples:
fn main() {
    let x = 42;

    {
        let y = 24;
        println!("x: {}, y: {}", x, y);
    }

    // y is not available here
    println!("x: {}", x);
}
Loops also have their own scope:
fn main() {
    let x = 42;

    for i in 0..3 {
        let y = i * 2;
        println!("x: {}, y: {}", x, y);
    }

    // y is not available here
    println!("x: {}", x);
}
Scope and Locks
When using a Mutex or RwLock, it's important to keep the lock guard in the smallest possible scope. This ensures that the lock is released as soon as possible. This matters because while one thread holds the lock, every other thread that wants to access the data has to wait.
Here's an example of how to limit the scope of a Mutex lock:
use std::sync::{Arc, Mutex};

fn main() {
    let data = Arc::new(Mutex::new(0));

    {
        let mut locked_data = data.lock().unwrap();
        *locked_data += 1;
        // locked_data is released here
    }

    println!("Data: {:?}", data);
}
Alternatively, you could extract the content of the {} block into a function:
use std::sync::{Arc, Mutex};

fn main() {
    let data = Arc::new(Mutex::new(0));
    increment_data(&data);
    println!("Data: {:?}", data);
}

fn increment_data(data: &Arc<Mutex<i32>>) {
    let mut locked_data = data.lock().unwrap();
    *locked_data += 1;
    // locked_data is released here
}
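A third option is to call drop() on the guard, which releases the lock immediately, in the middle of a function. This sketch assumes some slower work that doesn't need the shared data follows the update; that work is represented only by a comment. increment_and_release_early and increment_from_threads are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn increment_and_release_early(data: &Arc<Mutex<i32>>) -> i32 {
    let mut locked_data = data.lock().expect("mutex poisoned");
    *locked_data += 1;
    let seen = *locked_data;
    // Release the lock now rather than at the end of the function.
    drop(locked_data);
    // ...work that does not need the shared data would go here...
    seen
}

fn increment_from_threads(n: usize) -> i32 {
    let data = Arc::new(Mutex::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let data = Arc::clone(&data);
            thread::spawn(move || increment_and_release_early(&data))
        })
        .collect();
    for handle in handles {
        handle.join().expect("thread panicked");
    }
    // Bind to a local first so the temporary guard is dropped before `data` is.
    let total = *data.lock().expect("mutex poisoned");
    total
}

fn main() {
    println!("Data: {}", increment_from_threads(4)); // prints "Data: 4"
}
```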