Learning Rust: Part 7 — Threads, Shared State and Channels
Next in our series: threads, shared state, channels and some general concurrency information.
Rust Series
Part 1 — Basic Concepts
Part 2 — Memory
Part 3 — Flow Control and Functions
Part 4 — Option / Result and Collections
Part 5 — Traits, Generics and Closures
Part 6 — Macros, Iterators and File Handling
Part 7 — Threads, Shared State and Channels (This article)
Introduction
Welcome to Part 7 of the Learning Rust series. In this part we will look at several topics related to concurrent programming in Rust, including threads for parallel processing, sharing data between threads and tracking its ownership.
First we will start out with a short explanation of a handy macro to use during development.
Detour — todo! Macro
Rust developers often use this macro because it tells the compiler not to complain about an unfinished portion of code. When developing with an iterative approach, you may want to first write the general structure of your code to help you imagine your project's final form. For example, imagine a simple project that does something with sensors and sensor types.
struct Sensor;

enum SensorType {
    Temperature,
    Humidity,
}

fn get_sensor(sensor: &Sensor) -> Option<String> {
    todo!(); // will implement later
}

fn delete_sensor(sensor: &Sensor) -> Result<(), String> {
    todo!(); // will implement later
}

fn print_sensor_type(sens_type: &SensorType) {
    match sens_type {
        SensorType::Temperature => println!("It is a temperature sensor"),
        SensorType::Humidity => println!("It is a humidity sensor"),
    }
}

fn main() {
    let sens_type = SensorType::Temperature;
    print_sensor_type(&sens_type);
}
Here we want to test one function, print_sensor_type, before we have finished the others. Notice the todo! within the get_sensor and delete_sensor functions. If we removed these, the code would not compile and we could not test our print_sensor_type function, which was the whole point.
Definitely not a complex topic, but it provides another tool in our toolbox of Rust development techniques.
Threads
Now let’s get into threads. This is a complex subject and is going to take some time to explain.
If you have not read Part 5 of this series, Traits, Generics and Closures, I recommend going back and reading at least the closure section. Closures are at the heart of threads in Rust, so understanding closures and their syntax is critical for the thread code we will look at.
To start out, a few definitions might be useful.
- Process — A running program executes as a process. A process has its own virtual address space, at least one thread and OS overhead to manage it.
- Thread — Each process starts with a single thread, often called the primary thread, but can create additional threads from any of its threads.
- Concurrency — Different parts of a program execute independently.
- Parallel Programming — Different parts of a program execute at the same time.
Rust's strong ownership model lends itself well to safe concurrent programming. Somewhat unique to Rust, many errors that could arise during concurrent execution are caught by the compiler.
In Rust you create threads with std::thread::spawn, passing a closure as the parameter to tell it what to execute.
Here is a simple example of a thread.
fn main() {
    for _ in 0..10 {
        let handle = std::thread::spawn(|| {
            println!("Inside thread print");
        });
    }
}
This usually will not print anything, especially if executed in the Rust Playground, because the main function finishes before the threads run. If you think of it sequentially, this may be hard to understand. But threads are scheduled by the OS, and the primary thread (main) will likely finish before any child thread.
Let’s modify the code above to do something in the main thread, namely sleep for 1 millisecond on each loop iteration.
use std::thread;
use core::time::Duration;

fn main() {
    for _ in 0..10 {
        let _handle = std::thread::spawn(|| {
            println!("Inside thread print");
        });
    }
    for i in 0..5 {
        println!("In loop number {} from the main function thread", i);
        thread::sleep(Duration::from_millis(1));
    }
}
Now we get the output we expect.
Inside thread print
Inside thread print
In loop number 0 from the main function thread
Inside thread print
Inside thread print
Inside thread print
Inside thread print
Inside thread print
Inside thread print
Inside thread print
Inside thread print
In loop number 1 from the main function thread
In loop number 2 from the main function thread
In loop number 3 from the main function thread
In loop number 4 from the main function thread
Note that if the main thread exits before all of the spawned threads are complete, you may not see "Inside thread print" displayed 10 times.
Another way to make sure we get the result from all the threads is to use a join. When a thread is created we say it is forked, and when it is collected we say it is joined. The return type of spawn is a JoinHandle, which has a join() function.
fn main() {
    for _ in 0..10 {
        let handle = std::thread::spawn(|| {
            println!("Inside thread print");
        });
        let _ = handle.join();
    }
}
This solves our problem and we get the expected output, but it is not an optimal program. We call join within the loop, which forces each thread to finish before the next is spawned, defeating the point of threads. To fix this we store the handles and call join outside the loop. Let's see that example.
fn main() {
    let mut join_handles = vec![];
    for _ in 0..10 {
        let handle = std::thread::spawn(|| {
            println!("Inside thread print");
        });
        join_handles.push(handle);
    }
    for handle in join_handles {
        handle.join().unwrap();
    }
}
There are several items to note about this code.
- There is the Vec that will hold each of the JoinHandles. It is declared outside of the for loop.
- We push the handle onto the Vec, but don’t call join at this point.
- The last loop iterates over our Vec and calls join().unwrap() on each handle.
Along the same lines, let's look at an example of a thread accessing a variable outside of the closure's scope. Remember from the earlier article that this is where closures got their name: they can "close over" external variables.
In this example we declare a variable called outer_string outside of the thread's scope. To access it we are required to use the move keyword in front of our closure. If you want, try removing it to see the compile error.
use std::{thread, time};

