Search code examples
haskellhaskell-pipes

Error handling in pipes


Backstory

I have a number of data files, each of them containing a list of data records (one per line). Similar to CSV but sufficiently different that I'd prefer to write my own parser rather than using a CSV library. For the purpose of this question I will use a simplified data file that contains just one number per line:

1
2
3
error
4

As you can see it is possible that a file contains malformed data in which case the whole file should be considered malformed.

The kind of data-processing I want to do can be expressed in terms of maps and folds. So, I thought this would be a good opportunity to learn how to use the pipes library.

{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}

import           Control.Monad.Except
import           Pipes ((>->))
import qualified Pipes as P
import qualified Pipes.Prelude as P
import qualified Pipes.Safe as P
import qualified System.IO as IO

First, I create a producer of lines in the text file. This is very similar to the example in the docs of Pipes.Safe.

getLines = do
    P.bracket (IO.openFile "data.txt" IO.ReadMode) IO.hClose P.fromHandle

Next, I need a function to parse each of these lines. As I mentioned before, this might fail, which I will represent with Either.

type ErrMsg = String

parseNumber :: String -> Either ErrMsg Integer
parseNumber s = case reads s of
                  [(n, "")] -> Right n
                  _         -> Left $ "Parse Error: \"" ++ s ++ "\""

For simplicity, as a first step, I want to collect all data records into a list of records. The most straight-forward approach is to pipe all the lines through the parser and just collect the whole thing into a list.

readNumbers1 :: IO [Either ErrMsg Integer]
readNumbers1 = P.runSafeT $ P.toListM $
    getLines >-> P.map parseNumber

Unfortunately, that creates a list of eithers of records. However, if the file contains one wrong record then the whole file should be considered wrong. What I really want is an either of a list of records. Of course I can just use sequence to transpose the list of eithers.

readNumbers2 :: IO (Either ErrMsg [Integer])
readNumbers2 = sequence <$> readNumbers1

But, that would read the whole file even if the first line is already malformed. These files can be large and I have many of them, so, it would be better if the reading would stop at the first error.

Question

My Question is how to achieve that. How to abort parsing upon the first malformed record?

What I got so far

My first thought was to use the monad instance of Either ErrMsg and P.mapM instead of P.map. Since we are reading from a file we already have IO and SafeT in our monad stack, so, I guess I'll need ExceptT to get error handling into that monad stack. This is the point where I'm stuck. I tried many different combinations and always ended up being yelled at by the type-checker. The following is the closest I can get to it compiles.

readNumbers3 = P.runSafeT $ runExceptT $ P.toListM $
    getLines >-> P.mapM (ExceptT . return . parseNumber)

The infered type of readNumbers3 reads

*Main> :t readNumbers3
readNumbers3
  :: (MonadIO m, P.MonadSafe (ExceptT ErrMsg (P.SafeT m)),
      P.MonadMask m, P.Base (ExceptT ErrMsg (P.SafeT m)) ~ IO) =>
     m (Either ErrMsg [Integer])

which looks close to what I want:

readNumbers3 :: IO (Either ErrMsg [Integer])

However, as soon as I try to actually execute that action I get the following error message in ghci:

*Main> readNumbers3

<interactive>:7:1:
    Couldn't match expected type ‘IO’
                with actual type ‘P.Base (ExceptT ErrMsg (P.SafeT m0))’
    The type variable ‘m0’ is ambiguous
    In the first argument of ‘print’, namely ‘it’
    In a stmt of an interactive GHCi command: print it

If I try to apply the following type-signature:

readNumbers3 :: IO (Either ErrMsg [Integer])

Then I get the following error message:

error.hs:108:5:
    Couldn't match expected type ‘IO’
                with actual type ‘P.Base (ExceptT ErrMsg (P.SafeT IO))’
    In the first argument of ‘(>->)’, namely ‘getLines’
    In the second argument of ‘($)’, namely
      ‘getLines >-> P.mapM (ExceptT . return . parseNumber)’
    In the second argument of ‘($)’, namely
      ‘P.toListM $ getLines >-> P.mapM (ExceptT . return . parseNumber)’
Failed, modules loaded: none.

Aside

Another motivation for moving the error handling into the pipe's base monad is that it would make further data processing much easier if I wouldn't have to juggle with eithers in my maps and folds.


Solution

  • Here is an incremental approach to solving the problem.

    Following Tekmo's suggestion in this SO answer we aim to operate in the following monad:

    ExceptT String (Pipe a b m) r
    

    We begin with imports and the definition of parseNumber:

    import           Control.Monad.Except
    import           Pipes ((>->))
    import qualified Pipes as P
    import qualified Pipes.Prelude as P
    
    parseNumber :: String -> Either String Integer
    parseNumber s = case reads s of
                      [(n, "")] -> Right n
                      _         -> Left $ "Parse Error: \"" ++ s ++ "\""
    

    Here is a plain Producer of Strings in the IO-monad we'll use as our input:

    p1 :: P.Producer String IO ()
    p1 = P.stdinLn >-> P.takeWhile (/= "quit")
    

    To lift it to the ExceptT monad we just use lift:

    p2 :: ExceptT String (P.Producer String IO) ()
    p2 = lift p1
    

    Here is a pipeline segment which converts Strings to Integers in the ExceptT monad:

    p4 :: ExceptT String (P.Pipe String Integer IO) a
    p4 = forever $ 
           do s <- lift P.await
              case parseNumber s of
                Left e  -> throwError e
                Right n -> lift $ P.yield n
    

    The probably can be written more combinatorially, but I've left it very explicit for clarity.

    Next we join p2 and p4 together. The result is also in the ExceptT monad.

    -- join together p2 and p4
    p7 :: ExceptT String (P.Producer Integer IO) ()
    p7 = ExceptT $ runExceptT p2 >-> runExceptT p4
    

    Tekmo's SO answer suggests creating a new operator for this.

    Finally, we can use toListM' to run this pipeline. (I've included the definition of toListM' here because it doesn't appear in my installed version of Pipes.Prelude)

    p8 :: IO ([Integer], Either String ())
    p8 = toListM' $ runExceptT p7
    
    toListM' :: Monad m => P.Producer a m r -> m ([a], r)
    toListM' = P.fold' step begin done
      where
        step x a = x . (a:)
        begin = id
        done x = x []
    

    Examples of how p8 works:

    ghci> p8
    4
    5
    6
    quit
    ([4,5,6],Right ())
    
    ghci> p8
    5
    asd
    ([5],Left "Parse Error: \"asd\"")
    

    Update

    You can simplify the code by generalizing parseNumber like this:

    parseNumber' :: (MonadError [Char] m) => String -> m Integer
    parseNumber' s = case reads s of
                       [(n, "")] -> return n
                       _         -> throwError $ "Parse Error: \"" ++ s ++ "\""
    

    Then p4 may be written:

    p4' :: ExceptT String (P.Pipe String Integer IO) a
    p4' = forever $ lift P.await >>= parseNumber' >>= lift . P.yield