FP Complete


First there was cooperative multiprocessing. Then there were processes. An operating system could run multiple processes, each performing a series of sequential, blocking actions. Then came threads. A single process could spawn off multiple threads, each performing its own series of sequential, blocking actions. (And really, the story starts earlier, with hardware interrupts and the like, but hopefully you’ll forgive a little simplification.)

Sitting around and waiting for stuff? Ain’t nobody got time for that. Spawning threads at the operating system level? That’s too costly for a lot of what we do these days.

Perhaps the first foray into asynchronous programming that really hit the mainstream was the Nginx web server, which boasted a huge increase in throughput by natively using asynchronous I/O system calls. Some programming languages, like Go, Erlang, and Haskell, built runtime systems that support spawning cheap green threads, and handle the muck of asynchronous system calls for you under the surface. Other languages, such as JavaScript and more recently Rust, provide explicit asynchronous support in the language.

Given how everyone seems to be bending over backwards to make it easy for you to make your code async-friendly, it would be fair to assume that all code at all times should be async. And it would also be fair to guess that, if you stick the word async on a function, it’s completely asynchronous. Unfortunately, neither of these assumptions is true. This post is intended to dive into this topic, in Rust, using a simple bit of code for motivation.

Prior knowledge: This post will assume that you are familiar with the Rust programming language, as well as its async/.await syntax. If you’d like to brush up on either language basics or async code, I’d recommend checking out FP Complete’s Rust Crash Course.

Update: I’ve published a follow-up to this post covering a more sophisticated HTTP client example.

Blocking vs non-blocking calls

Just to make sure we’re on the same page, I’m going to define here what a blocking versus non-blocking call is and explain how this impacts async vs sync. If you’re already highly experienced with async programming, you can probably skip this section.

For the most part, “async code” means code that relies on non-blocking, rather than blocking, system calls for performing I/O. By contrast, sync (or synchronous) code relies on blocking system calls. As a simple example, consider a web server that has 20 open sockets from web clients, and needs to read data from all of them. One approach would be to use the blocking recv system call. This will:

  1. Ask the operating system for data on a single socket
  2. Put the calling thread to sleep until data is available on that socket
  3. Wake the thread back up and return the data once it arrives

If you follow this approach, and you have the aforementioned 20 connections, you essentially have two choices:

  1. Have a single thread handle each of the connections one at a time
  2. Spawn 20 separate operating system threads, and let each of them handle a single connection

(1) would be an abysmal client experience. If a slow client gets in line with a connection, you could easily end up waiting a long time to make your request. (Imagine going to a supermarket with a single checkout line, no self checkout, and the person at the front of the line is paying in coins.) (2) is much better, but spawning off those operating system threads is ultimately a relatively costly activity.

Both of these approaches are synchronous. By contrast, an asynchronous approach could handle all 20 connections in a single operating system thread, with a basic approach of:

  1. Tell the operating system which sockets we’re interested in
  2. Make a single call that waits until at least one of those sockets has data ready
  3. Read from whichever sockets are ready, without blocking
  4. Loop back and wait again

Writing code like this manually can be fairly complicated, which is why many languages have added either async syntax or some kind of green thread based runtime. And it seems overall that this simply makes your program better. But let’s test those ideas out in practice.

Count by lines

Let’s write a simple, synchronous, single threaded, blocking program in Rust. It will take all of the lines in a file (hard-coded to input.txt), and print to standard output the number of characters on each line. It will exit the program on any errors. The program is pretty straightforward:

// We want to use the lines method from this trait
use std::io::BufRead;

// Lets us use ? for simple error handling
fn main() -> Result<(), std::io::Error> {
    // Try to open the file
    let file = std::fs::File::open("input.txt")?;
    // Create a buffered version of the file so we can use lines
    let buffered = std::io::BufReader::new(file);

    // Iterate through each line in the file
    for line in buffered.lines() {
        // But we get a Result each time, get rid of the errors
        let line = line?;
        // And print out the line length and content
        println!("{} {}", line.len(), line);
    }

    // Everything went fine, so we return Ok
    Ok(())
}

Recommendation: I encourage readers to play along with this code themselves. Assuming you have installed Rust, you can run cargo new asyncpost and then copy-paste the code above into src/main.rs. Then create the input.txt file with this content:

Hello world
Hope you have a great day!
Goodbye

And if you run cargo run, you should get the expected output of:

11 Hello world
26 Hope you have a great day!
7 Goodbye

I won’t comment on running the code yourself any more, but I recommend you keep updating the code and running cargo run throughout reading this post.

Anyway, back to async code. The code above is completely synchronous. Every I/O action will fully block the main (and only) thread in our program. To be crystal clear, let’s see all the places this is relevant (ignoring error cases):

  1. Opening the file makes a blocking open system call
  2. As we iterate through the lines, the BufRead trait will implicitly be triggering multiple read system calls, which block waiting for data to be available from the file descriptor
  3. The println! macro will make write system calls on the stdout file descriptor, each of which is a blocking call
  4. Finally, when the file is dropped, a close system call closes the file descriptor