fn main() {
    let start = time::Instant::now();
    let outer_string = String::from("outside");
    let thread = thread::spawn(move || {
        println!("Inside thread with string '{}'", outer_string);
    });
    let _ = thread.join();
    let finish = time::Instant::now();
    println!("Duration: {:02?}", finish.duration_since(start));
}
If you remove it, you get a lengthy error, but the key line in the output is:
help: to force the closure to take ownership of `outer_string` (and any other referenced variables), use the `move` keyword
I wanted to highlight one line in the code with a little explanation.
let _ = thread.join();
The let _ portion is something I added to ignore the return value and thereby remove the compiler warning. thread.join() returns a Result, and through the underscore I am saying I acknowledge there is a return value, but I am not going to use it.
Here is another example of using a loop to spawn multiple threads, all accessing the same mutable outer variable, num. Again, we use the move keyword to signal we are accessing a variable outside the closure's inner scope.
use std::{thread, time};

fn main() {
    let start = time::Instant::now();
    let mut num = 4;
    for _ in 1..10 {
        thread::spawn(move || {
            num += 1;
            println!("String is {}", num);
        });
    }
    thread::sleep(time::Duration::from_secs(1));
    let finish = time::Instant::now();
    println!("Duration: {:02?}", finish.duration_since(start));
}
What is noteworthy here is that the output looks as follows.
String is 5
String is 5
String is 5
String is 5
String is 5
String is 5
String is 5
String is 5
String is 5
Duration: 1.000600599s
If you expected an incrementing number (5, 6, 7…), that is not what happens. Because i32 implements Copy, each thread receives its own copy of the outer variable, initialized with a value of 4, so each thread prints 5.
This is the end of the core part of our thread discussion, but let’s look at some other related topics.
Mutex and Reference Counting
You may know the term Mutex from other languages you have experience with. Mutex is short for mutual exclusion, a mechanism for ensuring that a piece of code is executed by only one thread at a time.
Here is an example: a number is wrapped in a Mutex and then accessed via lock().unwrap().
use std::sync::Mutex;

fn main() {
    let mutexed_num = Mutex::new(8);
    {
        let number = mutexed_num.lock().unwrap();
        println!("A) Mutexed number multiplied by 4 equals {}", *number * 4);
    }
    println!("B) Mutexed number multiplied by 6 equals {}", *mutexed_num.lock().unwrap() * 6);
}
In this example lock is called twice; how does this work? The first lock is released when the inner block ends, because that's when the number variable (the lock guard) is dropped.
As you can see, it is possible to declare a Mutex this way, but we need reference counting to use it in a multi-threaded environment.
In Rust, Rc (and its sibling for concurrent programming, Arc) stands for "reference counter" (or "reference counted").
Rc is often used as a way to work within Rust's strict ownership rules, which we learned about in Part 2 on Rust Memory, without actually breaking them. It allows shared ownership by tracking how many owners the data has. Note that this reference counting is similar, at a very basic level, to how garbage collectors in languages like Java determine which memory to clean up.
Let's look at a somewhat trivial, but effective for explanation, example of reference counting.
use std::rc::Rc;

#[derive(Debug)]
struct Sensor {
    name: Rc<String>,
    description: Rc<String>, // String inside an Rc
}

#[derive(Debug)]
struct SensorData {
    _names: Vec<Rc<String>>,
    _descriptions: Vec<Rc<String>>, // A Vec of Strings inside Rcs
}

