In the Haskell streaming library, there is an example of copy:
>>> (S.toList . mapped S.toList . chunksOf 5) $ (S.toList . mapped S.toList . chunksOf 3) $ S.copy $ each [1..10]
[[1,2,3,4,5],[6,7,8,9,10]] :> ([[1,2,3],[4,5,6],[7,8,9],[10]] :> ())
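For reference, here is a self-contained version of the doc example above, written as a minimal sketch. The names copied, byFives and byThrees are hypothetical. It runs in Identity. Once both toList consumers have finished, the two results are ordinary lists, and you can pull them apart by pattern matching on the nested Of pairs. This only works because toList has already accumulated everything.

```haskell
import Data.Functor.Identity (Identity (..))
import Streaming (chunksOf, mapped)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Both consumers run over one copied stream. The outer consumer's result
-- is paired with the inner consumer's result via the strict pair type Of.
copied :: Of [[Int]] (Of [[Int]] ())
copied =
  runIdentity
    . (S.toList . mapped S.toList . chunksOf 5)
    . (S.toList . mapped S.toList . chunksOf 3)
    . S.copy
    $ S.each [1 .. 10]

-- Pattern matching on the nested Of values separates the two results.
byFives, byThrees :: [[Int]]
(byFives :> (byThrees :> ())) = copied
```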
Is it possible to separate this into two "clean" streams so that it prints the results below?
>>> S.print stream1
[[1,2,3,4,5],[6,7,8,9,10]]
>>> S.print stream2
[[1,2,3],[4,5,6],[7,8,9],[10]]
Note that there is no ':>' in the results above. More generally, I am not sure whether there are functions that "simplify" nested streams (or streams of streams) in the m or (Of a) part of Stream (Of a) m r, for example:
f1 :: Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f2 :: Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f3 :: Stream (Stream (Of a) m) r -> Stream (Of a) m r
[Update]
The background to this question is that I am looking for idiomatic ways to reuse an underlying stream multiple times. The stream is pulled from a database and the IO can be expensive. I also want references to the intermediate streams so that I can structure my code better. Some mock code:
my_stream_fn = do
  original_stream <- pull_from_database
  let (o1, s1) = calc_moving_average $ S.copy original_stream
      (o2, s2) = calc_max $ S.copy o1
      (o3, s3) = calc_min $ S.copy o2
  S.print $ S.zipWith3 (\x y z -> (x, y, z)) s1 s2 s3
What I want is for o1, o2 and o3 to be exactly the same as original_stream, with the pull_from_database IO operation performed only once, when original_stream is pulled.
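f1 :: Monad m
   => Stream (Of a) (Stream (Of b) m) r
   -> Stream (Of b) m r
f1 = S.effects @(Stream (Of _) _)       -- run the outer layer, keep the inner elements

f2 :: Monad m
   => Stream (Of a) (Stream (Of b) m) r
   -> Stream (Of a) m r
f2 = hoist @(Stream (Of _)) S.effects   -- run each inner layer, keep the outer elements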
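(type variables renamed for clarity; see the docs for effects; the type applications need the TypeApplications extension), and f3 doesn't kind-check. Stream takes three arguments (a functor, a monad and a return type), so in Stream (Stream (Of a) m) r the r lands in the monad position and one argument is missing.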
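Here is a minimal sketch that puts f1 and f2 from the answer above to work. The signatures fix the types, so the type applications are dropped. It also includes a guess at what f3 was meant to be. The assumption is that f3 was intended as Stream (Stream (Of a) m) m r -> Stream (Of a) m r, that is, a stream of streams over the same base monad. That is exactly what concats from the Streaming module flattens. The example input nested is hypothetical: it chunks the outer copy into lists of 3 and leaves the inner copy flat.

```haskell
import Control.Monad.Morph (hoist)
import Streaming (Stream, chunksOf, concats, mapped)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Keep the inner (second-layer) elements: run the outer layer,
-- discarding its elements but keeping the effects it carries.
f1 :: Monad m => Stream (Of a) (Stream (Of b) m) r -> Stream (Of b) m r
f1 = S.effects

-- Keep the outer elements: replace each inner-layer action by its
-- effects, discarding the inner elements.
f2 :: Monad m => Stream (Of a) (Stream (Of b) m) r -> Stream (Of a) m r
f2 = hoist S.effects

-- Assumed reading of f3: flatten a stream of streams (same base monad m).
f3 :: Monad m => Stream (Stream (Of a) m) m r -> Stream (Of a) m r
f3 = concats

-- Hypothetical input: outer copy grouped into lists of 3, inner copy flat.
nested :: Monad m => Stream (Of [Int]) (Stream (Of Int) m) ()
nested = mapped S.toList (chunksOf 3 (S.copy (S.each [1 .. 7])))
```

Here f2 nested yields [1,2,3], [4,5,6] and [7], while f1 nested yields the seven individual Ints. Each function gives you one "clean" stream, but not both at once without running the source twice or buffering.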
It feels like you're trying to defeat the point of streaming. You construct the pipeline, source to sink, and run it; the key is that there's no (implicit) intermediate accumulation of values. Your question is slightly loose and thus difficult to answer precisely. But if you wish to run all the effects of the first stream, then all the effects of the second stream, you must be willing to store (the computation representing) the second stream until the first stream has finished running its effects. That means you've accumulated the second stream, and thus not really streamed it. That is why S.copy is designed to interleave the effects. Cf. this GitHub issue.
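Here is a minimal sketch of that interleaving. An IORef counter and log act as a hypothetical stand-in for the expensive database pull. Two consumers read the same source: S.maximum on the outer copy and S.sum on the inner one. Each element is still pulled exactly once, because S.copy passes every element to both layers as it arrives rather than replaying the source.

```haskell
import Data.IORef
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Hypothetical stand-in for pull_from_database: each element pulled bumps
-- a counter and appends to a log, so we can see how often the source runs.
source :: IORef Int -> IORef [String] -> Stream (Of Int) IO ()
source counter logRef = S.mapM pull (S.each [1 .. 5])
  where
    pull x = do
      modifyIORef' counter (+ 1)
      modifyIORef' logRef (("pull " ++ show x) :)
      pure x

-- One pass, two consumers: S.maximum consumes the outer copy, and the
-- inner copy (left in the monad layer) is then consumed by S.sum.
sumAndMax :: IORef Int -> IORef [String] -> IO (Int, Maybe Int)
sumAndMax counter logRef = do
  (s :> (mx :> ())) <- S.sum (S.maximum (S.copy (source counter logRef)))
  pure (s, mx)
```

Running sumAndMax gives (15, Just 5), and the counter ends at 5: one pull per element, not one per consumer.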
I think part of what's confusing you is that you're using pure streams, and in the absence of effects the restrictions are less obviously motivated. Use identifiers for the components of the pipeline, not for the partial results. Also, in your example you should be combining folds, e.g.:
import qualified Control.Foldl as L
import qualified Streaming.Prelude as S

myStreamFn =
  let movingAvg n = {- ... -}
      -- combine the folds so the stream is traversed only once
      combinedAcc = (,,) <$> L.minimum <*> L.maximum <*> movingAvg 10
  in  L.purely S.fold_ combinedAcc pullFromDatabase >>= print
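Here is a runnable sketch of the combined fold, under two assumptions. First, S.each over a fixed list stands in for pullFromDatabase. Second, since movingAvg is elided above, L.mean is swapped in as the third fold; it is a plain mean over the whole stream, not a moving average. The Applicative instance of L.Fold is what lets the three folds share a single pass.

```haskell
import qualified Control.Foldl as L
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Swapped-in stand-in for the elided movingAvg: L.mean over the whole
-- stream. Min, max and mean are combined into one Fold via Applicative.
combined :: L.Fold Double (Maybe Double, Maybe Double, Double)
combined = (,,) <$> L.minimum <*> L.maximum <*> L.mean

-- Run the combined fold in a single pass over the stream, so the source's
-- effects happen only once.
summarise :: Monad m => Stream (Of Double) m r -> m (Maybe Double, Maybe Double, Double)
summarise = L.purely S.fold_ combined
```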
Another function you might like to consider is S.store, e.g.:
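import Data.Function ((&))

myStreamFn
  = pullFromDatabase
  & S.store S.maximum                          -- maximum, kept in the return value
  & S.store (L.purely S.fold L.minimum)        -- minimum, likewise
  & S.store (L.purely S.fold (movingAvg 10))   -- movingAvg as the Fold from above
  & S.print                                    -- prints the elements; fold results are returned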
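Here is a checkable sketch of the S.store chain. It assumes S.each as the source, uses S.toList in place of S.print so the results can be inspected, and leaves out the moving average. Each S.store runs its fold on a copy and pushes the fold's result into the stream's return value. As a result, the most recently stored result ends up outermost, right after the element list.

```haskell
import qualified Control.Foldl as L
import Data.Function ((&))
import Streaming (Stream)
import Streaming.Prelude (Of (..))
import qualified Streaming.Prelude as S

-- Each S.store taps a copy of the stream with a fold. The elements keep
-- flowing, and the fold's result is nested into the return value.
-- Result shape: elements :> (minimum :> (maximum :> ())).
pipeline
  :: Monad m
  => Stream (Of Int) m ()
  -> m (Of [Int] (Of (Maybe Int) (Of (Maybe Int) ())))
pipeline src =
  src
    & S.store S.maximum
    & S.store (L.purely S.fold L.minimum)
    & S.toList
```

With S.each [3, 1, 4, 1, 5] as input, the pipeline returns the elements [3, 1, 4, 1, 5] together with Just 1 as the minimum and Just 5 as the maximum, from a single traversal.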