haskell, haskell-polysemy, effect-systems

Using pooledMapConcurrentlyN in polysemy


I'm currently playing around with Polysemy, rewriting a small toy project of mine to get used to it. I've stumbled upon a piece of code that uses pooledMapConcurrentlyN, which is basically a parallel version of traverse with bounded concurrency.

I can strip my example down to this:

foo :: Sem r Int
foo = do
  res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
  pure $ sum res

action :: String -> Sem r Int
action = pure . length

This doesn't compile because there's no instance for MonadUnliftIO (Sem r). It does compile when I use traverse, but I'm looking for a concurrent version. I'm not sure which way I should go now.
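For comparison, here is a minimal sketch of the same computation specialised to IO, which does have a MonadUnliftIO instance, so pooledMapConcurrentlyN from the unliftio package (module UnliftIO.Async) accepts it. The names fooIO and actionIO are hypothetical, chosen only for this sketch.

```haskell
-- Assumes the unliftio package is available.
import UnliftIO.Async (pooledMapConcurrentlyN)

-- Same logic as foo, but in IO, which has a MonadUnliftIO instance.
fooIO :: IO Int
fooIO = do
  -- At most 3 worker threads run actionIO at the same time.
  res <- pooledMapConcurrentlyN 3 actionIO (["foo", "bar", "baz"] :: [String])
  pure (sum res)

-- IO counterpart of action.
actionIO :: String -> IO Int
actionIO = pure . length
```

This is exactly what Sem r lacks: pooledMapConcurrentlyN needs to run each element's action in IO on another thread, and MonadUnliftIO is the class that provides that capability.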

I see the following options:

  • Implement a MonadUnliftIO (Sem r) instance. I see that there were some discussions about adding/implementing such an instance in this GitHub issue. However, it's not clear to me whether it's a good idea to do so.
  • Use something other than pooledMapConcurrentlyN that gives me equivalent behavior. I know that there's parTraverse from the par-dual package, but that would require a ParDual instance. The parallel package might make a solution possible as well, but I'm not familiar with it, so I can't tell.
  • Model the parallel traverse as an effect. I tried that, but I couldn't come up with an interpreter for the effect. The effect definition I tried looks like this:
data ParTraverse m a where
  TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

I'm not yet familiar with either GADTs or Polysemy, so it's possible that I'm missing something obvious here.


EDIT: As pointed out in the answer below, the most appropriate solution is to model this as an effect and handle the concurrency in the effect's interpreter rather than in the business logic. This means that I'm looking for a higher-order effect (?) similar to the ParTraverse effect above:

data ParTraverse m a where
  TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)

makeSem ''ParTraverse

parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
  TraverseP f ta -> do
    _something

I'm not sure whether this type signature is correct. Should the action have type a -> Sem r b? And since the signature of traverse has an Applicative constraint on m, how would I model that?
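One way to see where the Applicative constraint can go: the constructor stays constraint-free, and each interpreter, which fixes m to a concrete monad, supplies the constraint. Below is a toy model in plain Haskell, without any polysemy machinery. The sequential interpreter runTraverseSeq and the value lengths are hypothetical names for illustration. In polysemy, m is instantiated to Sem rInitial inside the interpreter.

```haskell
{-# LANGUAGE GADTs #-}

-- Toy version of the effect from the question (not polysemy's machinery):
-- the constructor itself carries no Applicative/Monad constraint on m.
data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

-- Hypothetical sequential "interpreter": the constraint lives here,
-- where m is concrete, and plain traverse does the work.
runTraverseSeq :: Applicative m => ParTraverse m x -> m x
runTraverseSeq (TraverseP f xs) = traverse f xs

-- Example with m ~ Maybe.
lengths :: Maybe [Int]
lengths = runTraverseSeq (TraverseP (\s -> Just (length s)) ["foo", "bar", "baz"])
```

Matching on TraverseP refines the result type x to [b], which is why traverse f xs type-checks as the interpreter's result.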


Solution

  • As for the ParTraverse implementation, this is what I replied over on GitHub, for a version specialized to [] for t:

    pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
    pooledMapConcurrently num f ta =
      ...
    
    data ParTraverse m a where
      TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
    
    makeSem ''ParTraverse
    
    parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
    parTraverseToIO =
      interpretH \case
       TraverseP f ta -> do
         taT <- traverse pureT ta
         fT <- bindT f
         -- Note: a limit of 1 runs the actions one at a time; use a larger bound for real concurrency.
         tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
         ins <- getInspectorT
         pureT (catMaybes (inspect ins <$> catMaybes tb))
    

    Some explanations for the combinators used inside interpretH, where we operate in the Tactical environment:

    • Since we're dealing with a function a -> m b, where m is instantiated to Sem rInitial inside the interpreter, we have to use bindT to get a function of roughly the type f a -> Sem (ParTraverse ': r) (f b), where f is the functorial state of the interpreters.
    • We can't run pooledMapConcurrently on the Sem rInitial directly, because Member (Final IO) is only given for r.
    • ta contains the inputs for f, but since the lifted f expects f a rather than a, we also have to wrap each element of ta with pureT. Because pureT is itself a monadic action, we apply it with traverse.
    • Functions produced by bindT (and runT) return Sems that still have the current effect, ParTraverse, at the head, because the effect has to be interpreted within the wrapped Sem (passed in as a -> m b). This even makes it possible to use a different interpreter for the inner program. In our case, we simply run parTraverseToIO again on the computation built from fT. After that, we have to lift this Sem back into the Tactical environment (which is just another effect at the head), so we use raise.
    • pooledMapConcurrently returns a Maybe (f b) for each element: the Maybe comes from its own inspection of the effect state in r, and the f b is the result of our lifted f. To get the return type right, we have to unpack both layers. For the inner one we use the inspector from getInspectorT, which turns f b into Maybe b. Combined, that gives Maybe (Maybe b), which the two calls to catMaybes flatten into a plain list.
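To make the unpacking step concrete, here is a toy model in plain Haskell, not polysemy's API. It assumes the functorial state f is Either String, as if an Error-like effect sat in rInitial. FState, inspectFState, flattenResults and example are hypothetical names.

```haskell
import Data.Maybe (catMaybes)

-- Toy model (not polysemy's API): pretend the functorial state f of the
-- interpreters is Either String, as if an Error-like effect sat in rInitial.
type FState = Either String

-- Hypothetical inspector for that state: only Right carries a result.
inspectFState :: FState b -> Maybe b
inspectFState = either (const Nothing) Just

-- Mirrors catMaybes (inspect ins <$> catMaybes tb) in parTraverseToIO:
-- the outer Maybe is what pooledMapConcurrently returns per element,
-- the inner one comes from the inspector.
flattenResults :: [Maybe (FState b)] -> [b]
flattenResults tb = catMaybes (inspectFState <$> catMaybes tb)

-- One element per case: result, no outer value, inner failure, result.
example :: [Int]
example = flattenResults [Just (Right 3), Nothing, Just (Left "boom"), Just (Right 5)]
```

Both kinds of missing result are dropped silently. The output list can therefore be shorter than the input and may no longer line up with it position by position.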

    For completeness, here's the implementation of pooledMapConcurrently, written by KingoftheHomeless:

    pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
    pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
      (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t
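The code above relies on an IO-level function of shape Int -> (a -> IO b) -> t a -> IO (t b) named pooledMapConcurrentlyIO, whose definition isn't shown here. As a rough, base-only sketch of what such a function does, assuming only that it maps over a Traversable with at most n actions in flight, here is a hypothetical boundedMapConcurrentlyIO built on a QSem. This is an illustration, not unliftio's implementation.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, bracket_, throwIO, try)

-- Catch any exception so a failing element can be re-thrown by the caller.
tryAny :: IO a -> IO (Either SomeException a)
tryAny = try

-- Hypothetical base-only stand-in: at most n calls to f run at once.
boundedMapConcurrentlyIO :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
boundedMapConcurrentlyIO n f xs = do
  -- n permits (at least 1, so no thread waits forever).
  sem <- newQSem (max 1 n)
  vars <- traverse (start sem) xs
  -- Collect results in the original order, re-throwing a failure if one occurred.
  traverse (\var -> takeMVar var >>= either throwIO pure) vars
  where
    -- Fork a thread for one element; it takes a permit before running f
    -- and releases it afterwards, even if f throws.
    start sem x = do
      var <- newEmptyMVar
      _ <- forkIO $
        tryAny (bracket_ (waitQSem sem) (signalQSem sem) (f x)) >>= putMVar var
      pure var
```

Design note: this forks one lightweight thread per element up front and lets the semaphore bound how many run f at once, rather than keeping a fixed pool of workers. A failing element also does not cancel its siblings, so this sketch is looser than a production implementation.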