Take the 2017 Haskell Survey

Asynchronous API in C++ and the Continuation Monad.

Posted by Bartosz Milewski - 09 April, 2012

The latest C++11 Standard was a brave attempt, after many years of neglect, at catching up with the reality of concurrent programming. The result is a new memory model -- a solid foundation for concurrency -- and a collection of random concurrency features, many of them either obsolete or inadequate. But there is hope on the horizon: the next C++ Standard, on which the work started even before C++11 saw the light of day. In my last blog I reported on the first meeting of the C++ study group on concurrency and parallelism that took place in Bellevue, WA.
Erik Meijer Erik Meijer
One of the hot topics at that meeting was building better support for asynchronous calls, either through libraries or new language extensions. The C# group dazzled us with their very mature solution. Honestly, I am impressed with their work. And I know why they are good at what they do -- they know their functional programming. They have several Haskell experts either on their team or on call. And they have Erik Meijer within shouting distance.
I've been trying to popularize functional programming (FP) among C++ programmers, and I know that some people decided to study Haskell after reading my blogs or listening to my talks (here's a good online tutorial, in case you want to join their ranks). If C++ is to enter the era of modern concurrency, parallelism, and asynchronicity, it must embrace both the theory and the practice of FP -- just like the C# community did to their great advantage.
This blog is based on a talk I gave at the C++ Now! conference. I will first explain the problem with asynchronous interfaces and then show how it can be approached using FP, both in Haskell and in C++, where I'll be using template metaprogramming (TMP). I realize that TMP is a very awkward tool and should probably be avoided. The purpose of this post, however, is to shed some light on, and provide some language to talk about the implementation of asynchronous interfaces in C++. Understanding the big picture might help in providing a unified and extensible approach to multithreading, parallelism, and asynchronicity in future C++ Standards. We don't want to end up with three different incompatible solutions for these three problems.

Inversion of Control


The main problem with asynchronous interfaces is that they lead to inversion of control. What does it mean? In good old-fashioned imperative code we would call a blocking API and, when it returns, we would continue our business. Granted, while the call is blocked the application is not responsive, but at least the code is simple to write and maintain.
A call to an asynchronous API, on the other hand, returns immediately, which is good for latency. However, if we need the result of the API (say, file contents, or an HTTP request), we can't have it immediately. We have to register a callback with the API. A callback is a function that the API will call with the result, as soon as one is available. This programming construct, where it's an external agent that's calling our function, is called inversion of control (IoC).
Unlike in the imperative version, IoC leads to splitting the code into separate parts: The part up to the API call is in one function, the code after it is in another (the handler). Things get aggravated if state is involved, e.g., the same handler behaves differently depending on previous interactions. In that case we are dealing with a state machine whose implementation is scattered among several functions and (often global) data structures. It's a maintenance nightmare.
Notice that inversion of control breaks the principles of imperative programming, in which statements are executed in the order they appear in the program, subject only to simple control structures such as conditionals or loops. Instead, with IoC, we are entering the area of reactive programming, were the program reacts to external events. Interestingly, functional languages such as Haskell have been able to cast reactive programming into the mold of imperative programming (yes, imperative programming) with the help of monads. Don't worry, this post is not a YAMT (yet another monad tutorial), although I will explain some of the basics as we go.

The Continuation Framework


Handlers in asynchronous API programming are an example of a more general idea of continuations. A continuation reifies the concept of "the rest of the computation." In particular, when we call an async API and pass it a handler, we ask it to perform the rest of the computation at a later time. The handler is the continuation.
The interesting part is that continuations are composable. This is very important if your program has to juggle multiple calls to async APIs. In the language of handlers, you have to nest async APIs within handlers. This requires the creation of new handlers, and the picture very quickly gets impossibly complicated.
Functional languages know how to deal with this mess, so let's see what we can learn from them. The basic element in the continuation game is a type of a function that, instead of returning its result, passes it to another function. I'll call this type a Continuator. Here's the definition of the data type Continuator in Haskell (C++ code will follow, but you really want to see those ideas first expressed clearly in Haskell). A Continuator is an object that encapsulates a function . Think of this function as an async API.
newtype Continuator r a = CTR ((a->r)->r)
For those readers who are still in the planning stages of learning Haskell, here's the explanation of the syntax: newtype introduces a new type -- sort of like struct in C++. The name of the type is Continuator, and it is parameterized by two type variables, r and a. In C++ we'll have to use a template.
The type a describes the value produced by the continuator -- the value that is passed to the continuation. The type r describes the return type of the continuator (and the continuation). On the right hand of the equal sign there's a constructor, I call it CTR, and it takes a function of type ((a->r)->r).
Functions in Haskell have the type t->u, meaning a function taking an argument of type t and returning a value of type u. In this case, the argument to our function is also a function -- the continuation, a->r ; hence the signature ((a->r)->r).
When we want to run a continuator, we have to give it a continuation. Here's the Haskell code to do that:
andThen (CTR action) k = action k

