I should preface this by saying I'm very much a beginner with Haskell and the pipes library, and I would like to understand what is causing the high memory usage of this program in the test function.
Specifically, in the fold that produces the r1 value in test, I'm seeing a build-up of MyRecord values until the final result is produced, unless deepseq is used. On my sample data set of ~500000 lines / ~230 MB, the memory usage grows beyond 1.5 GB.
The fold that produces the r2 value runs in constant memory.
What I would like to understand is:
1) What could be causing the build-up of MyRecord values in the first fold, and why would using deepseq fix it? I was very much throwing things at it at random until I arrived at using deepseq to achieve constant memory usage, but I would like to understand why it works. Can constant memory usage be achieved without using deepseq while still producing the same result type of Maybe Int?
2) What is different about the second fold that causes it not to exhibit the same issue?
I know that if I were to work with just integers instead of tuples I could use the built-in sum function from Pipes.Prelude, but I will eventually want to process the second element, which contains any parsing errors.
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test where
import Control.Arrow
import Control.DeepSeq
import Control.Monad
import Data.Aeson
import Data.Function
import Data.Maybe
import Data.Monoid
import Data.Text (Text)
import Pipes
import qualified Pipes.Aeson as PA (DecodingError(..))
import qualified Pipes.Aeson.Unchecked as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P
import System.IO
import Control.Lens
import qualified Control.Foldl as Fold
data MyRecord = MyRecord
  { myRecordField1 :: !Text
  , myRecordField2 :: !Int
  , myRecordField3 :: !Text
  , myRecordField4 :: !Text
  , myRecordField5 :: !Text
  , myRecordField6 :: !Text
  , myRecordField7 :: !Text
  , myRecordField8 :: !Text
  , myRecordField9 :: !Text
  , myRecordField10 :: !Int
  , myRecordField11 :: !Text
  , myRecordField12 :: !Text
  , myRecordField13 :: !Text
  } deriving (Eq, Show)
instance FromJSON MyRecord where
  parseJSON (Object o) =
    MyRecord <$> o .: "field1" <*> o .: "field2" <*> o .: "field3" <*>
    o .: "field4" <*>
    o .: "field5" <*>
    o .: "field6" <*>
    o .: "field7" <*>
    o .: "field8" <*>
    o .: "field9" <*>
    (read <$> o .: "field10") <*>
    o .: "field11" <*>
    o .: "field12" <*>
    o .: "field13"
  parseJSON x = fail $ "MyRecord: expected Object, got: " <> show x
instance ToJSON MyRecord where
  toJSON _ = undefined
test :: IO ()
test = do
  {-
    the pipeline is composed as follows:
    1 a producer reading a file with Pipes.ByteString, splitting chunks into lines,
      and parsing the lines as JSON to produce tuples of (Maybe MyRecord, Maybe
      ByteString), the second element being an error if parsing failed
    2 a pipe filtering that tuple on a field of Maybe MyRecord, passing matching
      (Maybe MyRecord, Maybe ByteString) downstream
    3 and a pipe that picks an Int field out of Maybe MyRecord, passing (Maybe Int,
      Maybe ByteString) downstream
    pipeline == 1 >-> 2 >-> 3
    memory profiling indicates the memory build up is due to accumulation of
    MyRecord "objects", and data types comprising their fields (mainly
    Text/ARR_WORDS)
  -}
  withFile "some-file" ReadMode $ \hIn -> do
    let pipeline = f1 hIn >-> f2 >-> f3
    -- need to use deepseq to avoid leaking memory
    r1 <-
      P.fold
        (\acc (v, _) -> (+) <$> (acc `deepseq` acc) <*> pure (fromMaybe 0 v))
        (Just 0)
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r1
    hSeek hIn AbsoluteSeek 0
    -- this works just fine as is and streams in constant memory
    r2 <-
      P.fold
        (\acc v ->
           case fst v of
             Just x -> acc + x
             Nothing -> acc)
        0
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r2
    return ()
  return ()
f1
  :: (FromJSON a, MonadIO m)
  => Handle -> Producer (Maybe a, Maybe PB.ByteString) m ()
f1 hIn = PB.fromHandle hIn & asLines & resumingParser PA.decode
f2
  :: Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) IO r
f2 = filterRecords (("some value" ==) . myRecordField5)
f3 :: Pipe (Maybe MyRecord, d) (Maybe Int, d) IO r
f3 = P.map (first (fmap myRecordField10))
filterRecords
  :: Monad m
  => (MyRecord -> Bool)
  -> Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) m r
filterRecords predicate =
  for cat $ \(l, e) ->
    when (isNothing l || (predicate <$> l) == Just True) $ yield (l, e)
asLines
  :: Monad m
  => Producer PB.ByteString m x -> Producer PB.ByteString m x
asLines p = Fold.purely PG.folds Fold.mconcat (view PB.lines p)
parseRecords
  :: (Monad m, FromJSON a, ToJSON a)
  => Producer PB.ByteString m r
  -> Producer a m (Either (PA.DecodingError, Producer PB.ByteString m r) r)
parseRecords = view PA.decoded
resumingParser
  :: Monad m
  => PP.StateT (Producer a m r) m (Maybe (Either e b))
  -> Producer a m r
  -> Producer (Maybe b, Maybe a) m ()
resumingParser parser p = do
  (x, p') <- lift $ PP.runStateT parser p
  case x of
    Nothing -> return ()
    Just (Left _) -> do
      (x', p'') <- lift $ PP.runStateT PP.draw p'
      yield (Nothing, x')
      resumingParser parser p''
    Just (Right b) -> do
      yield (Just b, Nothing)
      resumingParser parser p'
As mentioned in the docs for Pipes.Prelude.fold, the fold is strict. However, the strictness is implemented with $!, which only forces evaluation to WHNF (weak head normal form). WHNF is enough to fully evaluate a simple type like an Int, but it isn't strong enough to completely evaluate a more complex type like a Maybe Int: it stops at the outermost constructor (Just or Nothing) and leaves the value inside unevaluated.
Some examples:
main1 = do
  let a = 3 + undefined
      b = seq a 10
  print b -- error: Exception: Prelude.undefined
main2 = do
  let a = Just (3 + undefined)
      b = seq a 10
  print b -- no exception
In the first fold the variable acc is a Just of a large thunk: the summation of all of the elements. On each iteration the variable acc goes from Just a to Just (a+b) to Just (a+b+c) etc. The addition is not being performed during the fold; it's only being done at the very end. The large memory usage comes from storing this growing summation in memory.
In the second fold the summation is reduced each iteration by $! to a simple Int.
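The difference between the two accumulators can be reproduced without pipes. The following is a minimal sketch that uses foldl' from Data.List as a stand-in for the strict fold, on the assumption that it forces the accumulator to WHNF on each step in the same way; the names lazyStep and strictStep are made up for this illustration and do not appear in the question.

```haskell
import Data.List (foldl')
import Data.Maybe (fromMaybe)

-- Mirrors the r1 step: the result is always a Just constructor, so forcing
-- it to WHNF never touches the Int inside. The payload grows as a chain of
-- unevaluated (+) applications until something finally demands it.
lazyStep :: Maybe Int -> Maybe Int -> Maybe Int
lazyStep acc v = (+) <$> acc <*> pure (fromMaybe 0 v)

-- Mirrors the r2 step: the accumulator is a plain Int, and the WHNF of an
-- Int is the evaluated number, so each step collapses the sum.
strictStep :: Int -> Maybe Int -> Int
strictStep acc v = acc + fromMaybe 0 v

main :: IO ()
main = do
  let xs = map Just [1 .. 100000] :: [Maybe Int]
  -- Both folds compute the same total; only the shape of the intermediate
  -- accumulator differs.
  print (foldl' lazyStep (Just 0) xs)
  print (foldl' strictStep 0 xs)
```

Running this with heap profiling enabled on a larger input should show the first fold retaining thunks while the second stays flat, though the exact figures depend on the GHC version and flags.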
Besides using deepseq you can also use force:
force x = x `deepseq` x
and as mentioned in the deepseq docs, combined with ViewPatterns (and BangPatterns for the ! on the bound variable) you can create a pattern which will fully evaluate a function argument:
{-# LANGUAGE BangPatterns, ViewPatterns #-}
...
P.fold
(\(force -> !acc) (v,_) -> (+) <$> acc <*> pure (fromMaybe 0 v))
(Just 0)
...
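As for keeping the Maybe Int result without deepseq: one possible approach is to pattern match on the accumulator and force the new sum with $! before wrapping it in Just again. The sketch below is an assumption about how that could look, not something taken from the pipes or deepseq docs; strictMaybeStep is a hypothetical name, and foldl' stands in for the pipes fold so the example needs only base.

```haskell
import Data.List (foldl')
import Data.Maybe (fromMaybe)

-- Hypothetical step function. (Just $! x) evaluates x before building the
-- Just, so when the fold forces the result to WHNF the Int inside is
-- already a plain number rather than a growing thunk.
strictMaybeStep :: Maybe Int -> (Maybe Int, e) -> Maybe Int
strictMaybeStep Nothing  _      = Nothing
strictMaybeStep (Just a) (v, _) = Just $! a + fromMaybe 0 v

main :: IO ()
main = do
  -- The second tuple component plays the role of the parse error slot.
  let inputs = [(Just 1, ()), (Nothing, ()), (Just 5, ())]
  print (foldl' strictMaybeStep (Just 0) inputs) -- prints: Just 6
```

In the program from the question this step would be passed as the first argument of the fold that produces r1, in place of the lambda, with (Just 0) and id unchanged.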