Context: I'm implementing an app with CQRS and I'm trying to optimize command processing (basically one stream per aggregate Id)...
Problem: I would like a first stream that receives all the commands and dispatches them, by aggregate Id, to different threads:
1) Commands for the same aggregate are processed serially (one after the other).
2) Different aggregates process their commands independently (in parallel).
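The two requirements above amount to routing each command to a dedicated worker per aggregate. A minimal sketch of that layout, assuming plain base concurrency (forkIO, Chan, MVar) plus Data.Map from containers instead of Streamly, could look like this. The names dispatch, worker and byAggregate, and the (aggregate Id, sequence number) command shape, are hypothetical and chosen only for illustration; they are not part of Streamly.

```haskell
module Main (main) where

import Control.Concurrent
  ( Chan, MVar, ThreadId, forkIO, modifyMVar_, myThreadId, newChan
  , newEmptyMVar, newMVar, putMVar, readChan, readMVar, takeMVar, writeChan )
import Control.Monad (foldM, forM_, replicateM_)
import qualified Data.Map.Strict as Map

-- Hypothetical command shape: (aggregate Id, sequence number).
type AggregateId = Integer
type Command = (AggregateId, Int)

-- One log entry per processed command: the command and the thread that ran it.
type LogEntry = (Command, ThreadId)

-- Hypothetical worker: handles the commands of ONE aggregate, one at a time,
-- in the order they arrive on its channel. 'Nothing' means "no more commands".
worker :: MVar [LogEntry] -> MVar () -> Chan (Maybe Command) -> IO ()
worker logVar done chan = loop
  where
    loop = do
      msg <- readChan chan
      case msg of
        Nothing -> putMVar done ()
        Just cmd@(aggId, _) -> do
          tid <- myThreadId
          -- Holding the log MVar also serialises output so lines don't interleave.
          modifyMVar_ logVar $ \entries -> do
            putStrLn (show tid ++ " value " ++ show aggId)
            pure ((cmd, tid) : entries)
          loop

-- Hypothetical dispatcher: route each command to its aggregate's worker,
-- forking a new worker the first time an aggregate Id is seen; then stop
-- all workers and wait for them. Returns the log in processing order.
dispatch :: [Command] -> IO [LogEntry]
dispatch cmds = do
  logVar <- newMVar []
  done <- newEmptyMVar
  let route workers cmd@(aggId, _) =
        case Map.lookup aggId workers of
          Just chan -> writeChan chan (Just cmd) >> pure workers
          Nothing -> do
            chan <- newChan
            _ <- forkIO (worker logVar done chan)
            writeChan chan (Just cmd)
            pure (Map.insert aggId chan workers)
  workers <- foldM route Map.empty cmds
  forM_ (Map.elems workers) (\chan -> writeChan chan Nothing)
  replicateM_ (Map.size workers) (takeMVar done)
  reverse <$> readMVar logVar

-- For each aggregate: the thread Ids that handled it and the sequence
-- numbers in the order they were processed.
byAggregate :: [LogEntry] -> Map.Map AggregateId ([ThreadId], [Int])
byAggregate entries =
  Map.fromListWith (\(newT, newS) (oldT, oldS) -> (oldT ++ newT, oldS ++ newS))
    [(aggId, ([tid], [seqNo])) | ((aggId, seqNo), tid) <- entries]

main :: IO ()
main = do
  let ids = [1 .. 3] <> [1 .. 3] :: [AggregateId]
  entries <- dispatch (zip ids [1 ..])
  mapM_ print (Map.toList (byAggregate entries))
```

Each worker drains its own channel in order, so for the question's input commands 1 and 4 of aggregate 1 are handled by the same thread and in that order, while the three workers run concurrently. Note that Chan is unbounded, so this sketch applies no back-pressure to the dispatcher.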
Solution: I'm trying to group the stream by aggregate Id (basically a groupBy). To keep things simple, I reduced it to the following example:
module Sandbox where

import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO(..))

main :: IO ()
main = do
  runStream $ parallely $ S.fromList getAggregateIds |& S.mapM (\x -> do
    threadId <- myThreadId
    liftIO $ putStrLn $ (show threadId) ++ " value " ++ (show x))

getAggregateIds :: [Integer]
getAggregateIds = [1..3] <> [1..3]
This script prints the following output:
ThreadId 17 value 1
ThreadId 15 value 2
ThreadId 19 value 3
ThreadId 13 value 1
ThreadId 16 value 3
ThreadId 18 value 2
What I'm expecting is something like this (no particular order, just X always processed on the same thread X1):
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
ThreadId X1 value X
ThreadId Y1 value Y
ThreadId Z1 value Z
Thanks!
In the code above, parallely creates one Haskell thread for each element of the list getAggregateIds, which is [1,2,3,1,2,3].
parallely does not care that the list contains duplicate elements: it simply starts a thread for each one.
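The one-thread-per-element behaviour described above can be reproduced without Streamly. The sketch below forks one thread per list element with plain forkIO; it only imitates the effect and is not how parallely is implemented. onePerElement is a hypothetical helper.

```haskell
module Main (main) where

import Control.Concurrent (ThreadId, forkIO, myThreadId, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)
import qualified Data.Set as Set

-- Hypothetical helper: fork one fresh Haskell thread per list element,
-- duplicates included, and return (element, ThreadId that handled it).
onePerElement :: [Integer] -> IO [(Integer, ThreadId)]
onePerElement xs = do
  vars <- forM xs $ \x -> do
    var <- newEmptyMVar
    _ <- forkIO (myThreadId >>= \tid -> putMVar var (x, tid))
    pure var
  mapM takeMVar vars

main :: IO ()
main = do
  results <- onePerElement ([1 .. 3] <> [1 .. 3])
  mapM_ (\(x, tid) -> putStrLn (show tid ++ " value " ++ show x)) results
  -- Six elements give six distinct threads, even though each Id appears twice.
  print (Set.size (Set.fromList (map snd results)))
```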
In principle, parallely could allocate only a small number of Haskell threads and reuse them later on (for the same duplicate Id or for another one), but there would be no performance gain in doing so. The crucial point here is that a Haskell thread is being allocated, not an OS thread.
Haskell threads are very lightweight: they use very little memory, so they are very cheap to create and dispose of. Trying to reuse them could even lead to worse performance.
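The cheapness of Haskell threads is easy to check informally. The sketch below forks 100,000 short-lived threads, each of which bumps a shared counter before signalling completion. forkMany is a hypothetical helper; the running time depends on the machine and RTS options, so no timing is claimed here.

```haskell
module Main (main) where

import Control.Concurrent
  (forkIO, modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Monad (replicateM_)

-- Hypothetical helper: fork n short-lived Haskell threads. Each increments
-- a shared counter and then signals completion; the caller waits for all n.
forkMany :: Int -> IO Int
forkMany n = do
  counter <- newMVar (0 :: Int)
  done <- newEmptyMVar
  replicateM_ n $ forkIO $ do
    -- ($!) forces the new count so the MVar does not accumulate thunks.
    modifyMVar_ counter (\c -> pure $! c + 1)
    putMVar done ()
  replicateM_ n (takeMVar done)
  readMVar counter

main :: IO ()
main = forkMany 100000 >>= print
```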
Further, the Haskell runtime can execute many Haskell threads on a single OS thread. Usually, the runtime keeps a small pool of OS threads around and maps Haskell threads onto them. Since OS threads are not as lightweight, they are the ones that get reused across Haskell threads.
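The mapping of many Haskell threads onto few OS threads can be observed through capabilities, the RTS's virtual processors (their number is set with +RTS -N when the program is built with -threaded). Each capability is driven by one OS thread at a time, although the RTS may create extra OS threads, for example for blocking foreign calls, so this approximates the OS thread pool rather than counting it exactly. The sketch below uses the Control.Concurrent functions getNumCapabilities and threadCapability together with a hypothetical helper, capabilitiesUsed, to fork 1000 threads and record which capability each one runs on.

```haskell
module Main (main) where

import Control.Concurrent
  ( forkIO, getNumCapabilities, myThreadId, newEmptyMVar, putMVar
  , takeMVar, threadCapability )
import Control.Monad (forM)
import qualified Data.Set as Set

-- Hypothetical helper: fork n Haskell threads; each reports the number of
-- the capability it is currently running on.
capabilitiesUsed :: Int -> IO (Set.Set Int)
capabilitiesUsed n = do
  vars <- forM [1 .. n] $ \_ -> do
    var <- newEmptyMVar
    _ <- forkIO $ do
      (cap, _pinned) <- threadCapability =<< myThreadId
      putMVar var cap
    pure var
  Set.fromList <$> mapM takeMVar vars

main :: IO ()
main = do
  caps <- getNumCapabilities
  used <- capabilitiesUsed 1000
  putStrLn ("capabilities available: " ++ show caps)
  -- 1000 Haskell threads end up sharing at most 'caps' capabilities.
  putStrLn ("distinct capabilities used by 1000 threads: " ++ show (Set.size used))
```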
Finally, note that ThreadId identifies the Haskell thread, not the OS thread, so it is normal to see no reuse of those IDs.
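A small sketch of this last point: two threads are forked one after the other, the second only after the first has delivered its ThreadId, and they still get different ThreadIds. runInFreshThread is a hypothetical helper.

```haskell
module Main (main) where

import Control.Concurrent (ThreadId, forkIO, myThreadId, newEmptyMVar, putMVar, takeMVar)

-- Hypothetical helper: fork a fresh Haskell thread, wait until it has
-- reported its own ThreadId, and return that ThreadId.
runInFreshThread :: IO ThreadId
runInFreshThread = do
  var <- newEmptyMVar
  _ <- forkIO (myThreadId >>= putMVar var)
  takeMVar var

main :: IO ()
main = do
  first <- runInFreshThread
  -- The second thread is only forked after the first one has answered,
  -- yet it receives a new ThreadId instead of reusing the old one.
  second <- runInFreshThread
  print (first, second, first /= second)
```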