FP Complete


Let’s write a simple tokio-powered program that will download the contents of an HTTP response body using reqwest and print it to stdout. We’ll take the URL to download on the command line using clap. This might look something like the following:


  use anyhow::Result;
  use clap::Parser;
  #[derive(Parser)]
  struct Opt {
    url: String,
  }

  #[tokio::main]
  async fn main() -> Result<()> {
    let Opt { url } = Opt::parse();
    let body = reqwest::get(url).await?.text().await?;
    println!("{body}");
    Ok(())
  }

All good, but let’s (arguably) improve our program by extracting the logic for the download-and-print to a helper function:


  #[tokio::main]
  async fn main() -> Result<()> {
    let Opt { url } = Opt::parse();
    download_and_print(&url).await
  }

  async fn download_and_print(url: &str) -> Result<()> {
    let body = reqwest::get(url).await?.text().await?;
    println!("{body}");
    Ok(())
  }

I’ve followed general best practices here and taken the url as a string slice instead of an owned string. Now, it’s really easy to extend this program to support multiple URLs:

 
  #[derive(clap::Parser)]
  struct Opt {
    urls: Vec<String>,
  }

  #[tokio::main]
  async fn main() -> Result<()> {
    let Opt { urls } = Opt::parse();
    for url in urls {
      download_and_print(&url).await?;
    }
    Ok(())
  }

But now, let’s kick it up a notch and introduce some parallelism. We’re going to use a JoinSet to allow us to spawn off a separate task per URL provided and wait on all of them returning. If anything fails along the way, we’ll exit the entire program and abort ongoing activities.

 
  #[tokio::main]
  async fn main() -> Result<()> {
    let Opt { urls } = Opt::parse();
    let mut set = tokio::task::JoinSet::new();

    for url in urls {
      set.spawn(download_and_print(&url));
    }

    while let Some(result) = set.join_next().await {
      match result {
        Ok(Ok(())) => (),
        Ok(Err(e)) => {
          set.abort_all();
          return Err(e);
        }
        Err(e) => {
          set.abort_all();
          return Err(e.into());
        }
      }
    }
    Ok(())
  }

While the parallelism here looks reasonable, spawning the new tasks fails to compile:

error[E0597]: `url` does not live long enough
  --> src/main.rs:15:38
   |
14 |     for url in urls {
   |         --- binding `url` declared here
15 |       set.spawn(download_and_print(&url));
   |                 -------------------^^^^-
   |                 |                  |
   |                 |                  borrowed value does not live long enough
   |                 argument requires that `url` is borrowed for `'static`
16 |     }
   |     - `url` dropped here while still borrowed

This is a common failure mode in async (and, for that matter, multithreaded) Rust. The issue is that the String we want to pass is owned by the main task, and we’re trying to pass a reference to it with no guarantee that the main task will outlive the child task. You might argue that the main task will always outlive all other tasks, but (1) there’s no static proof of that within the code, and (2) it’s entirely possible to slightly refactor this program so that the spawning occurs in a subtask instead.
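The same `'static` requirement shows up with plain OS threads. Here's a minimal sketch using `std::thread` instead of tokio, with a hypothetical `spawned_len` helper standing in for `download_and_print`:

```rust
// Minimal sketch of the same ownership rule with std::thread::spawn,
// which carries the same `'static` bound as tokio's task spawning.
// `spawned_len` is a hypothetical stand-in for download_and_print.
fn spawned_len(url: String) -> usize {
    // A `move` closure takes ownership of `url`, so the spawned thread
    // no longer borrows anything from the caller's stack frame:
    std::thread::spawn(move || url.len()).join().unwrap()
}

fn main() {
    let url = String::from("https://example.com");
    // Passing ownership compiles; writing `std::thread::spawn(|| url.len())`
    // without `move` would fail with a similar borrow error.
    println!("{}", spawned_len(url));
}
```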

The question is: how do you fix this compile-time error?

We’ll explore a few options.

TAKE A STRING

Arguably the simplest solution is to change the type of the download_and_print function so that it takes an owned String instead of a reference:


  async fn download_and_print(url: String) -> Result<()> {
    let body = reqwest::get(url).await?.text().await?;
    println!("{body}");
    Ok(())
  }

Now, at the call site, we’re no longer borrowing a reference to the main task’s String. Instead, we pass in the entire owned value, transferring ownership to the newly spawned task:

 
  for url in urls {
      // Note the lack of & here!
      set.spawn(download_and_print(url));
  }

On the one hand, this feels dirty. We’re violating best practices and taking an owned String where one isn’t needed. That may be a small price to pay for code that simply works. However, if the download_and_print function will be used in other parts of the code base where passing a reference would work fine, forcing an owned String will cause an unnecessary allocation for those callers, and we may want to look for a better solution.
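To make that trade-off concrete, a caller that only has a string slice now has to allocate. A sketch, with a hypothetical `body_len` standing in for the owned-String version of download_and_print:

```rust
// `body_len` is a hypothetical stand-in for the owned-String version
// of download_and_print.
fn body_len(url: String) -> usize {
    url.len()
}

fn main() {
    let slice: &str = "https://example.com";
    // The caller must copy into a fresh String just to satisfy the
    // signature, even though a borrow would have been enough:
    println!("{}", body_len(slice.to_owned()));
}
```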

ADJUST THE CALLSITE WITH ASYNC MOVE

Another possibility is to leave our download_and_print function as-is taking a reference, and modify our call site as follows:
 
  for url in urls {
    set.spawn(async move { download_and_print(&url).await });
  }

By introducing an async move block, what we’ve done is created a new Future that will be passed to set.spawn. That new Future itself owns the String, not the main task. Therefore, borrowing a reference to url and passing it to download_and_print works just fine.

This is a great solution when you’re using a library function that you cannot modify, or when most of your code doesn’t run into this lifetime issue. But it can be a bit tedious to have to rewrite code in this way.
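The same trick works anywhere a `'static` bound applies: wrap ownership in a `move` closure (or `async move` block) and borrow from the wrapper’s own capture. A synchronous sketch with a hypothetical `make_task`:

```rust
// `make_task` returns a 'static closure that owns its String, mirroring
// how the `async move` block owns `url` and can safely lend out `&url`.
fn make_task(url: String) -> impl FnOnce() -> usize + 'static {
    move || {
        // Borrowing from the closure's own capture is fine: the borrow
        // cannot outlive the owner, because the closure *is* the owner.
        let borrowed: &str = &url;
        borrowed.len()
    }
}

fn main() {
    let task = make_task(String::from("https://example.com"));
    println!("{}", task());
}
```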

IMPL ASREF

Our final approach today will be to modify the function to accept a more general url type:

  async fn download_and_print(url: impl AsRef<str>) -> Result<()> {
    let body = reqwest::get(url.as_ref()).await?.text().await?;
    println!("{body}");
    Ok(())
  }

This type means “I’ll accept anything that can be cheaply converted to a &str.” It works for an owned String as well as a string slice, leaving the decision entirely to the caller. If we leave our call site passing a reference, we’ll still get the lifetime error above. But if we instead pass in url directly, our program once again works.
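Both call shapes then compile. A sketch, with a hypothetical `url_len` in place of download_and_print:

```rust
// `url_len` is a hypothetical stand-in for download_and_print's
// impl AsRef<str> signature.
fn url_len(url: impl AsRef<str>) -> usize {
    url.as_ref().len()
}

fn main() {
    let owned = String::from("https://example.com");
    println!("{}", url_len(&owned)); // a borrowed &String works...
    println!("{}", url_len(owned));  // ...and so does moving the owned String
}
```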

This is the approach I’d probably recommend in general. It takes a bit of practice to get used to these impl AsRef parameters, but the payoff is worth it in my opinion.

IMPROVEMENTS

The code above is not perfect. I’m sure others will find other limitations, but two things that jump out at me are:

  1. Instead of using reqwest::get, we should be creating a single reqwest::Client and sharing it throughout the application.
  2. For a large number of incoming URLs, we wouldn’t want to spawn a separate task per URL, but instead have a fixed number of workers that all pop work items from a shared queue. This helps avoid rate limiting from servers and keeps our own application from being overwhelmed. That said, the number of URLs would need to be pretty high before either of those issues bites in practice.

Fortunately, both of these are relatively easy to implement thanks to the simplicity of the JoinSet API:

 
  use anyhow::Result;
  use async_channel::Receiver; 
  use clap::Parser;

  #[derive(Parser)]
  struct Opt {
    urls: Vec<String>,
    #[clap(long, default_value_t = 8)]
    workers: usize,
  }

  #[tokio::main]
  async fn main() -> Result<()> {
    let Opt { urls, workers } = Opt::parse();
    let mut set = tokio::task::JoinSet::new();
    let client = reqwest::Client::new();
    let (send, recv) = async_channel::bounded(workers * 2);

    set.spawn(async move {
      for url in urls {
        send.send(url).await?;
      }
      Ok(())
    });

    for _ in 0..workers {
      set.spawn(worker(client.clone(), recv.clone()));
    }

    while let Some(result) = set.join_next().await {
      match result {
        Ok(Ok(())) => (),
        Ok(Err(e)) => {
          set.abort_all();
          return Err(e);
        }
        Err(e) => {
          set.abort_all();
          return Err(e.into());
        }
      }
    }
     
    Ok(())
  }

  async fn worker(client: reqwest::Client, recv: Receiver<String>) -> Result<()> {
    while let Ok(url) = recv.recv().await {
      download_and_print(&client, &url).await?;
    }
    Ok(())
  }
    
  async fn download_and_print(client: &reqwest::Client, url: impl AsRef<str>) -> Result<()> {
    let body = client.get(url.as_ref()).send().await?.text().await?;
    println!("{body}");
    Ok(())
  }