NOTE: I’m using POSIX system call terms here; things may be slightly different on some operating systems, and radically different on Windows. That shouldn’t take away from the main thrust of the message here.

Make it async!

The most straightforward way to write asynchronous programs in Rust is to use async/await syntax. Let’s naively try simply converting our main function into something async:

async fn main() -> Result<(), std::io::Error> {
    // code inside is unchanged
}

That’s going to fail (I did say naively). The reason is that you can’t simply run an async function like main. Instead, you need to provide an executor that knows how to handle all of the work there. The most popular library for this is tokio, and it provides a nice convenience macro to make this really easy. First, let’s modify our Cargo.toml file to add the tokio dependency, together with all optional features turned on (it will be convenient later):

[dependencies]
tokio = { version = "0.2.22", features = ["full"] }

And then we stick the appropriate macro in front of our main function:

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // unchanged
}

And just like that, we have an asynchronous version of our program! Thank you very much everyone, have a great day, I’ll see you later.

“But wait!”

“But wait!” you may be saying. “How does Rust automatically know to rewrite all of those blocking, synchronous system calls into asynchronous, non-blocking ones just by sticking the async keyword on there???”

Answer: it doesn’t. The cake is a lie.

This is the first message I want to bring home. The async keyword allows for some special syntax (we’ll see it in a little bit). It makes it much easier to write asynchronous programs. It relies on having some kind of an executor like tokio around to actually run things. But that’s all it does. It does not provide any kind of asynchronous system call support. You’ve got to do that on your own.

Fortunately for us, tokio does bring this to the table. Instead of using the std crate’s versions of I/O functions, we’ll instead lean on tokio’s implementation. I’m now going to follow one of my favorite development techniques, “change the code and ask the compiler for help.” Let’s dive in!

The first synchronous call we make is to open the file with std::fs::File::open. Fortunately for us, tokio provides a replacement for this method via its replacement File struct. We can simply swap out std with tokio and get the line:

let file = tokio::fs::File::open("input.txt")?;

Unfortunately, that’s not going to compile, not even a little bit:

error[E0277]: the `?` operator can only be applied to values that implement `std::ops::Try`
 --> src\main.rs:8:16
  |
8 |     let file = tokio::fs::File::open("input.txt")?;
  |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |                |
  |                the `?` operator cannot be applied to type `impl std::future::Future`
  |                help: consider using `.await` here: `tokio::fs::File::open("input.txt").await?`
  |
  = help: the trait `std::ops::Try` is not implemented for `impl std::future::Future`
  = note: required by `std::ops::Try::into_result`

(To the observant among you: yes, I’m compiling this on Windows.)

So what exactly does this mean? The open method from std returned a Result to represent “this may have had an error.” And we stuck a ? after the open call to say “hey, if there was an error, please exit this function with that error.” But tokio’s open isn’t returning a Result. Instead, it’s returning a Future. This value represents a promise that, at some point in the future, we’ll get back a Result.

But I want the Result now! How do I force my program to wait for it? Easy: .await. The closer-to-compiling version of our code is:

let file = tokio::fs::File::open("input.txt").await?;

Now we’re saying to the compiler:

  1. Open the file
  2. Wait for the Future to complete to tell me the file result is ready (via .await)
  3. Then, if there was an error, exit this function (via ?)

And we end up with the file variable holding a tokio::fs::File struct. Awesome!

At this point, our code still doesn’t compile, since there’s a mismatch between the std and tokio sets of traits. If you want to have some fun, try to fix the code yourself. But I’ll just show you the completed version here:

// Replaces std::io::BufRead trait
use tokio::io::AsyncBufReadExt;
// We can't generally use normal Iterators with async code
// Instead, we use Streams
use tokio::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let file = tokio::fs::File::open("input.txt").await?;
    let buffered = tokio::io::BufReader::new(file);

    // Since we can't use a for loop, we'll manually
    // create our Stream of lines
    let mut lines = buffered.lines();

    // Now keep popping off another, waiting for each
    // I/O action to complete via .await
    while let Some(line) = lines.next().await {
        // Error handling again
        let line = line?;
        // And print out the line length and content
        println!("{} {}", line.len(), line);
    }

    Ok(())
}

It’s wordier, but it gets the job done. And now we have a fully asynchronous version of our program… right?

Well, no, not really. I mentioned four blocking I/O calls above: open, read (from the file), write (to stdout), and close. By switching to tokio, open and read are now using async versions of these system calls. close is a bit more complicated, since it happens implicitly when dropping a value. Ultimately, this code falls back to the same close system call, since it uses std’s implementation of File under the surface. But for our purposes, let’s pretend like this doesn’t actually block.

