Tags: haskell, frp, conduit, haskell-pipes, streamly

GroupBy of a stream by aggregateId (Haskell / concurrency streaming)


Context: I'm implementing an app using CQRS and I'm trying to optimize the processing of commands (basically one stream per aggregate ID).

Problem: I would like a first stream that receives all the commands and dispatches them by aggregate ID to different threads, so that:

1) Commands within an aggregate are processed in a serialized way
2) Aggregates process their commands independently (in parallel).

Solution: I'm trying to perform a groupBy on the stream by aggregate ID. To help a bit, I simplified the example as follows:

module Sandbox where

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))

main :: IO ()
main = do
         runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
            threadId <- myThreadId
            liftIO $ putStrLn $ (show threadId) ++ "  value " ++ (show x))


getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]

This script displays the following result:

ThreadId 17  value 1
ThreadId 15  value 2
ThreadId 19  value 3
ThreadId 13  value 1
ThreadId 16  value 3
ThreadId 18  value 2

What I'm expecting is something like this (in no particular order, as long as value X is always processed on the same thread X1):

ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z
ThreadId X1  value X
ThreadId Y1  value Y
ThreadId Z1  value Z

Thanks !!


Solution

  • In the code above, parallely creates one Haskell thread for each element in the list getAggregateIds, which is [1,2,3,1,2,3]. parallely does not care that the list contains duplicate elements: it simply starts a thread for each one.

    In principle, parallely could allocate only a small number of Haskell threads and reuse them later on (possibly for the same duplicate ID, or for another one), but there would be no performance gain in doing so. Indeed, the crucial point here is that a Haskell thread is being allocated, not an OS thread.

    Haskell threads are very lightweight: they use very little memory, so they are very cheap to create and dispose of. Trying to reuse them could even lead to worse performance.

    Further, the Haskell runtime can execute many Haskell threads on a single OS thread. Usually, the runtime keeps a small pool of OS threads around, and Haskell threads are mapped onto those. Since OS threads are not as lightweight, they are indeed reused across Haskell threads.

    Finally, note that the ThreadId is the name of the Haskell thread, not the OS one, so it's normal to see no reuse of those IDs.
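
If you still want every command of a given aggregate to be handled by the same Haskell thread (so that commands within an aggregate are serialized), one possible approach is to fork one lightweight worker per aggregate ID and route commands to it through a channel. The following is only a minimal sketch using base and containers rather than streamly. The names Worker, workerLoop, route, spawn and dispatch are made up for illustration, and a command is reduced to its aggregate ID as in the question's simplified example.

```haskell
module Main where

import Control.Concurrent (ThreadId, forkFinally, myThreadId)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar
  (MVar, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Monad (foldM, forM_)
import qualified Data.Map.Strict as Map

-- As in the question, a command is represented only by its aggregate ID.
type AggregateId = Integer

-- Shared log of (thread that handled the command, command).
type Log = MVar [(ThreadId, AggregateId)]

-- Hypothetical handle to a per-aggregate worker thread.
data Worker = Worker
  { inbox    :: Chan (Maybe AggregateId) -- Nothing means "no more commands"
  , finished :: MVar ()                  -- filled when the worker exits
  }

-- Process commands from the inbox one at a time until Nothing arrives.
workerLoop :: Log -> Chan (Maybe AggregateId) -> IO ()
workerLoop logVar chan = do
  tid <- myThreadId
  let loop = do
        msg <- readChan chan
        case msg of
          Nothing  -> pure ()
          Just cmd -> modifyMVar_ logVar (pure . ((tid, cmd) :)) >> loop
  loop

-- Fork a new lightweight thread with its own inbox.
spawn :: Log -> IO Worker
spawn logVar = do
  chan <- newChan
  done <- newEmptyMVar
  _ <- forkFinally (workerLoop logVar chan) (\_ -> putMVar done ())
  pure (Worker chan done)

-- Send a command to the worker of its aggregate, creating it on first use.
route :: Log -> Map.Map AggregateId Worker -> AggregateId
      -> IO (Map.Map AggregateId Worker)
route logVar workers cmd =
  case Map.lookup cmd workers of
    Just w  -> writeChan (inbox w) (Just cmd) >> pure workers
    Nothing -> do
      w <- spawn logVar
      writeChan (inbox w) (Just cmd)
      pure (Map.insert cmd w workers)

-- Dispatch all commands, stop the workers, and return the log.
dispatch :: [AggregateId] -> IO [(ThreadId, AggregateId)]
dispatch cmds = do
  logVar  <- newMVar []
  workers <- foldM (route logVar) Map.empty cmds
  forM_ workers $ \w -> writeChan (inbox w) Nothing
  forM_ workers (takeMVar . finished)
  reverse <$> readMVar logVar

main :: IO ()
main = do
  entries <- dispatch ([1 .. 3] ++ [1 .. 3])
  forM_ entries $ \(tid, aggId) ->
    putStrLn (show tid ++ "  value " ++ show aggId)
```

The order of the printed lines can vary between runs, but each value should always appear next to the same ThreadId. Since Haskell threads are cheap, keeping one worker per aggregate is usually affordable. A long-running service would presumably also need to retire idle workers, which this sketch does not attempt.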