When deciding which language to use for problems that require heavily concurrent algorithms, it's hard not to consider Haskell. Its immutable, persistent data structures reduce accidental complexity, and the GHC runtime lets us create thousands of (green) threads without worrying as much about memory and performance costs.

The epitome of Haskell's concurrent API is the async package, which provides higher-order functions (e.g. race, mapConcurrently) that let us run IO sub-routines concurrently and combine their results in various ways. It also offers the Concurrently type, which gives ordinary sub-routines concurrent behavior and provides Applicative and Alternative instances for building values by composing smaller sub-routines.

In this blog post, we will discuss some of the drawbacks of using the Concurrently type when composing sub-routines. Then we will show how we can overcome these shortcomings by taking advantage of the structural nature of the Applicative and Alternative typeclasses to reshape and optimize the execution of a tree of sub-routines.

And, if you simply want to get these performance advantages in your Haskell code today, you can cut to the chase and begin using the new Conc datatype we've introduced in unliftio 0.2.9.0.

The drawbacks of Concurrently

Getting started with Concurrently is easy. We can wrap an IO a sub-routine with the Concurrently constructor, and then we can compose async values using the map (<$>), apply (<*>), and alternative (<|>) operators. An example might be:

myPureFunction :: String -> String -> String -> String
myPureFunction a b c = a ++ " " ++ b ++ " " ++ c

myComputation :: Concurrently String
myComputation =
  myPureFunction
  <$> Concurrently fetchStringFromAPI1
  <*> (    Concurrently fetchStringFromAPI2_Region1
       <|> Concurrently fetchStringFromAPI2_Region2
       <|> Concurrently fetchStringFromAPI2_Region3
       <|> Concurrently fetchStringFromAPI2_Region4)
  <*> Concurrently fetchStringFromAPI3
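To try this snippet locally, a self-contained version might look like the sketch below. It assumes the async package is installed. The fakeFetch helper and its delays are hypothetical stand-ins for the real API calls, which the post does not define.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))

-- Hypothetical stand-in for a real API call: sleep for the given number
-- of milliseconds, then return a fixed string.
fakeFetch :: Int -> String -> IO String
fakeFetch millis result = threadDelay (millis * 1000) >> pure result

myPureFunction :: String -> String -> String -> String
myPureFunction a b c = a ++ " " ++ b ++ " " ++ c

myComputation :: Concurrently String
myComputation =
  myPureFunction
    <$> Concurrently (fakeFetch 50 "api1")
    <*> (    Concurrently (fakeFetch 400 "region1")
         <|> Concurrently (fakeFetch 10 "region2")
         <|> Concurrently (fakeFetch 400 "region3")
         <|> Concurrently (fakeFetch 400 "region4"))
    <*> Concurrently (fakeFetch 50 "api3")

main :: IO ()
main = runConcurrently myComputation >>= putStrLn
```

Since region2 has by far the shortest delay, it should win the race among the four regions, and the other three are cancelled.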

Let's talk a bit about the drawbacks of this approach. How many threads do you think we need to make sure all these calls execute concurrently? Try to come up with a number and an explanation and then continue reading.

I am guessing you expect this code to spawn six (6) threads, one for each IO sub-routine that we are using. However, with the existing implementation of Applicative and Alternative in Concurrently, we will spawn at least ten (10) threads. Let's explore these instances to get a better understanding of what is going on:

instance Applicative Concurrently where
  pure = Concurrently . return
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

instance Alternative Concurrently where
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs

First, let us expand the alternative calls in our example:

    Concurrently fetchStringFromAPI2_Region1
<|> Concurrently fetchStringFromAPI2_Region2
<|> Concurrently fetchStringFromAPI2_Region3
<|> Concurrently fetchStringFromAPI2_Region4

--- is equivalent to
Concurrently (
  either id id <$>
    race {- 2 threads -}
      fetchStringFromAPI2_Region1
      (either id id <$>
         race {- 2 threads -}
           fetchStringFromAPI2_Region2
           (either id id <$>
              race {- 2 threads -}
                fetchStringFromAPI2_Region3
                fetchStringFromAPI2_Region4))
)

Next, let us expand the applicative calls:

    Concurrently (myPureFunction <$> fetchStringFromAPI1)
<*> Concurrently fetchStringFromAPI2
<*> Concurrently fetchStringFromAPI3

--- is equivalent to

Concurrently (
  (\(f, a) -> f a) <$>
    concurrently {- 2 threads -}
      ( (\(f, a) -> f a) <$>
         concurrently {- 2 threads -}
           (myPureFunction <$> fetchStringFromAPI1)
           fetchStringFromAPI2
      )
      fetchStringFromAPI3
)

As you can see, we always spawn two threads for each pair of sub-routines. Suppose we have 7 sub-routines we want to compose via Applicative or Alternative. That takes six compositions, so this implementation would spawn at least 12 new threads when at most 8 should do the job. For each composition we do, an extra thread is spawned just to deal with bookkeeping.
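The counting argument can be checked with a small model. The sketch below is not part of async or unliftio. Tree, threadsSpawned and leaves are hypothetical names used only to mirror the shape of a composition, under the assumption stated above that each use of concurrently or race forks two threads.

```haskell
-- A toy model of a composition tree: a Leaf is one IO sub-routine, and
-- each Pair stands for one use of <*> or <|> on Concurrently.
data Tree = Leaf | Pair Tree Tree

-- Every Pair goes through `concurrently` or `race`, and we assume each
-- of those forks one thread per argument: two threads per Pair.
threadsSpawned :: Tree -> Int
threadsSpawned Leaf       = 0
threadsSpawned (Pair l r) = 2 + threadsSpawned l + threadsSpawned r

-- Number of sub-routines in the tree.
leaves :: Tree -> Int
leaves Leaf       = 1
leaves (Pair l r) = leaves l + leaves r

-- Same number of leaves and compositions as myComputation. The count
-- depends only on how many Pair nodes there are, not on how they nest.
example :: Tree
example = Pair (Pair Leaf (Pair Leaf (Pair Leaf (Pair Leaf Leaf)))) Leaf

main :: IO ()
main = do
  print (leaves example)          -- 6
  print (threadsSpawned example)  -- 10
```

In general a tree with n leaves has n - 1 Pair nodes, so this model gives 2 * (n - 1) threads.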

Another drawback to consider: what happens if one of the values in the call is a pure call? Given this code:

pure foo <|> bar

We end up spawning a new thread (unnecessarily) to wait for foo, even though it has already been computed and should always win. As we mentioned before, Haskell is an excellent choice for concurrency because it makes spawning threads cheap; however, these threads don't come for free, and we should strive to avoid redundant thread creation.

Introducing the Conc type

To address the issues mentioned above, we implemented a new type called Conc in our unliftio package. It has the same purpose as Concurrently, but it offers some extra guarantees aimed at avoiding the redundant threads described above.

The Conc type is defined as follows:

data Conc m a where
  Action :: m a -> Conc m a
  Apply  :: Conc m (v -> a) -> Conc m v -> Conc m a
  LiftA2 :: (x -> y -> a) -> Conc m x -> Conc m y -> Conc m a
  Pure   :: a -> Conc m a
  Alt    :: Conc m a -> Conc m a -> Conc m a
  Empty  :: Conc m a

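instance MonadUnliftIO m => Applicative (Conc m) where
  pure   = Pure
  (<*>)  = Apply
  -- The GADT above has no dedicated constructor for (*>), so use LiftA2
  a *> b = LiftA2 (\_ x -> x) a b
  liftA2 = LiftA2

instance MonadUnliftIO m => Alternative (Conc m) where
  empty = Empty
  (<|>) = Alt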

If you are familiar with Free types, this will look eerily familiar. We are going to represent our concurrent computations as data so that we can later transform or evaluate them as we see fit. In this setting, our first example would look something like the following:

myComputation :: Conc IO String
myComputation =
  myPureFunction
  <$> conc fetchStringFromAPI1
  <*> (    conc fetchStringFromAPI2_Region1
       <|> conc fetchStringFromAPI2_Region2
       <|> conc fetchStringFromAPI2_Region3
       <|> conc fetchStringFromAPI2_Region4)

--- is equivalent to

Apply (Action (myPureFunction <$> fetchStringFromAPI1))
      (Alt (Action fetchStringFromAPI2_Region1)
           (Alt (Action fetchStringFromAPI2_Region2)
                (Alt (Action fetchStringFromAPI2_Region3)
                     (Action fetchStringFromAPI2_Region4))))

You may notice we keep the tree structure of the Concurrently implementation. However, given that we are dealing with a pure data structure, we can transform our Conc value into something that is easier to evaluate. Indeed, thanks to the Applicative interface, we don't need to run any of the IO sub-routines to do these transformations (magic!).
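To make the "computations as data" idea concrete, here is a minimal sketch of a tree that can be inspected without running anything. MiniConc and countActions are hypothetical names. The type is a cut-down imitation of Conc fixed to IO, not the definition from unliftio.

```haskell
{-# LANGUAGE GADTs #-}
import Control.Applicative (Alternative (..))

-- A cut-down, hypothetical imitation of Conc, fixed to IO.
data MiniConc a where
  Action :: IO a -> MiniConc a
  Apply  :: MiniConc (v -> a) -> MiniConc v -> MiniConc a
  Pure   :: a -> MiniConc a
  Alt    :: MiniConc a -> MiniConc a -> MiniConc a
  Empty  :: MiniConc a

instance Functor MiniConc where
  fmap f (Action io) = Action (fmap f io)
  fmap f (Apply g x) = Apply (fmap (f .) g) x
  fmap f (Pure a)    = Pure (f a)
  fmap f (Alt a b)   = Alt (fmap f a) (fmap f b)
  fmap _ Empty       = Empty

-- The instances only build the tree; no IO happens here.
instance Applicative MiniConc where
  pure  = Pure
  (<*>) = Apply

instance Alternative MiniConc where
  empty = Empty
  (<|>) = Alt

-- Walk the tree and count the IO actions without running any of them.
countActions :: MiniConc a -> Int
countActions (Action _)  = 1
countActions (Apply f x) = countActions f + countActions x
countActions (Pure _)    = 0
countActions (Alt a b)   = countActions a + countActions b
countActions Empty       = 0

example :: MiniConc String
example =
  (\a b -> a ++ " " ++ b)
    <$> Action (pure "api1")
    <*> (Action (pure "region1") <|> Action (pure "region2") <|> pure "cached")

main :: IO ()
main = print (countActions example)  -- 3
```

The same kind of traversal is what lets an evaluator notice Pure values and avoid spawning a thread for them.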

We have additional (internal) types that flatten all our alternatives and applicative values:

data Flat a
  = FlatApp !(FlatApp a)
  | FlatAlt !(FlatApp a) !(FlatApp a) ![FlatApp a]

data FlatApp a where
  FlatPure   :: a -> FlatApp a
  FlatAction :: IO a -> FlatApp a
  FlatApply  :: Flat (v -> a) -> Flat v -> FlatApp a
  FlatLiftA2 :: (x -> y -> a) -> Flat x -> Flat y -> FlatApp a

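These types carry the same information as our Conc type, with a few differences that are visible in the definitions: actions live in IO rather than in an arbitrary m, there is no Empty constructor, and a chain of alternatives is stored as a single FlatAlt holding at least two branches instead of as nested Alt nodes.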

The first example of our blog post, when flattened, would look something like the following:

FlatApp
  (FlatApply
    (FlatApp (FlatAction (myPureFunction <$> fetchStringFromAPI1)))
    (FlatAlt (FlatAction fetchStringFromAPI2_Region1)
             (FlatAction fetchStringFromAPI2_Region2)
             [ FlatAction fetchStringFromAPI2_Region3
             , FlatAction fetchStringFromAPI2_Region4 ]))

Using a flatten function that transforms a Conc value into a Flat value, we can later evaluate the concurrent sub-routine tree in a way that is optimal for our use case.
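As a rough illustration of what such a flatten function does, here is a simplified sketch. It is not the unliftio implementation: MiniConc is a hypothetical IO-only stand-in for Conc, LiftA2 is left out, and returning Nothing when no alternative remains is an assumption of this sketch (the real library may handle that case differently).

```haskell
{-# LANGUAGE GADTs #-}

-- Hypothetical, cut-down stand-ins so the sketch compiles on its own.
data MiniConc a where
  Action :: IO a -> MiniConc a
  Apply  :: MiniConc (v -> a) -> MiniConc v -> MiniConc a
  Pure   :: a -> MiniConc a
  Alt    :: MiniConc a -> MiniConc a -> MiniConc a
  Empty  :: MiniConc a

data Flat a
  = FlatApp (FlatApp a)
  | FlatAlt (FlatApp a) (FlatApp a) [FlatApp a]

data FlatApp a where
  FlatPure   :: a -> FlatApp a
  FlatAction :: IO a -> FlatApp a
  FlatApply  :: Flat (v -> a) -> Flat v -> FlatApp a

-- Collect every branch of a chain of Alt nodes into one list,
-- dropping Empty along the way.
branches :: MiniConc a -> [FlatApp a]
branches (Action io) = [FlatAction io]
branches (Pure a)    = [FlatPure a]
branches (Alt a b)   = branches a ++ branches b
branches Empty       = []
branches (Apply f x) =
  case (flatten f, flatten x) of
    (Just f', Just x') -> [FlatApply f' x']
    _                  -> []  -- one side can never produce a value

-- Nothing means the tree had no usable branch at all.
flatten :: MiniConc a -> Maybe (Flat a)
flatten c =
  case branches c of
    []           -> Nothing
    [one]        -> Just (FlatApp one)
    a : b : rest -> Just (FlatAlt a b rest)

-- How many alternatives ended up side by side after flattening.
altCount :: Flat a -> Int
altCount (FlatApp _)        = 1
altCount (FlatAlt _ _ rest) = 2 + length rest

example :: MiniConc Int
example = Alt (Action (pure 1)) (Alt (Action (pure 2)) (Alt Empty (Action (pure 3))))

main :: IO ()
main = print (fmap altCount (flatten example))  -- Just 3
```

With the nested Alt nodes collapsed into one list, an evaluator can race all branches from a single place rather than once per pair.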

Performance

So given that the Conc API reduces the number of threads created via Alternative, our implementation should always perform best, correct? Sadly, it is not all peachy. To ensure that we get the result of the first thread that finishes in an Alternative composition, we make use of the STM API. This approach works great when we want to gather values from multiple concurrent threads. Unfortunately, the STM monad doesn't scale too well when composing lots of reads, making this approach prohibitive if you are composing tens of thousands of Conc values.

Considering this limitation, we only use STM when an Alternative function is involved; otherwise, we rely on MVars for multiple thread result composition via Applicative. We can do this without sweating because we can change the evaluator of the sub-routine tree created by Conc on the fly.
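From the caller's side, the choice of evaluator is not visible. A minimal usage sketch, assuming unliftio 0.2.9.0 or later is installed, could look like this. The slow helper and its delays are hypothetical, and which internal mechanism each value ends up using is taken from the description above rather than checked by this code.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import UnliftIO.Async (Conc, conc, runConc)

-- Hypothetical helper: sleep for the given milliseconds, then return.
slow :: Int -> String -> IO String
slow millis s = threadDelay (millis * 1000) >> pure s

-- Applicative only: both results are needed.
both :: Conc IO (String, String)
both = (,) <$> conc (slow 20 "left") <*> conc (slow 10 "right")

-- Alternative involved: the first branch to finish wins.
firstOf :: Conc IO String
firstOf = conc (slow 300 "slow") <|> conc (slow 10 "fast")

main :: IO ()
main = do
  runConc both >>= print
  runConc firstOf >>= putStrLn
```

Both values are run with the same runConc call; only the shape of the tree differs.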

Conclusions

We showcased how we can model the composition of computations using an Applicative and Alternative tree. Then, taking advantage of these APIs, we transformed this computation tree into something more approachable to execute concurrently. We also took advantage of this sub-routines-as-data approach to switch the evaluator between MVar and STM compositions.

