FP Complete


This blog post is a direct follow up on my previous blog post on different levels of async in Rust. You may want to check that one out before diving in here.

Alright, so now we know that we can make our programs asynchronous by using non-blocking I/O calls. But last time we only saw examples that remained completely sequential, defeating the whole purpose of async. Let’s change that with something more sophisticated.

A few months ago I needed to ensure that all the URLs for a domain name resolved to either a real web page (200 status code) or redirected to somewhere else with a real web page. To make that happen, I needed a program that would:

To make this simple, we’re going to take a lot of shortcuts like:

For the curious: the original version of this was a really short Haskell program that had these properties. For fun a few weeks back, I rewrote it in two ways in Rust, which ultimately led to this pair of blog posts.

Fully blocking

Like last time, I recommend following along with my code. I’ll kick this off with cargo new httpstatus. And then to avoid further futzing with our Cargo.toml, let’s add our dependencies preemptively:

[dependencies]
tokio = { version = "0.2.22", features = ["full"] }
reqwest = { version = "0.10.8", features = ["blocking"] }
async-channel = "1.4.1"
is_type = "0.2.1"

That features = ["blocking"] should hopefully grab your attention. The reqwest library provides an optional, fully blocking API. That seems like a great place to get started. Here’s a nice, simple program that does what we need:

// Needed to use .lines() below, just like last time
use std::io::BufRead;

// We'll return _some_ kind of an error
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file for input
    let file = std::fs::File::open("urls.txt")?;
    // Make a buffered version so we can read lines
    let buffile = std::io::BufReader::new(file);

    // CSV header
    println!("URL,Status");

    // Create a client so we can make requests
    let client = reqwest::blocking::Client::new();

    for line in buffile.lines() {
        // Error handling on reading the lines in the file
        let line = line?;
        // Make a request and send it, getting a response
        let resp = client.get(&line).send()?;
        // Print the status code
        println!("{},{}", line, resp.status().as_u16());
    }
    Ok(())
}

Thanks to Rust’s ? syntax, error handling is pretty easy here. In fact, there are basically no gotchas here. reqwest makes this code really easy to write!
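To make the role of ? a bit more concrete, here is a small standard-library-only sketch. The parse_endpoint helper is hypothetical and not part of the program above; it only shows how ? converts two different error types into the same Box<dyn Error> return type.

```rust
use std::error::Error;

/// Parse "host:port" into its parts.
/// Hypothetical helper, used only to illustrate `?` with Box<dyn Error>.
fn parse_endpoint(input: &str) -> Result<(String, u16), Box<dyn Error>> {
    // A &str error message is converted into Box<dyn Error> by `?`
    let (host, port) = input.split_once(':').ok_or("missing ':' separator")?;
    // A ParseIntError is converted into Box<dyn Error> by the same operator
    let port: u16 = port.parse()?;
    Ok((host.to_string(), port))
}
```

Calling parse_endpoint("example.org:443") yields the host and port, while an input without a colon or with a non-numeric port comes back as an Err, with no explicit match needed at either step.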

Once you put a urls.txt file together, such as the following:

https://www.wikipedia.org
https://www.wikipedia.org/path-the-does-not-exist
http://wikipedia.org

You’ll hopefully get output such as:

URL,Status
https://www.wikipedia.org,200
https://www.wikipedia.org/path-the-does-not-exist,404
http://wikipedia.org,200

The logic above is pretty easy to follow, and hopefully the inline comments explain anything confusing. With that idea in mind, let’s up our game a bit.

Ditching the blocking API

Let’s first move away from the blocking API in reqwest, but still keep all the sequential nature of the program. This involves four relatively minor changes to the code, all spelled out below:

use std::io::BufRead;

// First change: add the Tokio runtime
#[tokio::main]
// Second: turn this into an async function
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = std::fs::File::open("urls.txt")?;
    let buffile = std::io::BufReader::new(file);

    println!("URL,Status");

    // Third change: Now we make an async Client
    let client = reqwest::Client::new();

    for line in buffile.lines() {
        let line = line?;

        // Fourth change: We need to .await after send()
        let resp = client.get(&line).send().await?;

        println!("{},{}", line, resp.status().as_u16());
    }
    Ok(())
}

The program is still fully sequential: we fully send a request, then get the response, before we move on to the next URL. But we’re at least ready to start playing with different async approaches.

Where blocking is fine

If you remember from last time, we had a bit of a philosophical discussion on the nature of blocking, and that ultimately some blocking is OK in a program. In order to both simplify what we do here, as well as provide some real-world recommendations, let’s list all of the blocking I/O we’re doing:

Note that, even though we’re sequentially running our HTTP requests right now, those are in fact using non-blocking I/O. Therefore, I haven’t included anything related to HTTP in the list above. We’ll start dealing with the sequential nature next.

Returning to the four blocking I/O calls above, I’m going to make a bold statement: don’t bother making them non-blocking. It’s not actually terribly difficult to do the file I/O using tokio (we saw how last time). But we get virtually no benefit from doing so. The latency for local disk access, especially when we’re talking about a file as small as urls.txt is likely to be, and especially in contrast to a bunch of HTTP requests, is minuscule.

Feel free to disagree with me, or to take on making those calls non-blocking as an exercise. But I’m going to focus instead on higher value targets.
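If you keep the file I/O blocking, one way to keep it out of the way is to isolate it in a small function. The sketch below is an assumption on my part rather than something the program in this post does: read_urls is a hypothetical helper that reads every line up front, and it is generic over BufRead so it can be exercised without touching the disk.

```rust
use std::io::{self, BufRead};

/// Read every line from a blocking reader into memory.
/// `read_urls` is a hypothetical helper, not part of the program in this post.
fn read_urls<R: BufRead>(reader: R) -> io::Result<Vec<String>> {
    // `lines()` yields io::Result<String>. Collecting into
    // io::Result<Vec<String>> stops at the first read error and returns it.
    reader.lines().collect()
}
```

With a real file this would be called as read_urls(std::io::BufReader::new(std::fs::File::open("urls.txt")?)). The trade-off is that the whole file sits in memory, which is only reasonable because we expect urls.txt to be small.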

Concurrent requests

The real problem here is that we have sequential HTTP requests going on. Instead, we would much prefer to make our requests concurrently. If we assume there are 100 URLs, and each request takes 1 second (hopefully an overestimation), a sequential algorithm can at best finish in 100 seconds. However, a concurrent algorithm could in theory finish all 100 requests in just 1 second. In reality that’s pretty unlikely to happen, but it is completely reasonable to expect a significant speedup factor, depending on network conditions, number of hosts you’re connecting to, and other similar factors.
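The back-of-the-envelope numbers above can be written down as a tiny function. This is only a rough model under the same simplifying assumption (every request takes the same amount of time), and estimated_seconds is a hypothetical name introduced for illustration.

```rust
/// Rough lower bound, in seconds, for finishing `urls` requests when at most
/// `concurrency` of them are in flight at once and each one takes
/// `secs_per_request`. Hypothetical helper; it ignores all real network effects.
fn estimated_seconds(urls: u64, secs_per_request: u64, concurrency: u64) -> u64 {
    assert!(concurrency > 0, "need at least one request in flight");
    // Number of "rounds" of requests needed, rounding up
    let rounds = (urls + concurrency - 1) / concurrency;
    rounds * secs_per_request
}
```

With 100 URLs at 1 second each, a concurrency of 1 gives 100 seconds, a concurrency of 100 gives 1 second, and a concurrency of 4 gives 25 seconds.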

So how exactly do we do concurrency with tokio? The most basic answer is the tokio::spawn function, which spawns a new task in the tokio runtime. This is similar in principle to spawning a new system thread, except that running and scheduling are managed by the runtime instead of the operating system. Let’s take a first stab at spawning each HTTP request into its own task:

tokio::spawn(async move {
    let resp = client.get(&line).send().await?;

    println!("{},{}", line, resp.status().as_u16());
});

That looks nice, but we have a problem:

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `std::ops::Try`)
  --> src\main.rs:16:24
   |
15 |           tokio::spawn(async move {
   |  _________________________________-
16 | |             let resp = client.get(&line).send().await?;
   | |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
17 | |
18 | |             println!("{},{}", line, resp.status().as_u16());
19 | |         });
   | |_________- this function should return `Result` or `Option` to accept `?`

Our task doesn’t return a Result, and therefore has no way to complain about errors. This is actually indicating a far more serious issue, which we’ll get to later. But for now, let’s just pretend errors won’t happen, and cheat a bit with .unwrap():

let resp = client.get(&line).send().await.unwrap();

This also fails, now with an ownership issue:

error[E0382]: use of moved value: `client`
  --> src\main.rs:15:33
   |
10 |       let client = reqwest::Client::new();
   |           ------ move occurs because `client` has type `reqwest::async_impl::client::Client`, which does not implement the `Copy` trait

This one is easier to address. The Client is being shared by multiple tasks. But each task needs to make its own clone of the Client. If you read the docs, you’ll see that this is recommended behavior:

The Client holds a connection pool internally, so it is advised that you create one and reuse it.

You do not have to wrap the Client in an Rc or Arc to reuse it, because it already uses an Arc internally.
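To see why cloning such a value is cheap and still shares state, here is a minimal sketch of the general pattern using only the standard library. The Handle type and its pool field are hypothetical stand-ins; this is not how reqwest is actually implemented, only an illustration of "an Arc internally."

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical handle type: cloning it only bumps a reference count,
/// and every clone points at the same shared state.
#[derive(Clone)]
struct Handle {
    // Stand-in for something like a connection pool
    pool: Arc<Mutex<Vec<String>>>,
}

impl Handle {
    fn new() -> Self {
        Handle { pool: Arc::new(Mutex::new(Vec::new())) }
    }

    fn record(&self, host: &str) {
        self.pool.lock().unwrap().push(host.to_string());
    }

    fn seen(&self) -> Vec<String> {
        self.pool.lock().unwrap().clone()
    }
}

/// Give each thread its own clone, then read the shared state
/// back through the original handle.
fn shared_after_clone() -> Vec<String> {
    let original = Handle::new();
    let mut threads = Vec::new();
    for host in vec!["a.example", "b.example"] {
        let handle = original.clone();
        threads.push(thread::spawn(move || handle.record(host)));
    }
    for t in threads {
        t.join().unwrap();
    }
    let mut seen = original.seen();
    seen.sort();
    seen
}
```

Both hosts show up when read through the original handle, even though each thread only ever touched its own clone.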

Once we add this line before our tokio::spawn, our code will compile:

let client = client.clone();

Unfortunately, things fail pretty spectacularly at runtime:

URL,Status
thread 'thread 'tokio-runtime-workerthread 'tokio-runtime-worker' panicked at '' panicked at 'tokio-runtime-workercalled `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "https://www.wikipedia.org/path-the-does-not-exist", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }called `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "https://www.wikipedia.org/", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }' panicked at '', ', called `Result::unwrap()` on an `Err` value: reqwest::Error { kind: Request, url: "http://wikipedia.org/", source: hyper::Error(Connect, ConnectError("dns error", Custom { kind: Interrupted, error: JoinError::Cancelled })) }srcmain.rssrcmain.rs', ::srcmain.rs1717:::241724

That’s a big error message, and it’s jumbled because several worker threads panicked and wrote their output at the same time. The important bit for us is a bunch of JoinError::Cancelled stuff all over the place.

Wait for me!

Let’s talk through what’s happening in our program:

  1. Initiate the Tokio runtime
  2. Create a Client
  3. Open the file, start reading line by line
  4. For each line:
    • Spawn a new task
    • That task starts making non-blocking I/O calls
    • Those tasks go to sleep, to be rescheduled when data is ready
    • When all is said and done, print out the CSV lines
  5. Reach the end of the main function, which triggers the runtime to shut down

The problem is that we reach (5) long before we finish (4). When this happens, all in-flight I/O will be cancelled, which leads to the error messages we saw above. Instead, we need to ensure we wait for each task to complete before we exit. The easiest way to do this is to call .await on the result of the tokio::spawn call. (Those results, by the way, are called JoinHandles.) However, doing so immediately will completely defeat the purpose of our concurrent work, since we will once again be sequential!

Instead, we want to spawn all of the tasks, and then wait for them all to complete. One easy way to achieve this is to put all of the JoinHandles into a Vec. Let’s look at the code. And since we’ve made a bunch of changes since our last complete code dump, I’ll show you the full current status of our source file:

use std::io::BufRead;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = std::fs::File::open("urls.txt")?;
    let buffile = std::io::BufReader::new(file);

    println!("URL,Status");

    let client = reqwest::Client::new();

    let mut handles = Vec::new();

    for line in buffile.lines() {
        let line = line?;

        let client = client.clone();
        let handle = tokio::spawn(async move {
            let resp = client.get(&line).send().await.unwrap();

            println!("{},{}", line, resp.status().as_u16());
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await?;
    }
    Ok(())
}

And finally we have a concurrent program! This is actually pretty good, but it has two flaws we’d like to fix:

  1. It doesn’t properly handle errors, instead just using .unwrap(). I mentioned this above, and said our usage of .unwrap() was indicating a “far more serious issue.” That issue was the fact that the result values from the spawned tasks are never noticed by the main task, which is really the core issue causing the cancellation we discussed above. It’s always nice when type-driven error messages indicate a runtime bug in our code!
  2. There’s no limitation on the number of concurrent tasks we’ll spawn. Ideally, we’d rather have a job queue approach, with a dedicated number of worker tasks. This will let our program behave better as we increase the number of URLs in our input file.

NOTE It would be possible in the program above to skip the spawns and collect a Vec of Futures, then await on those. However, that would once again end up sequential in nature. Spawning allows all of those Futures to run concurrently, and be polled by the tokio runtime itself. It would also be possible to use join_all to poll all of the Futures, but it has some performance issues. So best to stick with tokio::spawn.
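The reason a plain Vec of Futures ends up sequential is that Rust futures are lazy: nothing runs until something polls them. The following sketch demonstrates this using only the standard library. The block_on function here is a deliberately tiny, hypothetical executor that only suits futures which never actually wait; it is not a replacement for tokio.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that never really wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal, hypothetical executor: poll the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Create three futures, note that they were created, then await them in order.
/// Returns the order in which things actually happened.
fn lazy_futures_log() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut futures = Vec::new();
    for i in 0..3 {
        let log = Rc::clone(&log);
        // Creating the future does NOT run its body
        futures.push(async move {
            log.borrow_mut().push(format!("ran {}", i));
        });
    }
    log.borrow_mut().push("all futures created".to_string());

    // Awaiting one at a time runs them strictly one after another
    block_on(async {
        for fut in futures {
            fut.await;
        }
    });

    let result = log.borrow().clone();
    result
}
```

The returned log starts with "all futures created", followed by "ran 0", "ran 1", "ran 2": no future body ran before it was awaited, and each one finished before the next began. A spawned task, by contrast, is handed to the runtime and polled whether or not we are currently awaiting it.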

Let’s address the simpler one first: proper error handling.

Error handling

The basic concept of error handling is that we want the errors from the spawned tasks to be detected in the main task, and then cause the application to exit. One way to handle that is to return the Err values from the spawned tasks directly, and then pick them up with the JoinHandle that spawn returns. This sounds nice, but naively implemented will result in checking the error responses one at a time. Instead, we’d rather fail early, by detecting that (for example) the 57th request failed and immediately terminating the application.

You could do some kind of a “tell me which is the first JoinHandle that’s ready,” but it’s not the way I initially implemented it, and some quick Googling indicated you’d have to be careful about which library functions you use. Instead, we’ll try a different approach using an mpsc (multi-producer, single-consumer) channel.

Here’s the basic idea. Let’s pretend there are 100 URLs in the file. We’ll spawn 100 tasks. Each of those tasks will write a single value onto the mpsc channel: a Result<(), Error>. Then, in the main task, we’ll read 100 values off of the channel. If any of them are Err, we exit the program immediately. Otherwise, if we read off 100 Ok values, we exit successfully.

Before we read the file, we don’t know how many lines will be in it. So we’re going to use an unbounded channel. This isn’t generally recommended practice, but it ties in closely with my second complaint above: we’re spawning a separate task for each line in the file instead of doing something more intelligent like a job queue. In other words, if we can safely spawn N tasks, we can safely have an unbounded channel that holds at most N values.

Alright, let’s see the code in question!

use std::io::BufRead;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = std::fs::File::open("urls.txt")?;
    let buffile = std::io::BufReader::new(file);

    println!("URL,Status");

    let client = reqwest::Client::new();

    // Create the channel. tx will be the sending side (each spawned task),
    // and rx will be the receiving side (the main task after spawning).
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    // Keep track of how many lines are in the file, and therefore
    // how many tasks we spawned
    let mut count = 0;

    for line in buffile.lines() {
        let line = line?;

        let client = client.clone();
        // Each spawned task gets its own copy of tx
        let tx = tx.clone();
        tokio::spawn(async move {
            // Use a map to say: if the request went through
            // successfully, then print it. Otherwise:
            // keep the error
            let msg = client.get(&line).send().await.map(|resp| {
                println!("{},{}", line, resp.status().as_u16());
            });
            // And send the message to the channel. We ignore errors here.
            // An error during sending would mean that the receiving side
            // is already closed, which would indicate either programmer
            // error, or that our application is shutting down because
            // another task generated an error.
            let _ = tx.send(msg);
        });

        // Increase the count of spawned tasks
        count += 1;
    }

    // Drop the sending side, so that we get a None when
    // calling rx.recv() one final time. This allows us to
    // test some extra assertions below
    std::mem::drop(tx);

    let mut i = 0;
    loop {
        match rx.recv().await {
            // All senders are gone, which must mean that
            // we're at the end of our loop
            None => {
                assert_eq!(i, count);
                break Ok(());
            }
            // Something finished successfully, make sure
            // that we haven't reached the final item yet
            Some(Ok(())) => {
                assert!(i < count);
            }
            // Oops, an error! Time to exit!
            Some(Err(e)) => {
                assert!(i < count);
                return Err(From::from(e));
            }
        }
        i += 1;
    }
}

With this in place, we now have a proper concurrent program that does error handling correctly. Nifty! Before we hit the job queue, let’s clean this up a bit.
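The same fail-fast pattern is not specific to tokio. As a rough analogue, here is a sketch using OS threads and std::sync::mpsc instead of tasks and tokio's channel. The run_all function and its String error type are hypothetical, chosen only to keep the example free of external crates.

```rust
use std::sync::mpsc;
use std::thread;

/// Run each job on its own thread. Return the first error reported,
/// or Ok(()) once every job has reported success.
/// `run_all` and the String error type are hypothetical.
fn run_all<F>(jobs: Vec<F>) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let count = jobs.len();

    for job in jobs {
        // Each thread gets its own copy of the sending side
        let tx = tx.clone();
        thread::spawn(move || {
            // Ignore send failures: they only mean the receiver is gone
            // because another job already failed.
            let _ = tx.send(job());
        });
    }

    // Drop our own sender so the loop below ends once all threads are done
    drop(tx);

    let mut finished = 0;
    for result in rx {
        // Fail fast: the first Err ends the function immediately
        result?;
        finished += 1;
    }
    assert_eq!(finished, count);
    Ok(())
}
```

If one job out of several returns an Err, run_all returns that error as soon as it is received, without waiting for the remaining threads.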

Workers

The previous code works well. It allows us to spawn multiple worker tasks, and then wait for all of them to complete, handling errors when they occur. Let’s generalize this! We’re doing this now since it will make the final step in this blog post much easier.

We’ll put all of the code for this in a separate module of our project. The code will be mostly the same as what we had before, except we’ll have a nice struct to hold onto our data, and we’ll be more explicit about the error type. Put this code into src/workers.rs:

use is_type::Is; // fun trick, we'll look at it below
use std::future::Future;
use tokio::sync::mpsc;

/// Spawn and then run workers to completion, handling errors
pub struct Workers<E> {
    count: usize,
    tx: mpsc::UnboundedSender<Result<(), E>>,
    rx: mpsc::UnboundedReceiver<Result<(), E>>,
}

impl<E: Send + 'static> Workers<E> {
    /// Create a new Workers value
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        Workers { count: 0, tx, rx }
    }

    /// Spawn a new task to run inside this Workers
    pub fn spawn<T>(&mut self, task: T)
    where
        // Make sure we can run the task
        T: Future + Send + 'static,
        // And a weird trick: make sure that the output
        // from the task is Result<(), E>
        // Equality constraints would make this much nicer
        // See: https://github.com/rust-lang/rust/issues/20041
        T::Output: Is<Type = Result<(), E>>,
    {
        // Get a new copy of the send side
        let tx = self.tx.clone();
        // Spawn a new task
        tokio::spawn(async move {
            // Run the provided task and get its result
            let res = task.await;
            // Send the result to the channel
            // This should never fail, so we panic if something goes wrong
            match tx.send(res.into_val()) {
                Ok(()) => (),
                // could use .unwrap, but that would require Debug constraint
                Err(_) => panic!("Impossible happened! tx.send failed"),
            }
            }
        });
        // One more worker to wait for
        self.count += 1;
    }

    /// Finish running all of the workers, exiting when the first one errors or all of them complete
    pub async fn run(mut self) -> Result<(), E> {
        // Make sure we don't wait for ourself here
        std::mem::drop(self.tx);
        // How many workers have completed?
        let mut i = 0;

        loop {
            match self.rx.recv().await {
                None => {
                    assert_eq!(i, self.count);
                    break Ok(());
                }
                Some(Ok(())) => {
                    assert!(i < self.count);
                }
                Some(Err(e)) => {
                    assert!(i < self.count);
                    return Err(e);
                }
            }
            i += 1;
        }
    }
}
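The Is bound in spawn deserves a short illustration. The sketch below defines a minimal local stand-in for the idea; it is a simplification for explanation and not the actual source of the is_type crate. The first_error function is likewise hypothetical.

```rust
/// Minimal local stand-in for the idea behind `is_type::Is`.
/// Simplified for illustration; not the crate's actual definition.
trait Is {
    type Type;
    fn into_val(self) -> Self::Type;
}

// Every type "is" itself, and only itself.
impl<T> Is for T {
    type Type = T;
    fn into_val(self) -> T {
        self
    }
}

/// Accepts a Vec of any `T`, but the bound can only be satisfied
/// when `T` is exactly `Result<(), E>`.
fn first_error<T, E>(values: Vec<T>) -> Option<E>
where
    T: Is<Type = Result<(), E>>,
{
    values
        .into_iter()
        // into_val() is the identity function, but it lets the compiler
        // see the value as a Result<(), E> rather than an opaque T
        .find_map(|v| v.into_val().err())
}
```

Since the only implementation maps each type to itself, writing T: Is<Type = Result<(), E>> works like an equality constraint between T and Result<(), E>. That is what lets Workers::spawn constrain a future's Output without naming the future's concrete type.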

Now in src/main.rs, we’re going to get to focus on just our business logic… and error handling. Have a look at the new contents:

// Indicate that we have another module
mod workers;

use std::io::BufRead;

/// Create a new error type to handle the two ways errors can happen.
#[derive(Debug)]
enum AppError {
    IO(std::io::Error),
    Reqwest(reqwest::Error),
}

// And now implement some boilerplate From impls to support ? syntax
impl From<std::io::Error> for AppError {
    fn from(e: std::io::Error) -> Self {
        AppError::IO(e)
    }
}

impl From<reqwest::Error> for AppError {
    fn from(e: reqwest::Error) -> Self {
        AppError::Reqwest(e)
    }
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let file = std::fs::File::open("urls.txt")?;
    let buffile = std::io::BufReader::new(file);

    println!("URL,Status");

    let client = reqwest::Client::new();
    let mut workers = workers::Workers::new();

    for line in buffile.lines() {
        let line = line?;
        let client = client.clone();
        // Use workers.spawn, and no longer worry about results
        // ? works just fine inside!
        workers.spawn(async move {
            let resp = client.get(&line).send().await?;
            println!("{},{}", line, resp.status().as_u16());
            Ok(())
        })
    }

    // Wait for the workers to complete
    workers.run().await
}

There’s more noise around error handling, but overall the code is easier to understand. Now that we have that out of the way, we’re finally ready to tackle the last piece of this…

Job queue

Let’s review again at a high level how we do error handling with workers. We set up a channel to allow each worker task to send its results to a single receiver, the main task. We used mpsc, or “multi-producer single-consumer.” That matches up with what we just described, right?

OK, a job queue is kind of similar. We want to have a single task that reads lines from the file and feeds them into a channel. Then, we want multiple workers to read values from the channel. This is “single-producer multi-consumer.” Unfortunately, tokio doesn’t provide such a channel out of the box. After I asked on Twitter, async-channel was recommended to me; it provides a “multi-producer multi-consumer” channel. That works for us!

Thanks to our work before with the Workers struct refactor, this is now pretty easy. Let’s have a look at the modified main function:

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let file = std::fs::File::open("urls.txt")?;
    let buffile = std::io::BufReader::new(file);

    println!("URL,Status");

    // Feel free to set this to any number (> 0) you want
    // At a value of 4, this could comfortably fit in OS threads
    // But tasks are certainly up to the challenge, and will scale
    // up more nicely for large numbers and more complex applications
    const WORKERS: usize = 4;
    let client = reqwest::Client::new();
    let mut workers = workers::Workers::new();
    // Buffers double the size of the number of workers are common
    let (tx, rx) = async_channel::bounded(WORKERS * 2);

    // Spawn the task to fill up the queue
    workers.spawn(async move {
        for line in buffile.lines() {
            let line = line?;
            tx.send(line).await.unwrap();
        }
        Ok(())
    });

    // Spawn off the individual workers
    for _ in 0..WORKERS {
        let client = client.clone();
        let rx = rx.clone();
        workers.spawn(async move {
            loop {
                match rx.recv().await {
                    // uses Err to represent a closed channel due to tx being dropped
                    Err(_) => break Ok(()),
                    Ok(line) => {
                        let resp = client.get(&line).send().await?;
                        println!("{},{}", line, resp.status().as_u16());
                    }
                }
            }
        })
    }

    // Wait for the workers to complete
    workers.run().await
}

And just like that, we have a concurrent job queue! It’s everything we could have wanted!
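For comparison, the comment in the code mentions that a handful of workers would fit comfortably in OS threads. Here is a rough sketch of what that could look like with only the standard library. The process_all function and its handle callback are hypothetical, and the callback stands in for the HTTP request.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Process `jobs` with `workers` threads pulling from one bounded queue.
/// `process_all` and the `handle` callback are hypothetical names.
fn process_all<F>(jobs: Vec<String>, workers: usize, handle: F) -> Vec<String>
where
    F: Fn(&str) -> String + Send + Sync + 'static,
{
    assert!(workers > 0);
    // Bounded job queue, double the number of workers as in the post
    let (job_tx, job_rx) = mpsc::sync_channel::<String>(workers * 2);
    // std's Receiver cannot be cloned, so the workers share it behind a Mutex
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (out_tx, out_rx) = mpsc::channel();
    let handle = Arc::new(handle);

    let mut threads = Vec::new();
    for _ in 0..workers {
        let job_rx = Arc::clone(&job_rx);
        let out_tx = out_tx.clone();
        let handle = Arc::clone(&handle);
        threads.push(thread::spawn(move || loop {
            // The lock is held only while taking one job off the queue
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok(line) => out_tx.send((*handle)(&line)).unwrap(),
                // Err means the producer dropped job_tx: no more work
                Err(_) => break,
            }
        }));
    }
    // Only the workers should keep the output channel open
    drop(out_tx);

    // Producer: send() blocks whenever the queue is full
    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx);

    // Ends once every worker has exited and dropped its out_tx
    let mut results: Vec<String> = out_rx.into_iter().collect();
    for t in threads {
        t.join().unwrap();
    }
    // Completion order is not deterministic, so sort for a stable result
    results.sort();
    results
}
```

The shape is the same as the async version: one bounded queue, a fixed number of workers, and a closed channel as the shutdown signal. The main differences are that each worker here costs an OS thread, and that std's receiver has to be shared through a Mutex where async-channel lets you clone it.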

Conclusion

I’ll admit, when I wrote the post last week, I didn’t think I’d be going this deep into the topic. But once I started playing with solutions, I decided I wanted to implement a full job queue for this.

I hope you found this topic interesting! If you want more Rust content, please hit me up on Twitter.