FP Complete


This is the second of four posts in a series on combining web and gRPC services into a single service using Tower, Hyper, Axum, and Tonic. The full four parts are:

  1. Overview of Tower
  2. Today’s post: Understanding Hyper, and first experiences with Axum
  3. Demonstration of Tonic for a gRPC client/server
  4. How to combine Axum and Tonic services into a single service

I recommend checking out the first post in the series if you haven’t already.

Subscribe to our blog via email

Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.

Quick recap

With that in mind, let’s look at Hyper.

Services in Hyper

Now that we’ve got Tower under our belts a bit, it’s time to dive into the specific world of Hyper. Much of what we saw above will apply directly to Hyper. But Hyper has a few additional curveballs to deal with:

In place of the run function we had in our previous fake server example, Hyper follows a builder pattern for initializing HTTP servers. After providing configuration values, you create an active Server value from your Builder with the serve method. Just to get it out of the way now, this is the type signature of serve from the public docs:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

That’s a lot of requirements, and not all of them are clear from the docs. Hopefully we can bring some clarity to this. But for now, let’s start off with something simpler: the “Hello world” example from the Hyper homepage:

use std::{convert::Infallible, net::SocketAddr};
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};

async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new("Hello, World!".into()))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));

    let make_svc = make_service_fn(|_conn| async {
        Ok::<_, Infallible>(service_fn(handle))
    });

    let server = Server::bind(&addr).serve(make_svc);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

This follows the same pattern we established above:

Using this level of abstraction for writing a normal web app is painful for (at least) three different reasons:

So we’ll be more than happy to move on from Hyper to Axum a bit later. But for now, let’s continue exploring things at the Hyper layer.

Bypassing service_fn and make_service_fn

What I found most helpful when trying to grok Hyper was implementing a simple app without service_fn and make_service_fn. So let’s go through that ourselves here. We’re going to create a simple counter app (I’m nothing if not predictable). We’ll need two different data types: one for the “app factory”, and one for the app itself. Let’s start with the app itself:

struct DemoApp {
    counter: Arc<AtomicUsize>,
}

impl Service<Request<Body>> for DemoApp {
    type Response = Response<Body>;
    type Error = hyper::http::Error;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, _req: Request<Body>) -> Self::Future {
        let counter = self.counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let res = Response::builder()
            .status(200)
            .header("Content-Type", "text/plain; charset=utf-8")
            .body(format!("Counter is at: {}", counter).into());
        std::future::ready(res)
    }
}

This implementation uses the std::future::Ready struct to create a Future which is immediately ready. In other words, our application doesn’t perform any async actions. I’ve set the Error associated type to hyper::http::Error. This error would be generated if, for example, you provided invalid strings to the header method call, such as non-ASCII characters. As we’ve seen multiple times, poll_ready just advertises that it’s always ready to handle another request.

The implementation of DemoAppFactory isn’t terribly different:

struct DemoAppFactory {
    counter: Arc<AtomicUsize>,
}

impl Service<&AddrStream> for DemoAppFactory {
    type Response = DemoApp;
    type Error = Infallible;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, conn: &AddrStream) -> Self::Future {
        println!("Accepting a new connection from {:?}", conn);
        std::future::ready(Ok(DemoApp {
            counter: self.counter.clone()
        }))
    }
}

We have a different parameter to Service, this time &AddrStream. I did initially find the naming here confusing. In Tower, a Service takes some Request. And with our DemoApp, the Request it takes is a Hyper Request<Body>. But in the case of DemoAppFactory, the Request it’s taking is a &AddrStream. Keep in mind that a Service is really just a generalization of failable, async functions from input to output. The input may be a Request<Body>, or may be a &AddrStream, or something else entirely.

Similarly, the “response” here isn’t an HTTP response, but a DemoApp. I again find it easier to use the terms “input” and “output” to avoid the name overloading of request and response.

Finally, our main function looks much the same as the original from the “Hello world” example:

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let factory = DemoAppFactory {
        counter: Arc::new(AtomicUsize::new(0)),
    };

    let server = Server::bind(&addr).serve(factory);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

If you’re looking to extend your understanding here, I’d recommend extending this example to perform some async actions within the app. How would you modify Future? If you use a trait object, how exactly do you pin?

But now it’s time to take a dive into a topic I’ve avoided for a while.

Understanding the traits

Let’s refresh our memory from above on the signature of serve:

pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
    I: Accept,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
    I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    S: MakeServiceRef<I::Conn, Body, ResBody = B>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: HttpBody + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
    E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,

Up until preparing this blog post, I have never tried to take a deep dive into understanding all of these bounds. So this will be an adventure for us all! (And perhaps it should end up with some documentation PRs by me…) Let’s start off with the type variables. Altogether, we have four: two on the impl block itself, and two on this method:

I: Accept

I needs to implement the Accept trait, which represents the ability to accept a new connection from some a source. The only implementation out of the box is for AddrIncoming, which can be created from a SocketAddr. And in fact, that’s exactly what Server::bind does.

Accept has two associated types. Error must be something that can be converted into an error object, or Into<Box<dyn StdError + Send + Sync>>. This is the requirement of (almost?) every associated error type we look at, so from now on I’ll just skip over them. We need to be able to convert whatever error happened into a uniform representation.

The Conn associated type represents an individual connection. In the case of AddrIncoming, the associated type is AddrStream. This type must implement AsyncRead and AsyncWrite for communication, Send and 'static so it can be sent to different threads, and Unpin. The requirement for Unpin bubbles up from deeper in the stack, and I honestly don’t know what drives it.

S: MakeServiceRef

MakeServiceRef is one of those traits that doesn’t appear in the public documentation. This seems to be intentional. Reading the source:

Just a sort-of “trait alias” of MakeService, not to be implemented by anyone, only used as bounds.