Again, here's the play-by-play: andThen is a function that takes a continuator, (CTR action), and a continuation, k.
It's usual in Haskell to pattern-match an argument to get to its contents. In this case we pattern match the constructor CTR of the continuator to extract the function, action. We then call action with k. (Haskell newcomers are often confused by the function call syntax which doesn't use parentheses around the argument list. But after working with Haskell, even just a little, you start feeling that the C++ use of parentheses is weird.)
Here's the more or less equivalent C++ code:
template<class R, class A>
struct Continuator {
    virtual ~Continuator() {}
    virtual R andThen(function<R(A)> k) {}
};

I simplified matters a bit by not passing a continuator function ((a->r)->r) to the contstructor of Continuator (that would be necessary if I wanted to abstract this pattern further). The logic of that function will be embedded directly in the implementation of andThen, as you'll see in the following examples.
Now suppose that there is an asynchronous API with the following signature:
void asyncApi(function<void(string)> handler)
(A toy implementation of it is provided in the Appendix.)
This is how we would encapsulate it in a continuator object, AsyncApi:
struct AsyncApi : Continuator<void, string> {
    void andThen(function<void(string)> k) {
        asyncApi(k);
    }
};
Here the continuation k becomes the handler for the async API. We have established a connection between asynchronicity and continuations.
The way to understand a continuator in the context of async API is to see it as the encapsulation of a value that's not there yet. What can we do with a value that doesn't exist? Well, whatever it is, we can do it inside the continuation.
Here's how you could use AsyncApi -- with the continuation in the form of a lambda function:
AsyncApi callApi;
callApi.andThen([](string s)
{
    cout << s << endl;
});

In C#, andThen is called ContinueWith and, in Microsoft PPL, a task has a method called then. You read this code in a very imperative way as: Call API and then print the result. Yet in reality this code hides inversion of control.

The Continuation Monad