No, the more interesting thing remaining is the println! macro’s usage of write. This is still fully blocking I/O. And what I find informative is that it’s sitting in the middle of a fully asynchronous while loop leveraging .await! This hopefully drives home another one of the core messages I mentioned above: being async isn’t a binary on/off. You can have programs that are more or less asynchronous, depending on how many of the system calls get replaced.

We’ll get to replacing println! at the end of this post, but I want to point out something somewhat striking first. If you look at the examples in the tokio docs (such as the tokio::io module), you’ll notice that they’re using println! themselves. Why would a library for async I/O use blocking calls?!?

It’s not always worth it

Let’s return to our web server with 20 connections. Let’s pretend we wrote a half-async version of that web server. It uses non-blocking I/O for reading data from the clients. That means our web server will not need 20 independent threads, and it will know when data is available. However, like our program above, it’s going to produce output (data sent back to the clients) using blocking I/O calls. How will this affect our web server’s behavior?

Well, it’s better than the worst case we described above. We won’t block the entire server because one client is sending a really slow request. We’ll be able to wait until a client fully sends its request before we put together our response and send it back. However, since we’re using a blocking call to send the data back, if that same slow client is also slow at receiving data, we’ll be back to square one with a laggy server. And it won’t just slow down sending of responses. We’ll end up blocking all I/O, such as accepting new connections and receiving on existing connections.

But let’s pretend, just for a moment, that we know that each and every one of these clients has a super fast receive rate. The blocking send calls we make will always complete in something insanely fast like a nanosecond. Would we care that they’re blocking calls? No, probably not. I don’t mind blocking a system thread for one nanosecond. I care about having long and possibly indeterminate blocking I/O calls.

The situation with stdout is closer to this. It’s generally a safe assumption that outputting data to stdout will only block for a short duration of time. Therefore, most people consider using println! a fine thing to do in async code, not worth rewriting into something more complex. Taking this a step further: many things we don’t think of as blocking may, in fact, block. For example, reading and writing memory that is mapped to a file (via mmap) may involve blocking I/O. Generally, it’s impossible to expunge all traces of blocking behavior from a program.

But this “it’s not always worth it” goes much deeper. Let’s review our program above. With the new async I/O calls, our program is going to:

  1. Make a non-blocking system call to open a file descriptor
  2. Block (via .await) until the file is open
  3. In a loop:
    1. Read data from the descriptor with a non-blocking system call
    2. Block (via .await) for a complete line to be read
    3. Make a blocking call to write to output data to stdout
  4. Make a blocking close system call

Did moving from blocking to non-blocking calls help us at all? Absolutely not! Our program is inherently single threaded and sequential. We were previously blocking our main thread inside open and read system calls. Now we’re blocking that same thread waiting for the non-blocking equivalents to complete.

Blocking I/O with more steps

So just because we can make something asynchronous, doesn’t mean it’s always better code. By changing our program above, we’ve made it significantly more complex, added extra dependencies, and almost certainly made it slower to run.

Conclusion

To sum up what we covered in this post:

  1. Sticking the async keyword on a function enables some special syntax and requires an executor, but it does not magically turn blocking system calls into non-blocking ones
  2. To actually get non-blocking I/O, you need to swap in async-aware libraries like tokio for the std implementations
  3. Being async isn’t a binary on/off; a program can be more or less asynchronous depending on how many of its blocking calls get replaced
  4. Converting a program to async isn’t always worth it: our inherently sequential line counter gained nothing but complexity

I wanted to cover a more complex example of async in this post, but it’s already on the long side. Instead, I’ll follow up in a later post with a program that makes HTTP requests and checks their status codes. I’ll update this post with a link when it’s ready. Stay tuned!

Update: And here’s that follow-up post: HTTP status codes with async Rust

And finally…

Appendix: non-blocking output

I promised you I’d end with an example of replacing println! with non-blocking I/O. Remember, this isn’t something I’m generally recommending you do. But it’s informative to see it in action.

The simplest way to do this is to use format! to create a String with the content you want to output, and then write it via tokio::io::stdout(). (Note that this forces a heap allocation of a String, something that doesn’t occur with println! usage.) This looks like:

// Use the trait
use tokio::io::AsyncWriteExt;

// same code as before

let mut stdout = tokio::io::stdout();
while let Some(line) = lines.next().await {
    let line = line?;
    // Note the extra newline character!
    let s = format!("{} {}\n", line.len(), line);
    stdout.write_all(s.as_bytes()).await?;
}

This overall looks safe, and in our program it will perform correctly. In general, however, there is a problem with this code. From the docs on stdout:

In particular you should be aware that writes using write_all are not guaranteed to occur as a single write, so multiple threads writing data with write_all may result in interleaved output.

Since our program is sequential anyway, we don’t need to worry about multiple threads creating interleaved output. But in general, that would be a concern. One approach to address that would be to create a channel of messages to be sent to stdout.

But the most popular solution would be to simply use println! and similar macros.
