FP Complete


I’ve played around with various web server libraries and frameworks in Rust, and found various strengths and weaknesses with them. Most recently, I put together an FP Complete solution called Zehut (which I’ll blog about another time) that needed to combine a web frontend and gRPC server. I used Hyper, Tonic, and a minimal library I put together called routetype. It worked, but I was left underwhelmed. Working directly with Hyper, even with the minimal routetype layer, felt too ad-hoc.

When I recently saw the release of Axum, it seemed to be speaking to many of the needs I had, especially calling out Tonic support. I decided to make an experiment of replacing the direct Hyper+routetype usage I’d used with Axum. Overall the approach works, but (like the routetype work I’d already done) involved some hairy business around the Hyper and Tower APIs.

I’ve been meaning to write some blog post/tutorial/experience report for Hyper+Tower for a while now. So I decided to take this opportunity to step through these four libraries (Tower, Hyper, Axum, and Tonic), with the specific goal in mind of creating hybrid web/gRPC apps. It turned out that there was more information here than I’d anticipated. To make for easier reading, I’ve split this up into a four part blog post series:

  1. Today’s post: overview of Tower
  2. Understanding Hyper, and first experiences with Axum
  3. Demonstration of Tonic for a gRPC client/server
  4. How to combine Axum and Tonic services into a single service

Let’s dive in!

Subscribe to our blog via email

Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.

What is Tower?

The first stop on our journey is the tower crate. To quote the docs, which state this succinctly:

Tower provides a simple core abstraction, the Service trait, which represents an asynchronous function taking a request and returning either a response or an error. This abstraction can be used to model both clients and servers.

This sounds fairly straightforward. To express it in Haskell syntax, I’d probably say Request -> IO Response, leveraging the fact that IO handles both error handling and asynchronous I/O. But the Service trait is necessarily more complex than that simplified signature:

pub trait Service<Request> {
    type Response;
    type Error;

    // This is what it says in the generated docs
    type Future: Future;

    // But this more informative piece is in the actual source code
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}

Service is a trait, parameterized on the types of Requests it can handle. There’s nothing specific about HTTP in Tower, so Requests may be lots of different things. And even within Hyper, an HTTP library leveraging Tower, we’ll see that there are at least two different types of Request we care about.

Anyway, two of the associated types here are straightforward: Response and Error. Combining the parameterized Request with Response and Error, we basically have all the information we care about for a Service.

But it’s not all the information Rust cares about. To provide for asynchronous calls, we need to provide a Future. And the compiler needs to know the type of the Future we’ll be returning. This isn’t really useful information to use as a programmer, but there are plenty of pain points already around async code in traits.

And finally, what about those last two methods? They are there to allow the Service itself to be asynchronous. It took me quite a while to fully wrap my head around this. We have two different components of async behavior going on here:

Some of this complexity can be hidden away. For example, instead of giving a concrete type for Future, you can use a trait object (a.k.a. type erasure). Stealing again from the docs, the following is a perfectly valid associated type for Future:

type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

However, this incurs some overhead for dynamic dispatch.

Finally, these two layers of async behavior are often unnecessary. Many times, our server is always ready to handle a new incoming Request. In the wild, you’ll often see code that hard-codes the idea that a service is always ready. To quote from those docs for the final time in this section:

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
}

This isn’t saying that request handling is synchronous in our Service. It’s saying that request acceptance always succeeds immediately.

Going along with the two layers of async handling, there are similarly two layers of error handling. Both accepting the new request may fail, and processing the new request may fail. But as you can see in the code above, it’s possible to hard-code something which always succeeds with Ok(()), which is fairly common for poll_ready. When processing the request itself also cannot fail, using Infallible (and eventually the never type) as the Error associated type is a good call.

Fake web server

That was all relatively abstract, which is part of the problem with understanding Tower (at least for me). Let’s make it more concrete by implementing a fake web server and fake web application. My Cargo.toml file looks like:

[package]
name = "learntower"
version = "0.1.0"
edition = "2018"

