Implementing pid1 with Rust and async/await.

Posted by Michael Snoyman - 26 November, 2019


A few years back, I wrote up a detailed blog post on Docker's process 1, orphans, zombies, and signal handling. Please read the gory details if you're interested, but the high level summary is:

  • On Linux, the process with ID 1 is treated specially, since it's typically an init process.
  • Process 1 is responsible for "reaping orphans," or calling waitpid on processes which have died after their parent processes died. (Yes, this sounds really morbid.)
  • Also, process 1 will by default not shut down in response to the interrupt signal, meaning Ctrl-C will not shut down the process.
  • In Docker, due to how it uses cgroups, the process you launch is usually process 1.
  • Instead of rewriting all of your processes to have support for reaping and responding to SIGINT, it's easier to write a separate pid1 executable and use it as your Docker entrypoint.

The solution from three years ago was a Haskell executable providing this functionality and a Docker image based on Ubuntu. I use that image for the base of almost all of my Docker work, and problem solved.


Rewrite it in Rust!

A few of the Haskellers on the FP Complete team have batted around the idea of rewriting pid1 in Rust as an educational exercise, and to have a nice comparison with Haskell. No one got around to it. However, when Rust 1.39 came out with async/await support, I was looking for a good use case to demonstrate, and decided I'd do this with pid1. While the real motivation here is to demonstrate Rust to those curious—especially my Haskell-favoring coworkers—there are some real advantages to Rust over Haskell for this use case:

  • The executables are smaller, which is nice.
  • It's easier to make a Rust static executable than a Haskell one (though the latter is possible). Usually, you need to ensure you have the right libc available.
  • Rust has no runtime and essentially 0 overhead for a situation like this, once the subprocess is launched.
  • Cross-compilation is significantly easier, which can be great for creating a Docker image on Mac or Windows.

But to reiterate, this was mostly about learning and teaching. So the rest of this post will be about walking through the implementation and explaining some of the interesting points. We'll be hitting topics like:

  • Futures
  • async/.await syntax
  • Unsafe and FFI
  • Error handling

The full code for this is available on GitHub as pid1-rust-poc. Apologies to my many coworkers who insisted that I rename this to "the grim reaper."

Intended behavior

The program we're writing is intended to be called with a command line invocation such as pid1 command arg1 arg2 arg3. It will then:

  • Parse the command line arguments, exiting with an error if no command name is given.
  • Launch the child process requested.
  • Install a SIGCHLD signal handler, which will indicate that a child or orphan process is ready to be reaped.
  • Install a SIGINT signal handler which will send a SIGINT to the child process. This will make Ctrl-C work.
  • Start a loop that reaps a child each time SIGCHLD occurs.
  • As soon as the direct child exits, pid1 will exit. In the Docker case, this means that when the process launched by the user exits, the Docker container will exit.

There's a slight race condition in the above, since we launch the child process before the signal handlers are installed. I'm leaving that as-is to make the code a bit easier to understand, but feel free to improve this if you're looking for a challenge!

Parse the command

You can get the list of command line arguments as an iterator. This iterator will have the current executable's name as the first value, which we want to ignore. We want to return a pair of the command name and a vector of the rest of the arguments. And if there's no command provided, we'll use a Result to capture the error. Putting that all together, the function looks like this:

fn get_command() -> Result<(String, Vec<String>), Pid1Error> {
    let mut args = std::env::args();
    let _me = args.next();
    match args.next() {
        None => Err(Pid1Error::NoCommandGiven),
        Some(cmd) => Ok((cmd, args.collect())),
    }
}

We have to capture the result of std::env::args() inside a mutable variable, since each subsequent call to next() mutates the value, essentially popping a value off a stack. We're able to ignore the first value, then pattern match on the second value. If it's None, then the command is missing, and we return an Err value.

Otherwise, if there's a Some value, we take that as the command, and collect all of the remaining arguments from args into a Vec. Some interesting things to point out, especially to Haskellers:

  • Rust has sum types, which it refers to as enums. Don't be fooled though: these are fully powered sum types. I personally think the separation of sum types (enums) and product types (structs) in Rust is an improvement over Haskell's data types, but that's a discussion for another time.
  • Pattern matching is beautiful and powerful.
  • Rust does not at all constrain side effects. Calling args.collect() mutates the args value, and is part of a larger expression. This feels foreign to a Haskeller, but is right in line with "normal" programming languages.
  • Even though Rust allows mutation and effects, the actual impact is really nicely constrained here, due to immutability by default. While this function could theoretically "fire the missiles," it behaves in a nice, almost-functional way here.
  • I think the double wrapping of parentheses in Ok((cmd, args)) looks weird, but it's at least logically consistent.
  • We're explicit in our errors in Rust in general, as opposed to using unchecked runtime exceptions. I've spoken about both systems a lot in the past, and my opinion is pretty simple: both systems work, and you should fully embrace whatever your current language is promoting as best practice. In Haskell, I feel fine using unchecked runtime exceptions. In Rust, I have no problem creating enums of error types and propagating explicitly.
    • I didn't show you the definition of Pid1Error yet, I'm saving that for later.
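The same argument-splitting logic can be exercised against any iterator of Strings, which makes it easy to test without touching the real process arguments. Here's a hedged sketch; the generic helper and its plain String error type are illustrative, not from the original code:

```rust
// Illustrative generic version of get_command: works over any iterator
// of Strings, so it can be tested without real process arguments.
fn get_command_from<I>(mut args: I) -> Result<(String, Vec<String>), String>
where
    I: Iterator<Item = String>,
{
    let _me = args.next(); // skip the executable name
    match args.next() {
        None => Err("no command given".to_string()),
        Some(cmd) => Ok((cmd, args.collect())),
    }
}

fn main() {
    let argv = vec!["pid1", "echo", "hello", "world"]
        .into_iter()
        .map(String::from);
    assert_eq!(
        get_command_from(argv),
        Ok((
            "echo".to_string(),
            vec!["hello".to_string(), "world".to_string()]
        ))
    );
    // only the executable name present: no command was given
    assert!(get_command_from(std::iter::once("pid1".to_string())).is_err());
}
```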

Enough of that, let's move on!

The type of main

Our application needs to be able to handle a few things:

  • If any errors occur, they should propagate out and produce an error message (from the type Pid1Error) to the user.
  • We want to use the new async/.await and Futures stuff in Rust 1.39 (we'll see how later).
  • If everything goes OK, we want to just exit gracefully.

We're going to represent all of this with the signature of the main function. This looks like:

async fn main() -> Result<(), Pid1Error>

By returning a Result, we're telling the compiler: if this function produces an Err variant, print an error message to stderr and set the exit code to a failure. By adding async, we're saying: this function may await some stuff. Under the surface, this means that main is actually producing a value that is an instance of Future, but we'll get to that later. For now, the important thing to understand is that, in order to run a function like this, we need some kind of a scheduler to be available.

One option would be to rename main to main_inner, and then write a main function like:

fn main() -> Result<(), Pid1Error> {
    async_std::task::block_on(main_inner())
}

However, there's a crate called async-attributes which lets us do something a little bit slicker:

#[async_attributes::main]
async fn main() -> Result<(), Pid1Error> {
    // all of our code with .await
}

This almost makes Rust feel like a language like Haskell, Go, or Erlang, with a green threaded system just built in. Instead, Rust requires a little more effort to get this async code, but it's almost entirely userland code instead of a runtime system. It also means you can easily swap out different schedulers.

Launching and error handling

Inside our main function, we start by calling the get_command function:

let (cmd, args) = get_command()?;

To the uninitiated, two questions may pop up:

  1. I thought that function returns a Result value, why does it look like it's returning a pair?
  2. What's that question mark?

Perhaps unsurprisingly, one of these answers the other. The question mark can be added to any expression in Rust to ease error handling. The exact details are more complicated than this, but the above code essentially converts to:

let (cmd, args) = match get_command() {
    Ok(pair) => pair,
    Err(e) => return Err(e),
};

In other words, if the value is an Ok, it continues the current function with that value. Otherwise, it exits this function, propagating the error value itself. Pretty nice for a single character! Explicit error handling without much noise.

The next line is a little more interesting:

let child = std::process::Command::new(cmd).args(args).spawn()?.id();

We create a new command with the cmd value, set its argument to args, and then spawn the process. Spawning may fail, so it returns a Result. We're able to put the ? in the middle of the expression, and then continue chaining additional method calls. That's really slick, and composes very nicely with the .await syntax we'll see in a bit.
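To see how ? composes mid-expression, here's a small standalone sketch. The function and its String error type are hypothetical, purely to show a ? sitting in the middle of a method chain:

```rust
// Illustrative: `?` in the middle of a chain, with the chain
// continuing on the unwrapped Ok value.
fn first_char_upper(s: &str) -> Result<char, String> {
    let c = s
        .chars()
        .next()
        .ok_or_else(|| "empty string".to_string())? // early return on None
        .to_ascii_uppercase(); // keeps chaining on the char
    Ok(c)
}

fn main() {
    assert_eq!(first_char_upper("hello"), Ok('H'));
    assert!(first_char_upper("").is_err());
}
```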

However, there's one curious bit here: spawn() doesn't use Pid1Error for indicating something went wrong. Instead, it uses std::io::Error. So how does the std::io::Error become a Pid1Error? There's a special trait (like a typeclass in Haskell, or interface in Java) called From in Rust. And now we can look at our definition of Pid1Error and the implementation of the From trait:

#[derive(Debug)]
enum Pid1Error {
    IOError(std::io::Error),
    NoCommandGiven,
    ChildPidTooBig(u32, std::num::TryFromIntError),
    WaitpidFailed(libc::pid_t),
}

impl std::convert::From<std::io::Error> for Pid1Error {
    fn from(e: std::io::Error) -> Self {
        Pid1Error::IOError(e)
    }
}

It's not necessary to be this verbose; there are crates available providing derive attributes for generating this trait implementation more easily. But I still prefer being verbose, and don't mind a bit of boilerplate like this.
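As a minimal standalone illustration of how a From impl powers ?, here's a sketch with a hypothetical MyError wrapping std's ParseIntError; the names are made up, but the mechanism is exactly what lets spawn()'s std::io::Error become a Pid1Error:

```rust
use std::num::ParseIntError;

#[derive(Debug, PartialEq)]
enum MyError {
    Parse(ParseIntError),
}

// With this From impl in scope, `?` converts ParseIntError
// into MyError automatically.
impl From<ParseIntError> for MyError {
    fn from(e: ParseIntError) -> Self {
        MyError::Parse(e)
    }
}

fn double(s: &str) -> Result<i32, MyError> {
    let n: i32 = s.parse()?; // ParseIntError -> MyError via From
    Ok(n * 2)
}

fn main() {
    assert_eq!(double("21"), Ok(42));
    assert!(double("not a number").is_err());
}
```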

Converting to pid_t

The child value we got above is of type u32, meaning "unsigned 32-bit integer." This is a reasonable representation for a child PID, since they cannot be negative. However, in libc the type pid_t is represented as a signed integer: type pid_t = i32. The reason for this distinction isn't documented, but it makes sense: libc has some functions that use negative values for special cases, like sending signals to entire process groups. We'll see one of those later.

Anyway, casting from a u32 to an i32 may fail. Languages like C and even Haskell encourage unchecked casting. But the default way to do this in Rust is more explicit:

use std::convert::TryInto;
let child: libc::pid_t = match child.try_into() {
    Ok(x) => x,
    Err(e) => return Err(Pid1Error::ChildPidTooBig(child, e)),
};

The TryInto trait defines a method try_into() which we want to use. In Rust, you need to use a trait to have its methods available. Fortunately, the compiler is smart about this and provides helpful error messages. Then we pattern match on the Result and return a Pid1Error::ChildPidTooBig variant if the conversion fails.

You may be wondering why we used this pattern matching instead of ?. With the right From implementation, ? would work just fine. However, if you want to include additional context with your error, like the value we were trying to convert, you need to do a bit more work like above. Alternatively, you can play with the map_err method.
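Here's roughly what the map_err alternative might look like, sketched with a hypothetical String error in place of Pid1Error so it runs standalone:

```rust
use std::convert::TryInto;

// Illustrative mirror of the pid conversion: keep the original
// value around in the error via map_err.
fn to_pid(child: u32) -> Result<i32, String> {
    child
        .try_into()
        .map_err(|e| format!("child pid {} too big: {}", child, e))
}

fn main() {
    assert_eq!(to_pid(42), Ok(42));
    assert!(to_pid(3_000_000_000).is_err()); // exceeds i32::MAX
}
```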

Filicide

Now that we know the process ID of the child, we can install a signal handler to capture any incoming SIGINTs, and send a signal ourselves to the child. Let's start with the callback that will actually send the SIGINT along.

let interrupt_child = move || {
    unsafe {
        libc::kill(child, libc::SIGINT); // ignoring errors
    }
};

Let's start from the inside out. libc::kill is a direct FFI call to the C library's kill function, which is how you send signals. We pass in the child PID and the signal we want to send. This function can result in an error result, and ideally we would handle that correctly in Rust. But we're just ignoring such errors here.

Moving out, the next thing we see is unsafe. The FFI calls to libc are all marked as unsafe. You can read more about unsafe in the Rust book.

Next, we see this weird || { ... } syntax. The pipes are used for defining a lambda/closure. We could put a comma-separated list of arguments inside the pipes, but we don't have any. Since we're trying to create a callback that will be used later, some kind of lambda is necessary.

Finally, the move. Inside our lambda, we refer to the child variable, which is defined outside of the closure. This variable is captured in the closure's environment. By default, this is captured via a borrow. This gets us into lifetime issues, where the lifetime of the closure itself must be less than or equal to the lifetime of child itself. Otherwise, we'd end up with a closure which refers to a piece of memory that's no longer being maintained.

move changes this, and causes the child value to instead be moved into the environment of the closure, making the closure the new owner of the value. Normally in Rust, this would mean that child can no longer be used in the original context, since it's been moved. However, there's something special about child: it's an i32 value, which has an implementation of Copy. That means that the compiler will automatically create a copy (or clone) of the value when needed.
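That Copy behavior is easy to see in miniature. This sketch (the demo function is illustrative, not from pid1) moves an i32 into a closure and then keeps using the original binding:

```rust
// child is Copy (i32), so `move` copies it into the closure and
// the original binding remains usable afterwards.
fn demo() -> (i32, i32) {
    let child: i32 = 42;
    let closure = move || child + 1; // closure owns its own copy
    (closure(), child) // child is still accessible here
}

fn main() {
    assert_eq!(demo(), (43, 42));
}
```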

OK! Now that we have our callback, we're going to use the really helpful signal-hook crate to install a handler for SIGINTs:

let sigid: signal_hook::SigId =
    unsafe { signal_hook::register(signal_hook::SIGINT, interrupt_child)? };

This register call is also unsafe, so we have an unsafe block. We pass in both SIGINT and the interrupt_child callback. And we stick a question mark at the end in case this fails; if so, our whole program will exit, which seems reasonable. We capture the resulting sigid so we can unregister this handler later. It's honestly not really necessary in a program like this, but why not.

The rest of our main function looks like this:

// something about handling the reaping of zombies...

signal_hook::unregister(sigid);
Ok(())

This unregisters the handler and then uses Ok(()) to indicate that everything went fine. Now we just need to deal with that reaping business.

Futures, Streams, signals, tasks and wakers

The last thing we need to do is reap the orphans in a loop, stopping when the direct child we spawned itself exits. Using the libc blocking waitpid call, this would actually work just fine as a normal loop with blocking system calls. Since our pid1 program doesn't have anything else to do, the blocking calls will not tie up an otherwise-useful system thread.

However, the goal of this exercise is to use the new async/.await syntax and Futures, and to use only non-blocking calls. So that's what we're going to do! To do this, we're going to need to talk about tasks. A task is similar to a thread, but is implemented in pure Rust using cooperative multithreading. Instead of the OS scheduling things, with tasks:

  • There's a scheduler inside a Rust library, such as async-std or tokio
  • Tasks define their work in terms of the Future trait (which we'll get to in a bit)
  • The async/.await syntax provides a much more user-friendly interface versus the raw Futures stuff
  • Tasks are able to indicate that they are waiting for something else to be ready, in which case
    • They don't tie up an OS thread blocking
    • The scheduler will wake up the task when the data is ready

We want the ability to "block" until a new child process has died. Our application will be notified of this by the SIGCHLD signal. We then want to be able to generate a Stream of values indicating when a child process has died. A Stream is a slight extension of a Future which allows multiple values to be produced instead of just a single value. To represent this, we have a Zombies struct:

struct Zombies {
    sigid: signal_hook::SigId,
    waker: Arc<Mutex<(usize, Option<Waker>)>>,
}

This holds onto the SigId generated when we register the callback action, the same as we had from the SIGINT above. It also has a waker field. This waker follows the common pattern of Arc (atomic reference counted) around a Mutex around some data. This allows for reading and writing data from multiple threads with explicit locking, thereby avoiding race conditions. Rust is very good at using the type system itself to avoid many race conditions. For example, try replacing the Arc with an Rc (non-atomic reference counted) and see what happens.
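Here's a minimal sketch of that Arc<Mutex<...>> sharing pattern with plain threads; the counter helper is illustrative, not part of pid1, but it's the same shape as the (count, waker) pair:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Increment a shared counter from several threads: each thread gets
// its own Arc clone, and the Mutex serializes the mutations.
fn count_from_threads(n: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                *c.lock().unwrap() += 1; // lock, mutate, unlock on drop
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    assert_eq!(count_from_threads(8), 8);
}
```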

Within our Arc<Mutex<...>>, we are storing a pair of values:

  • A usize, which is the number of zombies that still need to be reaped. Each time we get a SIGCHLD, we want to increment it. Each time we return a value from our Stream, we want to decrement it.
  • An Option<Waker>. This is how we tie into the task system.
    • When we are inside our task and ask for a zombie, we'll check the usize.
      • If it's greater than 0, we'll decrement it and keep going.
      • If it's 0, then we want to go to sleep until a new SIGCHLD arrives, and then get woken up. In that case, we'll set the Option<Waker> to the Waker for the current task.
    • When we receive a SIGCHLD, we'll first increment the usize, and then check if there's a value inside Option<Waker>. If present, we'll trigger it.

OK, enough talking about code. Let's look at the implementation of Zombies.

New Zombies

Within our impl Zombies { ... }, we define a new function. This is not an async function. It will do its work synchronously, and return once everything is set up. First we're going to create our Arc<Mutex<...>> bit and make a clone of it for a callback function:

let waker = Arc::new(Mutex::new((0, None)));
let waker_clone = waker.clone();

Next, the callback function, which should be called each time we get a SIGCHLD. Remember our goal: to increment the counter and call the waker if present.

let handler = move || {
    let mut guard = waker_clone.lock().unwrap();
    let pair: &mut (usize, Option<Waker>) = &mut guard;
    pair.0 += 1;
    match pair.1.take() {
        None => (),
        Some(waker) => waker.wake(),
    }
};

We use a move closure to capture the waker_clone. Unlike the i32 child value previously, our Arc<Mutex<...>> is not Copy, so we need to explicitly make our clone. Next, we lock the mutex. The lock may fail, which we handle with unwrap(). This will cause a panic. Generally that's not recommended, but if taking a lock on a mutex fails, it means you have a fundamental flaw in your program. Once we have a MutexGuard, we can use it to get a mutable reference to the pair of the count and the waker.

Incrementing the count is easy enough. So is calling waker.wake(). However, we first have to call take() to get the value inside the Option and pattern match. This also replaces the waker with a None, so that the same Waker will not be triggered a second time.

By the way, if you're looking to code golf this, you can get functional with a call to map:

pair.1.take().map(|waker| waker.wake());

But personally I prefer the explicit pattern matching. Maybe it's my Haskeller ways that make me uncomfortable about performing actions inside a map, who knows.
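The take() semantics are worth seeing in isolation. This little sketch (the helper is illustrative) shows that take() moves the value out and leaves None behind, so a second take() gets nothing:

```rust
// Option::take moves the value out, leaving None in its place;
// this is what guarantees each Waker fires at most once.
fn take_twice(mut slot: Option<i32>) -> (Option<i32>, Option<i32>) {
    let first = slot.take();  // Some(v) -> we get v, slot becomes None
    let second = slot.take(); // nothing left the second time
    (first, second)
}

fn main() {
    assert_eq!(take_twice(Some(5)), (Some(5), None));
    assert_eq!(take_twice(None), (None, None));
}
```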

Finally, we can finish off the new function by registering the handler and returning a Zombies value with the Arc<Mutex<...>> and the new signal ID.

let sigid = unsafe { signal_hook::register(signal_hook::SIGCHLD, handler)? };
Ok(Zombies { waker, sigid })

Dropping Zombies

When we're done with the Zombies value, we'd like to restore the original signal handler for SIGCHLD. For our application, it doesn't actually make a difference, but it may be better in general. In any event, an implementation of Drop is easy enough:

impl Drop for Zombies {
    fn drop(&mut self) {
        signal_hook::unregister(self.sigid);
    }
}

Streaming

In order to work with the async system, we need something Future-like. As mentioned though, instead of producing a single value, we'll produce a stream of values to indicate "there's a new zombie to reap." To handle that, we'll instead use the Stream trait.

There's no additional information available each time a zombie is available, so we'll use a () unit value to represent the zombie. We could define a new struct, or perhaps do something fancy where we capture the time when the signal is received. But none of that is necessary. Here's the beginning of our trait implementation:

impl Stream for Zombies {
    type Item = ();
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
        unimplemented!()
    }
}

We have an associated type Item set to (). Our poll_next receives a pinned, mutable reference to the Zombies value itself, as well as a mutable reference to the Context of the task that requested a value. We'll return a Poll<Option<()>>, which can be one of three values:

  • Poll::Ready(Some(())) means "There's a zombie waiting for you right now."
  • Poll::Pending means "There isn't a zombie waiting right now, but there will be in the future."
  • Poll::Ready(None) means "I know that there will be no more zombies." In reality, this case can never occur for us, and therefore we'll never produce that value.

Now let's look at the implementation. The first thing we're going to do is lock the waker and see if there's a waiting zombie:

let mut guard = self.waker.lock().unwrap();
let pair = &mut guard;
if pair.0 > 0 {
    // there's a waiting zombie
} else {
    // there isn't a waiting zombie
}

In the waiting zombie case (pair.0 > 0), we want to decrement the counter and then return our Poll::Ready(Some(())). Easy enough:

pair.0 -= 1;
Poll::Ready(Some(()))

And when there isn't a waiting zombie, we want to set the Waker to our current task's Waker (discovered via the Context), and then return a Poll::Pending:

pair.1 = Some(cx.waker().clone());
Poll::Pending

And that's it, we can now produce a stream of zombies! (Sounds like a good time to move to Hollywood, right?)
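The Poll::Pending / Poll::Ready protocol above can be seen in miniature with just the standard library. Here's a hedged sketch: a hypothetical CountDown future that reports Pending a few times before completing, driven by a toy busy-polling block_on built on a no-op Waker. This is nothing like the real async-std scheduler, but it's the same polling contract:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that reports Pending a few times before completing,
// mirroring the Poll protocol our Zombies stream implements.
struct CountDown(u32);

impl Future for CountDown {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

// A no-op Waker: sufficient for a busy-polling toy executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn block_on<F: Future>(mut fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // Safety: fut lives on this stack frame and is never moved after pinning.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            Poll::Pending => continue, // busy-wait; a real executor would sleep
        }
    }
}

fn main() {
    assert_eq!(block_on(CountDown(3)), "done");
}
```

A real scheduler parks the task on Pending and only re-polls it when the Waker fires, which is exactly what our SIGCHLD handler triggers via waker.wake().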

Reaping

We want to now consume that stream of zombies, reaping them in the process. We want to do this until our direct child process exits. Some information about how the system calls work for reaping:

  • There's a waitpid syscall we're going to use
  • If you tell it to reap the special process -1, it will reap any process available.
  • If you give it the WNOHANG option, it will be a non-blocking system call, returning a 0 if nothing is available to reap or -1 if there is an error.
  • It takes an additional mutable pointer to return status information, which we don't care about.
  • If it actually reaped a process, it will return the ID of that process.

Let's create our infinite loop of waiting forever for zombies:

while let Some(()) = self.next().await {
    // time to reap
}

panic!("Zombies should never end!");

The Stream trait doesn't represent the possibility of an infinite stream, so we need to do two things:

  1. Pattern match in the while using let Some(()).
  2. Add a panic! (or just an Ok(())) after the loop to handle the case that the compiler thinks can happen: that self.next().await will return a None.

Let's go back to that .await bit though. This is the real magic of the new async/.await syntax in Rust 1.39. .await can be appended to any expression which contains an impl Future. Under the surface, the compiler is converting this into callback-laden code. From prior experience, writing that code manually is at best tedious, especially:

  • when you have to deal with the borrow checker
  • when you have some kind of looping

However, as you can see here, the code is trivial to write, read, and explain. This is a huge usability improvement for Rust. There's another incremental improvement I can potentially see here:

async for () in self {
  ...
}

But that's a minor improvement, and would require standardizing the Stream trait. I'm more than happy with the code above.

On each iteration of that loop, we need to call waitpid and check its result. We have four cases:

  • It's the child we're waiting on: exit the function.
  • It's a different child: ignore.
  • It's a 0, indicating there wasn't a waiting zombie: that's a program error, because we already received a SIGCHLD.
  • It's a negative value, indicating the system call failed. Time to error out.

You can slice up the error handling differently, and decide to use panic!ing differently than I have, but here's my implementation:

let mut status = 0;
let pid = unsafe { libc::waitpid(-1, &mut status, libc::WNOHANG) };
if pid == till {
    return Ok(());
} else if pid == 0 {
    panic!("Impossible: I thought there was something to reap but there wasn't");
} else if pid < 0 {
    return Err(Pid1Error::WaitpidFailed(pid));
}
// otherwise: a different child was reaped, so just keep looping
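If you want that dispatch itself unit-testable, the four cases can be factored into a pure helper. The Reap enum here is illustrative, not from the original code:

```rust
// Illustrative only: a pure version of the waitpid dispatch so the
// four cases can be tested without spawning real processes.
#[derive(Debug, PartialEq)]
enum Reap {
    Done,        // pid == till: the child we're waiting on exited
    Ignored,     // some other orphan was reaped; keep looping
    Failed(i32), // waitpid returned a negative value: error out
}

fn classify(pid: i32, till: i32) -> Reap {
    if pid == till {
        Reap::Done
    } else if pid == 0 {
        // we only call waitpid after a SIGCHLD, so 0 means a logic error
        panic!("Impossible: I thought there was something to reap but there wasn't");
    } else if pid < 0 {
        Reap::Failed(pid)
    } else {
        Reap::Ignored
    }
}

fn main() {
    assert_eq!(classify(10, 10), Reap::Done);
    assert_eq!(classify(7, 10), Reap::Ignored);
    assert_eq!(classify(-1, 10), Reap::Failed(-1));
}
```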

Back to main

And finally, to tie it all together, let's see what the complete end of our main function looks like, including the zombie reaping code:

let sigid: signal_hook::SigId =
    unsafe { signal_hook::register(signal_hook::SIGINT, interrupt_child)? };

Zombies::new()?.reap_till(child).await?;

signal_hook::unregister(sigid);
Ok(())

And with that, we have a non-blocking, interrupt driven, user friendly grim reaper.

Conclusion

That was a detailed walkthrough of a fairly simple program. Hopefully the takeaway, however, was how simple it was to make this happen. I believe the async/.await syntax is a real game changer for Rust. While I've strongly believed in green threaded runtimes for concurrent applications in the past, such a powerful system giving safety guarantees is very appealing. I look forward to using this in anger and comparing against both my previous tokio callback-ridden code, as well as the Haskell I would write.

If you want to hear more about my endeavors here, please let me know this is a topic you're interested in. You can ping me on Twitter @snoyberg.

Also, if you want to hear more from FP Complete about Rust, please consider signing up for our mailing list.