Real fun begins when you want to compose asynchronous APIs. For instance, you might want to open a file and, if it succeeds, read its contents, possibly in multiple chunks -- all using async APIs. You could express this logic with typical imperative control structures, but only if you can afford to block on each API call. Otherwise you have to deal with inversion of control.
The problem is that imperative languages come with a fixed set of control structures, which normally don't include support for IoC. Haskell, on the other hand, is great at letting the programmer create new control structures. IoC is just another such structure, and Haskell lets you abstract it. And, while I'm explaining this, I'll also translate it into C++.
To make the discussion more concrete, suppose that we want to call our async API in a loop -- the output of one iteration being the input to the next one. A loop is one of these imperative control structures that don't fit very well in the IoC model. However, we know how functional languages deal with iteration -- they turn it into recursion. So the solution to this particular problem would be to call the API with a handler that calls the API again and passes itself as a handler. (Now combine this with error checking and state, and you've created a monster.)
It turns out that, in Haskell, all you need in order to abstract the IoC control structures is to define two higher order functions.
The first function is called return and is defined like this:
return x = CTR (\k -> k x)
It takes a value, x, of arbitrary type, and creates a trivial continuator for it. It does it using the constructor CTR with a lambda:
\k -> k x
The backslash is the closest ASCII character to the Greek lambda, λ. Here, the lambda takes a continuation k, and calls k with x as the argument. Again, pay attention to the function call syntax.
Not to torture non-Haskell speakers, here's the C++ equivalent:
template<class R, class A>
struct Return : Continuator<R, A> {
    Return(A x) : _x(x) {}
    R andThen(function<R(A)> k) {
        return k(_x);
    }
    A _x;
};
The logic is inside andThen, which calls the continuation k with the value that was used to construct the Continuator. Notice: this has nothing to do with asynchrony, but it lets you uniformly mix regular values with the ones that are not available yet. You'll see soon why you'd want to do it.
The second function is called "bind" and it lets you compose two calculations represented by continuators. Remember, continuators are values that haven't materialized yet. A calculation may return such a "special" value in the form of a continuator (for instance a file handle from an asynchronous open call). We want to somehow pass this special value to another calculation that does something with it and returns some other special value (for instance, it could asynchronously read a chunk from the file). By using this kind of glue we'll be able to work with non-existing values as if they were right there.
In Haskell, "bind" is represented by an infix operator >>=. Bind composes an arbitrary continuator ktor with the "rest of the calculation," represented here by the function rest. This "rest of the calculation" can only be executed once the value encapsulated in ktor becomes available. So rest, too, must produce a continuator -- a promise to eventually yield a value.
The binding of ktor and rest results in yet another continuator. (Isn't that obvious? It can't produce an actual value from an input that's only a promise of a value.)
Just from this general description it's relatively easy to figure out the outer structure of >>=:
ktor >>= rest = CTR (\k -> ...)

Indeed, binding ktor with rest must produce a continuator, so we use the constructor CTR, and pass it the appropriate function.
The devil is in the lambda. We have to somehow extract the value from ktor and pass it to rest. The only way to extract a value from a Continuator is to construct a new continuation and run the continuator's andThen. Let's call this new continuation k2:
ktor >>= rest = CTR (\k -> andThen ktor k2)

Now k2 will be called with the actual value produced by ktor. We want to pass this value, a, to rest. So k2 must be a function that takes a and calls rest with it:
\a -> rest a

But rest a produces a continuator too. We will have to run it at some point, which means we need a continuation to retrieve its result. Well, that's the outer continuation k, which we were given to absorb the final value:
\a -> andThen (rest a) k

Putting it all together:
ktor >>= rest = CTR (\k -> andThen ktor (\a -> andThen (rest a) k))

Do not be alarmed if your brain feels slightly overheated. It was just an easy exercise to prepare you for the C++ version:
template<class R, class A, class C>
struct Bind : Continuator<R, A>
{
    Bind(C & ktor, function<unique_ptr<Continuator>(A)> rest)
        : _ktor(ktor), _rest(rest)
    {}
    void andThen(function<R(A)> k)
    {
        function<unique_ptr<Continuator>(A)> rest = _rest;
        function<R(A)> lambda = [k, rest](A a) 
        {
            return rest(a)->andThen(k); 
        };
        _ktor.andThen(lambda);
    }
    C _ktor;
    function<unique_ptr<Continuator>(A)> _rest;
};

I have marked in red the parts that have their counterpart in Haskell. Haskell has type inference and garbage collection, which helps to keep its code reasonably sized. Also C++ lambda functions are much less powerfull than the Haskell ones, and they have to be converted to std::function to fit the C++ type system.
And now it's time for full disclosure: What I have just described is a monad -- the continuation monad to be precise. It looks more monadic in Haskell, where you can abstract the monadic nature, but a C++ implementation will do in a pinch. Now let's see how we can write some monadic code using Return and Bind.

Looping with Continuations


Remember what I said about functional languages turning IoC into imperative code? In Haskell, with any monad, you can use special "do" notation, which has a distinct imperative feel to it. For instance, the earlier problem of calling an async operation in a loop might be implemented like this:
loop s = do
   t <- async
   loop t

This is a bit silly, since the function loop doesn't do anything with its argument and it runs forever, but just look at the structure of it. It looks pretty much like imperative synchronous code. You call async, which produces t and you call loop again with t. The key to understanding this code is to realize that the function async does not produce a value, it produces a Continuator. But inside the do block, t is just a regular value. We can manipulate it, do arithmetic on it, concatenate it, etc., depending on its type. But the "do" notation is nothing but syntactic sugar over monadic bind. The function loop can in fact be desugared down to:
loop s =
   async >>= \t -> loop t
Here, the lambda \t->loop t represents our "rest of the computation." The "do" version is arguably more readable.
Unfortunately, we don't get much sugar in C++, so we'll have to use Bind explicitly. Here's the Loop object with some debugging output added:
struct Loop : Continuator<void, string>
{
    Loop(string s) : _s(s) {}

void andThen(function<void(string)> k) { cout << "Loop::andThen: " <<_s << endl; Bind<void, string, AsyncApi>(AsyncApi(), [](string t) { return unique_ptr<Continuator> (new Loop(t)); }).andThen(k); } string _s; };

As before, the parts corresponding to Haskell code are in red. This code doesn't look as imperative as its Haskell counterpart, but one thing is worth observing: The inversion of control is somewhat masked by the fact that the code using the result of the asynchronous call follows the call itself, rather than being relegated to a global handler function. (You can see the results of running this code in the Appendix.)
A small comment: Even though the C++ solution corresponds to a recursive function in Haskell, it isn't actually recursive. The Loop is not calling itself -- it creates a new Loop object. You can run this code under the debugger and see that the program's stack is not growing. Neither is heap space for that matter -- the Loop objects are constantly recycled.
But there's more. We can combine asynchrony with other control structures. Here, for instance, is a loop that executes only n times.
struct LoopN : Continuator<void, string>
{
    LoopN(string s, int n) : _s(s), _n(n) {}

void andThen(function<void(string)> k) { cout << "[LoopN::andThen] " <<_s << " " << _n << endl; int n = _n; Bind<void, string, AsyncApi>(AsyncApi(), [n](string s) -> unique_ptr<Continuator> { if (n > 0) return unique_ptr<Continuator>( new LoopN(s, n - 1)); else return unique_ptr<Continuator> ( new Return<void, string>("Done!")); }).andThen(k); } string _s; int _n; };

This example uses the polymorphism of the Continuator by alternatively returning a LoopN object or the Return object (encapsulated inside unique_ptr). (Again, the test code for this example is given in the Appendix.)
If you were to implement the same functionality with handlers, you'd have to build a state machine, where n would be part of the state.

Combinators


The binding of continuators reflects data dependencies in your code: The "rest of the calculation" cannot proceed until the preceding continuator produces a result. A general dataflow graph can also contain confluences, nodes that need more than one input in order to fire. Such a confluence could be implemented using a join operation, which requires waiting for two or more continuators. But the usual thread join is a blocking operation -- something we are trying to avoid.
There is a more general mechanism though, relying on the use of combinators. A combinator takes two or more continuators and produces a new compound continuator. We can then use that continuator to continue growing the graph of dependencies. Separate branches of such a graph can then run in parallel, limited only by data dependencies. And this is the ultimate goal of parallelism -- to imitate a dataflow computer.
To make this discussion more concrete, here's a simple example of an "and" combinator; one that combines the results of two continuators. The use of continuations makes such constructions reasonably easy.
struct And : Continuator<void, string>
{
    And(unique_ptr<Continuator<void, string>> & ktor1, 
        unique_ptr<Continuator<void, string>> & ktor2)
    : _ktor1(move(ktor1)), _ktor2(move(ktor2))
    {}
    void andThen(function<void(pair<string, string>)> k)
    {
        _ktor1->andThen([this, k](string s1)
        {
            lock_guard<mutex> l(_mtx);
            if (_s2.empty())
                _s1 = s1;
            else
                k(make_pair(s1, _s2));
        });
        _ktor2->andThen([this, k](string s2)
        {
            lock_guard<mutex> l(_mtx);
            if (_s1.empty())
                _s2 = s2;
            else
                k(make_pair(_s1, s2));
        });
    }
    mutex _mtx;
    string _s1;
    string _s2;
    unique_ptr<Continuator<void, string>> _ktor1;
    unique_ptr<Continuator<void, string>> _ktor2;
};

The two output strings serve also as state in this elementary state machine -- they are tested using empty by the two continuations. Notice also the use of a mutex to protect shared output when continuations are run on separate threads.
Here's a simple test for our And combinator.
unique_ptr<Continuator<void, string>> api1(new AsyncApi("A"));
unique_ptr<Continuator<void, string>> api2(new AsyncApi("B"));

And and(api1, api2); and.andThen([](pair<string, string> ss) { cout << ss.first + ", " + ss.second << std::endl; });

In a real application you could pass the and object to another Bind, and continue the game.

Conclusion


The syntax of C++ makes the use of the continuation monad awkward, to say the least. I don't seriously expect anybody to use it as-is. However, realizing that there is a well developed theory for transforming IoC code into more imperative form might help avoid mistakes in the design of the C++ language. Essentially, the discussion should concentrate on designing appropriate syntactic sugar to make this style of programming accessible to everybody. That's exactly the direction that C# has taken.
But the strongest advantage of monadic approach is that it unifies many programming patterns. I have shown you how to use the continuation monad to deal with async API. However, for my own testing I didn't use an actual API; rather I faked it using a worker thread:
void asyncApi(function<void(string)> handler)
{
    thread th([handler]()
    {
        cout << "Started async\n";
        this_thread::sleep_for(chrono::seconds(3));
        handler("Done async");
    });
    th.detach();
}

When you squint at this code, it almost looks like I have implemented a future. Inside the thread, instead of calling set_value on a promise I called the handler. But outside the thread, things look rather different. If I used a future, the only way to access the value would be to call get on the future. But get is a blocking call. I can't perform any computations on the future value until my worker thread sets the promise. I described this problem some time ago in my blog Broken Promises. Now it's clear that composable futures should have been implemented using the continuation monad.
Think of it: A future represents a value that is not there yet. That's just like my Continuator. Instead of blocking on a future to retrieve its value, you provide a continuation to consume it. And this is not an original idea either. That's how C# tasks are implemented. And also that's one of the ways concurrency is implemented in Haskell using the par monad (a version of the continuation monad).
There's also been talk at the meeting in Bellevue about adding support for vectorization and, in general, for data-driven parallelism. It seems like a completely different set of problems requiring its own solution, possibly a language extension. Except that it's not! Vector operations and GPU kernels must also be composable, the same way that futures and asynchronous operations should be.
Just to give you a simple example: Multiply vector v1 by a scalar a and add the result to vector v2. You definitely don't want the multiplication and the addition to be coded separately, with a barrier between them. Neither do you want to provide separate intrinsics for all possible combinations of vector operations. You want the compiler to be able to perform a fusion -- to compile the two operations into one kernel, as long as the underlying hardware allows it. But that can't be done (at least not easily) if the code uses inversion of control.
Finally, having one framework for asynchronicity, concurrency, and data-driven parallelism would allow free composition between all three of them. You could send one part of the computation to a GPU, another to a worker thread, then combine their results using a combinator, and then call an asynchronous API, all without blocking. I think this is a worthy design goal for the next version of the C++ Standard.

Appendix 1


Here is some testing harness I used to test my C++ code. This is my fake async API, as I've already shown:
void asyncApi(function<void(string)> handler)
{
    thread th([handler]()
    {
        cout << "Started async\n";
        this_thread::sleep_for(chrono::seconds(3));
        handler("Done async");
    });
    th.detach();
}

I create a detached thread that sleeps for 3 seconds and then calls the handler. Notice that asyncApi itself returns almost immediately, as it should.
This is the test for the simple async continuator described in the text:
void main()
{
    AsyncApi callApi;
    callApi.andThen([](string s)
    {
        cout << s << std::endl;
    });
    // This code will be executed in parallel to async API
    for(int i = 0; i < 200; ++i)
    {
        cout << i << endl;
        this_thread::sleep_for(chrono::seconds(1));
    }
}

It creates the AsyncApi object and calls its andThen method, passing it a continuation. Then it immediately starts printing numbers, one every second. After three seconds the API finishes and calls the handler.
This is the test for the simple looping construct. The outer continuation that's supposed to print "Never happens" is never called because the loop is infinite.
void main()
{
    Loop("Loop: ").andThen([](string s) 
    { 
        cout << "Never happens: " << s << endl; 
    });
    // run counter in parallel
    for(int i = 0; i < 200; ++i)
    {
        cout << i << endl;
        this_thread::sleep_for(chrono::seconds(1));
    }
}

Here's the output of this program:
[caption id="attachment_305" align="alignnone" width="211"] The output of an asynchronous loop[/caption]
The counter in the main thread is running all the time, while the async loop keeps printing its results every three seconds.
Finally, this is the test of the finite async loop. The text "Finally" is printed after three iterations of the loop, while the main thread keeps printing numbers.
void main()
{
    LoopN("Begin ", 3).andThen([](string s) 
    { 
        cout << "Finally " << s << endl; 
    });
    for(int i = 0; i < 200; ++i)
    {
        cout << i << endl;
        this_thread::sleep_for(chrono::seconds(1));
    }
}

Appendix 2


This is a slight digression from the main topic, but I though I'd include it for completeness. It often happens, in particular in GUI programming, that the handler for an async operation must be executed in the main thread (e.g., the GUI thread). There is in fact a provision for that in C#, so I'd like to show you how it could be implemented in the monadic setup.
I pass a small handler to the API that does nothing but post a message to the global message queue:
struct AsyncApi : Continuator<void, string>
{
    void andThen(function<void(string)> k)
    {
        TheQueue.registerHandler(k);
        asyncApi([](string s)
        {
            TheQueue.post(s);
        });
    }
};

I also register my main handler -- the one that must run in the GUI thread. (A Windows implementation could register a special Windows message and add the handler for it to the message map.) When the API finishes, it will post the message, and the GUI thread will pick it up from the queue and execute the handler.

Acknowledgments

I'd like to thank Artur Laksberg (Microsoft, PPL), Mads Torgersen (Microsoft, C#), and Abel Sinkovics (Eötvös Loránd University, Budapest, Hungary) for reviewing the draft of this blog and providing valuable feedback.

Further Reading

  1. Paolo Capriotti, Reinversion of Control with Continuations
  2. sigfpe, Quick and Dirty Reinversion of Control

Recent Posts

Lambda Conference and Haskell Survey

read more

My DevOps Journey and How I Became a Recovering IT Operations Manager

read more

Amazon GovCloud has no Route53! How to solve this?

read more