[dependencies]
tower = { version = "0.4", features = ["full"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1"

I’ve uploaded the full source code as a Gist, but let’s walk through this example. First we define some helper types to represent HTTP request and response values:

pub struct Request {
    pub path_and_query: String,
    pub headers: HashMap<String, String>,
    pub body: Vec<u8>,
}

#[derive(Debug)]
pub struct Response {
    pub status: u32,
    pub headers: HashMap<String, String>,
    pub body: Vec<u8>,
}

Next we want to define a function, run, which:

The first question is: how do you represent that web application? It’s going to be an implementation of Service, with the Request and Response types being those we defined above. We don’t need to know much about the errors, since we’ll simply print them. These parts are pretty easy:

pub async fn run<App>(mut app: App)
where
    App: Service<crate::http::Request, Response = crate::http::Response>,
    App::Error: std::fmt::Debug,

But there’s one final bound we need to take into account. We want our fake web server to be able to handle requests concurrently. To do that, we’ll use tokio::spawn to create new tasks for handling requests. Therefore, we need to be able to send the request handling to a separate task, which will require bounds of both Send and 'static. There are at least two different ways of handling this:

There are different runtime impacts of making this decision, such as whether the main request accept loop will be blocked or not by the application reporting that it’s not available for requests. I decided to go with the latter approach. So we’ve got one more bound on run:

App::Future: Send + 'static,

The body of run is wrapped inside a loop to allow simulating an infinitely running server. First we sleep for a bit and then generate our new fake request:

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

let req = crate::http::Request {
    path_and_query: "/fake/path?page=1".to_owned(),
    headers: HashMap::new(),
    body: Vec::new(),
};

Next, we use the ready method (from the ServiceExt extension trait) to check whether the service is ready to accept a new request:

let app = match app.ready().await {
    Err(e) => {
        eprintln!("Service not able to accept requests: {:?}", e);
        continue;
    }
    Ok(app) => app,
};

Once we know we can make another request, we get our Future, spawn the task, and then wait for the Future to complete:

let future = app.call(req);
tokio::spawn(async move {
    match future.await {
        Ok(res) => println!("Successful response: {:?}", res),
        Err(e) => eprintln!("Error occurred: {:?}", e),
    }
});

And just like that, we have a fake web server! Now it’s time to implement our fake web application. I’ll call it DemoApp, and give it an atomic counter to make things slightly interesting:

#[derive(Default)]
pub struct DemoApp {
    counter: Arc<AtomicUsize>,
}

Next comes the implementation of Service. The first few bits are relatively easy:

impl tower::Service<crate::http::Request> for DemoApp {
    type Response = crate::http::Response;
    type Error = anyhow::Error;
    #[allow(clippy::type_complexity)]
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    // Still need poll_ready and call
}

Request and Response get set to the types we defined, we’ll use the wonderful anyhow crate’s Error type, and we’ll use a trait object for the Future. We’re going to implement a poll_ready which is always ready for a Request:

fn poll_ready(
    &mut self,
    _cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(())) // always ready to accept a connection
}

And finally we get to our call method. We’re going to implement some logic to increment the counter, fail 25% of the time, and otherwise echo back the request from the user, with an added X-Counter response header. Let’s see it in action:

fn call(&mut self, mut req: crate::http::Request) -> Self::Future {
    let counter = self.counter.clone();
    Box::pin(async move {
        println!("Handling a request for {}", req.path_and_query);
        let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        anyhow::ensure!(counter % 4 != 2, "Failing 25% of the time, just for fun");
        req.headers
            .insert("X-Counter".to_owned(), counter.to_string());
        let res = crate::http::Response {
            status: 200,
            headers: req.headers,
            body: req.body,
        };
        Ok::<_, anyhow::Error>(res)
    })
}

With all that in place, running our fake web app on our fake web server is nice and easy:

#[tokio::main]
async fn main() {
    fakeserver::run(app::DemoApp::default()).await;
}

app_fn

One thing that’s particularly unsatisfying about the code above is how much ceremony it takes to write a web application. I need to create a new data type, provide a Service implementation for it, and futz around with all that Pin<Box<Future>> business to make things line up. The core logic of our DemoApp is buried inside the call method. It would be nice to provide a helper of some kind that lets us define things more easily.

You can check out the full code as a Gist. But let’s talk through it here. We’re going to implement a new helper app_fn function which takes a closure as its argument. That closure will take in a Request value, and then return a Response. But we want to make sure it asynchronously returns the Response. So we’ll need our calls to look something like:

app_fn(|req| async { some_code(req).await })

This app_fn function needs to return a type which provides our Service implementation. Let’s call it AppFn. Putting these two things together, we get:

pub struct AppFn<F> {
    f: F,
}

pub fn app_fn<F, Ret>(f: F) -> AppFn<F>
where
    F: FnMut(crate::http::Request) -> Ret,
    Ret: Future<Output = Result<crate::http::Response, anyhow::Error>>,
{
    AppFn { f }
}

So far, so good. We can see with the bounds on app_fn that we’ll accept a Request and return some Ret type, and Ret must be a Future that produces a Result<Response, Error>. Implementing Service for this isn’t too bad:

impl<F, Ret> tower::Service<crate::http::Request> for AppFn<F>
where
    F: FnMut(crate::http::Request) -> Ret,
    Ret: Future<Output = Result<crate::http::Response, anyhow::Error>>,
{
    type Response = crate::http::Response;
    type Error = anyhow::Error;
    type Future = Ret;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(())) // always ready to accept a connection
    }

    fn call(&mut self, req: crate::http::Request) -> Self::Future {
        (self.f)(req)
    }
}

We have the same bounds as on app_fn, the associated types Response and Error are straightforward, and poll_ready is the same as it was before. The first interesting bit is type Future = Ret;. We previously went the route of a trait object, which was more verbose and less performant. This time, we already have a type, Ret, that represents the Future the caller of our function will be providing. It’s really nice that we get to simply use it here!

