
Understanding memory usage of this Haskell program


I should preface this by saying that I'm very much a beginner with Haskell and the pipes library. I would like to understand what is causing the high memory usage of the test function in this program.

Specifically, in the fold that produces the r1 value in test, I'm seeing a build-up of MyRecord values until the final result is produced, unless deepseq is used. On my sample data set of ~500000 lines / ~230 MB, memory usage grows beyond 1.5 GB.

The fold that produces the r2 value runs in constant memory.

What I would like to understand is:

1) What could be causing the build-up of MyRecord values in the first fold, and why does using deepseq fix it? I was pretty much throwing things at it at random until I arrived at deepseq to achieve constant memory usage, but I would like to understand why it works. Can constant memory usage be achieved without deepseq while still producing the same result type of Maybe Int?

2) What is different about the second fold that keeps it from exhibiting the same issue?

I know that if I were working with just integers instead of tuples I could use the built-in sum function from Pipes.Prelude, but I will eventually want to process the second element, which contains any parsing errors.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Test where

import           Control.Arrow
import           Control.DeepSeq
import           Control.Monad
import           Data.Aeson
import           Data.Function
import           Data.Maybe
import           Data.Monoid
import           Data.Text (Text)

import           Pipes
import qualified Pipes.Aeson as PA (DecodingError(..))
import qualified Pipes.Aeson.Unchecked as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P

import           System.IO
import           Control.Lens
import qualified Control.Foldl as Fold

data MyRecord = MyRecord
  { myRecordField1 :: !Text
  , myRecordField2 :: !Int
  , myRecordField3 :: !Text
  , myRecordField4 :: !Text
  , myRecordField5 :: !Text
  , myRecordField6 :: !Text
  , myRecordField7 :: !Text
  , myRecordField8 :: !Text
  , myRecordField9 :: !Text
  , myRecordField10 :: !Int
  , myRecordField11 :: !Text
  , myRecordField12 :: !Text
  , myRecordField13 :: !Text
  } deriving (Eq, Show)

instance FromJSON MyRecord where
  parseJSON (Object o) =
    MyRecord <$> o .: "field1" <*> o .: "field2" <*> o .: "field3" <*>
    o .: "field4" <*>
    o .: "field5" <*>
    o .: "field6" <*>
    o .: "field7" <*>
    o .: "field8" <*>
    o .: "field9" <*>
    (read <$> o .: "field10") <*>
    o .: "field11" <*>
    o .: "field12" <*>
    o .: "field13"
  parseJSON x = fail $ "MyRecord: expected Object, got: " <> show x

instance ToJSON MyRecord where
    toJSON _ = undefined

test :: IO ()
test = do
  withFile "some-file" ReadMode $ \hIn
  {-

      the pipeline is composed as follows:

      1 a producer reading a file with Pipes.ByteString, splitting chunks into lines,
        and parsing the lines as JSON to produce tuples of (Maybe MyRecord, Maybe
        ByteString), the second element being an error if parsing failed

      2 a pipe filtering that tuple on a field of Maybe MyRecord, passing matching
        (Maybe MyRecord, Maybe ByteString) downstream

      3 and a pipe that picks an Int field out of Maybe MyRecord, passing (Maybe Int,
        Maybe ByteString) downstream

      pipeline == 1 >-> 2 >-> 3

      memory profiling indicates the memory build up is due to accumulation of
      MyRecord "objects", and data types comprising their fields (mainly
      Text/ARR_WORDS)

  -}
   -> do
    let pipeline = f1 hIn >-> f2 >-> f3
    -- need to use deepseq to avoid leaking memory
    r1 <-
      P.fold
        (\acc (v, _) -> (+) <$> (acc `deepseq` acc) <*> pure (fromMaybe 0 v))
        (Just 0)
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r1
    hSeek hIn AbsoluteSeek 0
    -- this works just fine as is and streams in constant memory
    r2 <-
      P.fold
        (\acc v ->
           case fst v of
             Just x -> acc + x
             Nothing -> acc)
        0
        id
        (pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
    print r2
    return ()
  return ()

f1
  :: (FromJSON a, MonadIO m)
  => Handle -> Producer (Maybe a, Maybe PB.ByteString) m ()
f1 hIn = PB.fromHandle hIn & asLines & resumingParser PA.decode

f2
  :: Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) IO r
f2 = filterRecords (("some value" ==) . myRecordField5)

f3 :: Pipe (Maybe MyRecord, d) (Maybe Int, d) IO r
f3 = P.map (first (fmap myRecordField10))

filterRecords
  :: Monad m
  => (MyRecord -> Bool)
  -> Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) m r
filterRecords predicate =
  for cat $ \(l, e) ->
    when (isNothing l || (predicate <$> l) == Just True) $ yield (l, e)

asLines
  :: Monad m
  => Producer PB.ByteString m x -> Producer PB.ByteString m x
asLines p = Fold.purely PG.folds Fold.mconcat (view PB.lines p)

parseRecords
  :: (Monad m, FromJSON a, ToJSON a)
  => Producer PB.ByteString m r
  -> Producer a m (Either (PA.DecodingError, Producer PB.ByteString m r) r)
parseRecords = view PA.decoded

resumingParser
  :: Monad m
  => PP.StateT (Producer a m r) m (Maybe (Either e b))
  -> Producer a m r
  -> Producer (Maybe b, Maybe a) m ()
resumingParser parser p = do
  (x, p') <- lift $ PP.runStateT parser p
  case x of
    Nothing -> return ()
    Just (Left _) -> do
      (x', p'') <- lift $ PP.runStateT PP.draw p'
      yield (Nothing, x')
      resumingParser parser p''
    Just (Right b) -> do
      yield (Just b, Nothing)
      resumingParser parser p'

Solution

  • As mentioned in the docs for Pipes.Prelude.fold, the fold is strict. However, the strictness is implemented with $!, which only forces evaluation to WHNF (weak head normal form), i.e. to the outermost constructor. WHNF is enough to fully evaluate a simple type like Int, but it does not fully evaluate a more complex type like Maybe Int: only the Just constructor is forced, and the Int inside can remain an unevaluated thunk.

    Some examples:

    main1 = do
      let a = 3 + undefined
          b = seq a 10
      print b                -- error: Exception: Prelude.undefined
    
    main2 = do
      let a = Just (3 + undefined)
          b = seq a 10
      print b                -- no exception
    

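    In the first case the variable acc is a Just wrapping a large thunk: the summation of all of the elements. On each iteration acc goes from Just a to Just (a+b) to Just (a+b+c) and so on. The addition is not performed during the fold; it is only done at the very end. Each pending addend fromMaybe 0 v also refers to v, which f3 produced lazily as a field selection on a MyRecord, so every record seen so far stays reachable. The large memory usage comes from storing this growing summation and the MyRecord values it references.

    In the second case the summation is reduced on each iteration by $! to a simple Int, which forces the field out of the record and lets the record be garbage collected.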
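
    On the question of whether deepseq is required to keep the Maybe Int result type: probably not. Here is a minimal sketch, assuming the same step shape as in the question, that uses Data.List.foldl' as a stand-in for P.fold (both force the accumulator only to WHNF). The names lazyStep and strictStep are made up for illustration.

    ```haskell
    module Main where

    import Data.List (foldl')
    import Data.Maybe (fromMaybe)

    -- Same shape as the first fold in the question. Forcing the result to
    -- WHNF only reaches the Just constructor, so the Int inside stays a
    -- growing chain of (+) thunks.
    lazyStep :: Maybe Int -> (Maybe Int, e) -> Maybe Int
    lazyStep acc (v, _) = (+) <$> acc <*> pure (fromMaybe 0 v)

    -- Pattern match on the accumulator and force the new sum with ($!)
    -- before wrapping it in Just again, so no thunk chain can build up.
    strictStep :: Maybe Int -> (Maybe Int, e) -> Maybe Int
    strictStep Nothing  _      = Nothing
    strictStep (Just a) (v, _) = Just $! a + fromMaybe 0 v

    main :: IO ()
    main = do
      -- Hypothetical input standing in for the pipeline's output.
      let xs = [(Just n, Nothing :: Maybe String) | n <- [1 .. 1000000]]
      print (foldl' strictStep (Just 0) xs)  -- prints Just 500000500000
    ```

    With pipes, the same step function should slot in as P.fold strictStep (Just 0) id pipeline. Both steps compute the same value; they differ only in when the additions are performed.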

    Besides using deepseq you can also use force:

    force x = x `deepseq` x
    

    As mentioned in the deepseq docs, force can be combined with ViewPatterns and BangPatterns to create a pattern that fully evaluates a function argument:

    {-# LANGUAGE BangPatterns, ViewPatterns #-}
    
    ...
    P.fold
      (\(force -> !acc) (v,_) -> (+) <$> acc <*> pure (fromMaybe 0 v))
      (Just 0)
      ...
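
    Since the snippet above is abbreviated, here is a runnable sketch of the same view-pattern idea. It assumes Data.List.foldl' as a stand-in for P.fold and a made-up list in place of the pipeline; the name step is hypothetical.

    ```haskell
    {-# LANGUAGE BangPatterns, ViewPatterns #-}
    module Main where

    import Control.DeepSeq (force)
    import Data.List (foldl')
    import Data.Maybe (fromMaybe)

    -- The view pattern applies force to the incoming accumulator and the
    -- bang evaluates the result, so the previous sum is fully evaluated
    -- before the next addition is stacked on top of it.
    step :: Maybe Int -> (Maybe Int, e) -> Maybe Int
    step (force -> !acc) (v, _) = (+) <$> acc <*> pure (fromMaybe 0 v)

    main :: IO ()
    main = do
      -- Hypothetical input standing in for the pipeline's output.
      let xs = [(Just n, ()) | n <- [1 .. 100000]]
      print (foldl' step (Just 0) xs)  -- prints Just 5000050000
    ```

    At most one addition is pending at any time with this approach, because each call to step finishes the work left over from the previous one.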