Were you confused as to why we were receiving a reference with &AddrStream? This is the trait that powers that transformation. Overall, the trait bound S: MakeServiceRef<I::Conn, Body, ResBody = B> means:

And while we’re talking about it: that ResBody has the restriction that it must implement HttpBody. As you might guess, the Body struct mentioned above implements HttpBody. There are a number of implementations too. When we get to Tonic and gRPC, we’ll see that there are, in fact, other response bodies we have to deal with.

NewSvcExec and ConnStreamExec

The default value for the E parameter is Exec, which does not appear in the generated docs. But of course you can find it in the source. The concept of Exec is to specify how tasks are spawned off. By default, it leverages tokio::spawn.

I’m not entirely certain of how all of these plays out, but I believe the two traits in the heading allow for different handling of spawning for the connection service (app factory) versus the request service (app).

Using Axum

Axum is the new web framework that kicked off this whole blog post. Instead of dealing directly with Hyper like we did above, let’s reimplement our counter web service using Axum. We’ll be using axum = "0.2". The crate docs provide a great overview of Axum, and I’m not going to try to replicate that information here. Instead, here’s my rewritten code. We’ll analyze a few key pieces below:

use axum::extract::Extension;
use axum::handler::get;
use axum::{AddExtensionLayer, Router};
use hyper::{HeaderMap, Server, StatusCode};
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Clone, Default)]
struct AppState {
    counter: Arc<AtomicUsize>,
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let app = Router::new()
        .route("/", get(home))
        .layer(AddExtensionLayer::new(AppState::default()));

    let server = Server::bind(&addr).serve(app.into_make_service());

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

async fn home(state: Extension<AppState>) -> (StatusCode, HeaderMap, String) {
    let counter = state
        .counter
        .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    let mut headers = HeaderMap::new();
    headers.insert("Content-Type", "text/plain; charset=utf-8".parse().unwrap());
    let body = format!("Counter is at: {}", counter);
    (StatusCode::OK, headers, body)
}

The first thing I’d like to get out of the way is this whole AddExtensionLayer/Extension bit. This is how we’re managing shared state within our application. It’s not directly relevant to our overall analysis of Tower and Hyper, so I’ll suffice with a link to the docs demonstrating how this works. Interestingly, you may notice that this implementation relies on middlewares, which does in fact leverage Tower, so it’s not completely separate.

Anyway, back to our point at hand. Within our main function, we’re now using this Router concept to build up our application:

let app = Router::new()
    .route("/", get(home))
    .layer(AddExtensionLayer::new(AppState::default()));

This says, essentially, “please call the home function when you receive a request for /, and add a middleware that does that whole extension thing.” The home function uses an extractor to get the AppState, and returns a value of type (StatusCode, HeaderMap, String) to represent the response. In Axum, any implementation of the appropriately named IntoResponse trait can be returned from handler functions.

Anyway, our app value is now a Router. But a Router cannot be directly run by Hyper. Instead, we need to convert it into a MakeService (a.k.a. an app factory). Fortunately, that’s easy: we call app.into_make_service(). Let’s look at that method’s signature:

impl<S> Router<S> {
    pub fn into_make_service(self) -> IntoMakeService<S>
    where
        S: Clone;
}

And going down the rabbit hole a bit further:

pub struct IntoMakeService<S> { /* fields omitted */ }

impl<S: Clone, T> Service<T> for IntoMakeService<S> {
    type Response = S;
    type Error = Infallible;
    // other stuff omitted
}

The type Router<S> is a value that can produce a service of type S. IntoMakeService<S> will take some kind of connection info, T, and produce that service S asynchronously. And since Error is Infallible, we know it can’t fail. But as much as we say “asynchronously”, looking at the implementation of Service for IntoMakeService, we see a familiar pattern:

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
}

fn call(&mut self, _target: T) -> Self::Future {
    future::MakeRouteServiceFuture {
        future: ready(Ok(self.service.clone())),
    }
}

Also, notice how that T value for connection info doesn’t actually have any bounds or other information. IntoMakeService just throws away the connection information. (If you need it for some reason, see into_make_service_with_connect_info.) In other words:

So where does that S type come from? It’s built up by all the route and layer calls you make. For example, check out the get function’s signature:

pub fn get<H, B, T>(handler: H) -> OnMethod<H, B, T, EmptyRouter>
where
    H: Handler<B, T>,

pub struct OnMethod<H, B, T, F> { /* fields omitted */ }

impl<H, B, T, F> Service<Request<B>> for OnMethod<H, B, T, F>
where
    H: Handler<B, T>,
    F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone,
    B: Send + 'static,
{
    type Response = Response<BoxBody>;
    type Error = Infallible;
    // and more stuff
}

get returns an OnMethod value. And OnMethod is a Service that takes a Request<B> and returns a Response<BoxBody>. There’s some funny business at play regarding the representations of bodies, which we’ll eventually dive into a bit more. But with our newfound understanding of Tower and Hyper, the types at play here are no longer inscrutable. In fact, they may even be scrutable!

And one final note on the example above. Axum works directly with a lot of the Hyper machinery. And that includes the Server type. While the axum crate reexports many things from Hyper, you can use those types directly from Hyper instead if so desired. In other words, Axum is pretty close to the underlying libraries, simply providing some convenience on top. It’s one of the reasons I’m pretty excited to get a bit deeper into my experiments with Axum.

So to sum up at this point:

Next step on our journey: let’s look at another library for building Hyper services. We’ll follow up on this in our next post.

Read part 3 now

If you’re looking for more Rust content from FP Complete, check out:

Subscribe to our blog via email

Email subscriptions come from our Atom feed and are handled by Blogtrottr. You will only receive notifications of blog posts, and can unsubscribe any time.

Tagged