The call method leverages the function provided by the caller to produce a new Ret/Future value per incoming request and hand it back to the web server for processing.

And finally, our main function can now embed our application logic inside it as a closure. This looks like:

#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    fakeserver::run(util::app_fn(move |mut req| {
        // need to clone this from the closure before moving it into the async block
        let counter = counter.clone();
        async move {
            println!("Handling a request for {}", req.path_and_query);
            let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            anyhow::ensure!(counter % 4 != 2, "Failing 25% of the time, just for fun");
            req.headers
                .insert("X-Counter".to_owned(), counter.to_string());
            let res = crate::http::Response {
                status: 200,
                headers: req.headers,
                body: req.body,
            };
            Ok::<_, anyhow::Error>(res)
        }
    }))
    .await;
}

Side note: the extra clone

From bitter experience, both my own and others I’ve spoken with, that let counter = counter.clone(); above is likely the trickiest piece of the code above. It’s all too easy to write code that looks something like:

let counter = Arc::new(AtomicUsize::new(0));
fakeserver::run(util::app_fn(move |_req| async move {
    let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    Err(anyhow::anyhow!(
        "Just demonstrating the problem, counter is {}",
        counter
    ))
}))
.await;

This looks perfectly reasonable. We move the counter into the closure and then use it. However, the compiler isn’t too happy with us:

error[E0507]: cannot move out of `counter`, a captured variable in an `FnMut` closure
   --> srcmain.rs:96:57
    |
95  |       let counter = Arc::new(AtomicUsize::new(0));
    |           ------- captured outer variable
96  |       fakeserver::run(util::app_fn(move |_req| async move {
    |  _________________________________________________________^
97  | |         let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    | |                       -------
    | |                       |
    | |                       move occurs because `counter` has type `Arc<AtomicUsize>`, which does not implement the `Copy` trait
    | |                       move occurs due to use in generator
98  | |         Err(anyhow::anyhow!(
99  | |             "Just demonstrating the problem, counter is {}",
100 | |             counter
101 | |         ))
102 | |     }))
    | |_____^ move out of `counter` occurs here

It’s a slightly confusing error message. In my opinion, it’s confusing because of the formatting I’ve used. And I’ve used that formatting because (1) rustfmt encourages it, and (2) the Hyper docs encourage it. Let me reformat a bit, and then explain the issue:

let counter = Arc::new(AtomicUsize::new(0));
fakeserver::run(util::app_fn(move |_req| {
    async move {
        let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Err(anyhow::anyhow!(
            "Just demonstrating the problem, counter is {}",
            counter
        ))
    }
}))

The issue is that, in the argument to app_fn, we have two different control structures:

The issue is that there’s only one counter value. It gets moved first into the closure. That means we can’t use counter again outside the closure, which we don’t try to do. All good. The second thing is that, when that closure is called, the counter value will be moved from the closure into the async move block. That’s also fine, but it’s only fine once. If you try to call the closure a second time, it would fail, because the counter has already been moved. Therefore, this closure is a FnOnce, not a Fn or FnMut.

And that’s the problem here. As we saw above, we need at least a FnMut as our argument to the fake web server. This makes intuitive sense: we will call our application request handling function multiple times, not just once.

The fix for this is to clone the counter inside the closure body, but before moving it into the async move block. That’s easy enough:

fakeserver::run(util::app_fn(move |_req| {
    let counter = counter.clone();
    async move {
        let counter = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Err(anyhow::anyhow!(
            "Just demonstrating the problem, counter is {}",
            counter
        ))
    }
}))

This is a really subtle point, hopefully this demonstration will help make it clearer.

Connections and requests

There’s a simplification in our fake web server above. A real HTTP workflow starts off with a new connection, and then handles a stream of requests off of that connection. In other words, instead of having just one service, we really need two services:

  1. A service like we have above, which accepts Requests and returns Responses
  2. A service that accepts connection information and returns one of the above services

Again, leaning on some terse Haskell syntax, we’d want:

type InnerService = Request -> IO Response
type OuterService = ConnectionInfo -> IO InnerService

Or, to borrow some beautiful Java terminology, we want to create a service factory which will take some connection information and return a request handling service. Or, to use Tower/Hyper terminology, we have a service, and a make service. Which, if you’ve ever been confused by the Hyper tutorials like I was, may finally explain why “Hello World” requires both a service_fn and make_service_fn call.

Anyway, it’s too detailed to dive into all the changes necessary to the code above to replicate this concept, but I’ve provided a Gist showing an AppFactoryFn.

And with that… we’ve finally played around with fake stuff long enough that we can dive into real life Hyper code. Hurrah!

Next time

Up until this point, we’ve only played with Tower. The next post in this series is available, where we try to understand Hyper and experiment with Axum.

Read part 2 now

If you’re looking for more Rust content from FP Complete, check out:

Subscribe to our blog via email

Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.

Tagged