fn main() {
    let temp_sensor_name = Rc::new("Temperature".to_string());
    let temp_sensor_desc = Rc::new(
        "Temperature Sensor On Roof Of Building A"
            .to_string(),
    );
    let sensor_a = Sensor {
        name: Rc::clone(&temp_sensor_name),
        description: Rc::clone(&temp_sensor_desc),
    };
    let _location_a_sensors = SensorData {
        _names: vec![Rc::clone(&temp_sensor_name)], // Rc::clone will increase the count
        _descriptions: vec![Rc::clone(&temp_sensor_desc)],
    };
    println!("{} sensor description is: {}", sensor_a.name, sensor_a.description);
    println!("Reference count for description: {}", Rc::strong_count(&sensor_a.description));
}
This prints the following.
Temperature sensor description is: Temperature Sensor On Roof Of Building A
Reference count for description: 3
The value is 3 because we call Rc::new once and Rc::clone twice on the temp_sensor_desc variable; each clone adds an owner and increases the count.
With threads we use Arc. Arc stands for Atomic Reference Count; atomics work like primitive types but are safe to share across threads. Arc (and atomics in general) should only be used in concurrent situations, as thread safety comes with a performance penalty that is only needed for concurrency.
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let loop_counter = Arc::new(Mutex::new(0));
    let mut join_handles = vec![];
    for _ in 0..15 {
        let loop_counter = Arc::clone(&loop_counter);
        let join_handle = thread::spawn(move || {
            let mut loop_num = loop_counter.lock().unwrap();
            *loop_num += 1;
        });
        join_handles.push(join_handle);
    }
    for join_handle in join_handles {
        join_handle.join().unwrap();
    }
    println!("Result: {}", *loop_counter.lock().unwrap());
}
Notice here we use Arc::new. Also, within the for loop that spawns threads, we call Arc::clone, which is thread safe.
This completes our introduction to safe shared state with Mutex, Rc and Arc.
Channels
Channels are also referred to as message passing in the Rust documentation. Communication between threads can be implemented in a safe way by using channels. Rust's standard library provides channel functionality in the std::sync::mpsc module. mpsc stands for Multiple Producer, Single Consumer: these channels may have multiple writers, but only a single reader. Visually it might help to think of a funnel, where the big end is the producer side and the single output is the consumer.
Let’s look at an example of opening a channel. In this example, we clone the first channel, setting up the multiple producer scenario.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx_clone = tx.clone();
    let _ = tx.send(1);
    let _ = thread::spawn(move || tx.send(10));
    let _ = thread::spawn(move || tx_clone.send(100));
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {} via the channel", rx.recv().unwrap());
}
When you run this you may get the output.
Received 1 via the channel
Received 10 via the channel
but you may also get the output
Received 1 via the channel
Received 100 via the channel
That is because we call recv only twice, and the 100 may reach the channel before the 10.
To get all the values, write this.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx_clone = tx.clone();
    let _ = tx.send(1);
    let _ = thread::spawn(move || tx.send(10));
    let _ = thread::spawn(move || tx_clone.send(100));
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {} via the channel", rx.recv().unwrap());
    println!("Received {} via the channel", rx.recv().unwrap());
}
Then you will get all three values, but in an indeterminate order.
Received 1 via the channel
Received 10 via the channel
Received 100 via the channel
or
Received 1 via the channel
Received 100 via the channel
Received 10 via the channel
What if you want to send multiple values and watch the receiver waiting? The spawned thread will now send multiple sensor reading messages and pause for 200ms between each message.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let readings = vec![
            String::from("Temperature"),
            String::from("Humidity"),
            String::from("Air Pressure"),
            String::from("Wind Speed"),
        ];
        for reading in readings {
            tx.send(reading).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });
    for received in rx {
        println!("Received Sensor Reading: {}", received);
    }
}
The output you will see is as follows.
Received Sensor Reading: Temperature
Received Sensor Reading: Humidity
Received Sensor Reading: Air Pressure
Received Sensor Reading: Wind Speed
A final example shows how the consumer receives the values in a Result, looping until there are no more values. The output would be the same as the earlier three-value example.
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx_clone = tx.clone();
    let _ = tx.send(1);
    let _ = thread::spawn(move || tx.send(10));
    let _ = thread::spawn(move || tx_clone.send(100));
    loop {
        let result = rx.recv();
        if result.is_err() {
            break;
        } else {
            println!("Received {} via the channel", result.unwrap());
        }
    }
}
Note also that we are using tx.clone to create a scenario where we have multiple producers. The loop ends because recv returns an Err once every sender has been dropped, which happens here when both spawned threads finish.
This concludes our discussion of channels.
Summary
Hope you enjoyed this portion of our Learning Rust series. This article started with a look at a technique for step-wise building of functionality, using the todo! macro to signal that an implementation is not complete but will be done later. Next we looked at threads and the details of thread creation. The concept of a mutex was covered next, along with how reference counting can be used to allow multiple owners of a shared variable. Finally, we looked at channels and how to communicate between threads in Rust.
Enjoy the journey!
🔔 If you enjoyed this, subscribe to my future articles or view already published here. 🚀
📝 Have questions or suggestions? Leave a comment or message me through Medium.
Thank you for your support! 🌟