Search code examples
haskellpipelinetype-systemstype-level-computation

Can Haskell's type system enforce correct ordering of data pipeline stages?


I create a lot of data processing pipelines using mass spectrometry data, where data from the instrument gets cleaned up, transformed, scaled, inspected, and finally analyzed. I tend to use a recursive type definition for this -- here's a heavily simplified example:

data Dataset = Initial { x::(Vector Double), y::(Vector Double) name::String}
             | Cleaned { x::(Vector Double), y::(Vector Double) name::String}
             | Transformed { x::(Vector Double), y::(Vector Double) name::String}

Then a typical pipeline will just be a chain of functions that starts with a Dataset creator, and then continue with functions that consume something of type Dataset, and produce something of type Dataset:

createDataset :: Vector Double -> Vector Double -> String -> Dataset
createDataset x y name = Initial x y name

removeOutliers :: Dataset -> Dataset
removeOutliers (Initial x y n) = let
                         (new_x, new_y) = outlierRemovalFunction x y
                         in Cleaned new_x new_y (n ++"_outliersRemoved")
               (Cleaned x y n) = error "Already been cleaned"
               (Scaled x y n) = error "Scaled data should have already been cleaned"
               (Transformed x y n) = error "Transformed data should have already been cleaned"

logTransform :: Dataset -> Dataset
logTransform (Initial x y n) = error "Need to clean first"
             (Cleaned x y n) = let
                         (new_x, new_y) = logTransformFunction x y
                         in Transformed new_x new_y (n ++ "_logTransformed)


So this makes sure that the processing steps in the pipeline happen in the correct order, and you can create whole pipelines using composition

(logTransform . removeOutliers . createDataset) init_y init_y "ourData"

But this approach seems extremely limited for a couple of reasons. The first reason is that incorrectness is detected via pattern matching on constructors, so additions and changes to the pipeline will require changes everywhere in the pattern matching. Imagining a more complicated example with several cleaning and several transform steps -- basically every possible combination will need its own unique constructor, and all the pattern matching will have to be non-exhaustive, or repeated absolutely everywhere.

The second reason this seems limited is that an incorrectly constructed pipeline is only detected by failures at runtime. I've sequenced all the processing steps, so at every point in the pipeline I know exactly what has happened to the data. The type system should be able to keep me from putting the steps together incorrectly in the first place, and using a function expecting cleaned data on a non-cleaned input should be detectable at compile time.

I have thought about having separate types for each of the stages in the pipeline, and then having the "dataset" interface implemented as a type class, something like:

class Dataset a where
    x :: a -> Vector Double
    y :: a -> Vector Double
    name :: a -> String

data Initial = Initial x y name
instance Dataset Initial where ...

data Cleaned a = Cleaned a
instance Dataset Cleaned where ...

data Transformed a = Transformed a
instance Dataset Transformed where ...

then you can do things (I think...) like:


removeOutliers :: (Dataset a) => a -> Cleaned a
removeOutliers = ...

logTransform :: (Dataset a) => Cleaned a -> Transformed Cleaned a
logTransform = ...

I believe that this approach fixes issue number 1 above: We can now detect pipeline incorrectness at compile time, and we no longer are stuck with having to have all those different constructors to describe the processing steps.

However, it seems that I've just moved the problem "one level up". I'm now dealing with type variables and all these nested types. Instead of a needing a Dataset constructor for every possible combination of pipeline steps, I now need to create a Dataset instance for every combination of types!

What I really want is a way for a type in the processing pipeline to be both very specific or very general in their constraints. I'd like to use types/constraints that detail the order in which specific processing steps were applied, but I'd also like for a type/constraint to be able to convey something more general -- i.e. "In addition to other unimportant steps, outlier removal has been done". So basically the type of things that have had outliers removed.

Conveying ordering information would be an ultra-bonus -- "In addition to other unimportant steps outlier removal happened and at some point later on a log transformation happened". The type of things that have had outliers removed before they were log transformed (and not necessarily immediately before).

Is this sort of thing possible using Haskell's type system?


Solution

  • Yes, the modern Haskell type system can handle this. However, compared to usual, term-level programming, type-level programming in Haskell is still hard. The syntax and techniques are complicated, and documentation is somewhat lacking. It also tends to be the case that relatively small changes to requirements can lead to big changes in the implementation (i.e., adding a new "feature" to your implementation can cascade into a major reorganization of all the types), which can make it difficult to come up with a solution if you're still a little uncertain about what your requirements actually are.

    @JonPurdy's comment and @AtnNn's answer give a couple of ideas of what's possible. Here's a solution that tries to address your specific requirements. However, it's likely to prove difficult to use (or at least difficult to adapt to your requirements) unless you're willing to sit down and teach yourself a fair bit of type-level programming.

    Anyway, suppose you're interested in tagging a fixed data structure (i.e., always the same fields with the same types) with a type-level list of the processes that have been performed on it, with a means to check the process list against an ordered sublist of required processes.

    We'll need some extensions:

    {-# LANGUAGE ConstraintKinds #-}
    {-# LANGUAGE DataKinds #-}
    {-# LANGUAGE PolyKinds #-}
    {-# LANGUAGE TypeFamilies #-}
    {-# LANGUAGE TypeOperators #-}
    {-# LANGUAGE UndecidableInstances #-}
    

    The process tags themselves are defined as constructors in a sum type, with the DataKinds extension lifting the tags from the term level to the type level:

    data Process = Cleaned | Transformed | Scaled | Inspected | Analyzed
    

    The data structure is then tagged with a list of applied processes, its "pipeline":

    data Dataset (pipeline :: [Process])
      = Dataset { x :: [Double]
                , y :: [Double]
                , name :: String }
    

    NOTE: It will be most convenient for the pipeline to be in reverse order, with the most recently applied Process first.

    To allow us to require that a pipeline has a particular ordered subsequence of processes, we need a type-level function (i.e., a type family) that checks for subsequences. Here's one version:

    type family a || b where
      True  || b = True
      False || b = b
    
    type family Subseq xs ys where
      Subseq '[]      ys  = True
      Subseq nonempty '[] = False
      Subseq (x:xs) (x:ys) = Subseq xs ys || Subseq (x:xs) ys
      Subseq xs     (y:ys) = Subseq xs ys
    

    We can test this type-level function in GHCi:

    λ> :kind! Subseq '[Inspected, Transformed] '[Analyzed, Inspected, Transformed, Cleaned]
    Subseq '[Inspected, Transformed] '[Analyzed, Inspected, Transformed, Cleaned] :: Bool
    = 'True
    λ> :kind! Subseq '[Inspected, Transformed] '[Analyzed, Transformed, Cleaned]
    Subseq '[Inspected, Transformed] '[Analyzed, Transformed, Cleaned] :: Bool
    = 'False
    λ> :kind! Subseq '[Inspected, Transformed] '[Transformed, Inspected]
    Subseq '[Inspected, Transformed] '[Transformed, Inspected] :: Bool
    = 'False
    

    If you want to write a function that requires a dataset to have been transformed and then cleaned of outliers (in that order), possibly intermixed with other, unimportant steps with the function itself applying a scaling step, then the signature will look like this:

    -- remember: pipeline type is in reverse order
    foo1 :: (Subseq [Cleaned, Transformed] pipeline ~ True)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo1 = undefined
    

    If you want to prevent double-scaling, you can introduce another type-level function:

    type family Member x xs where
      Member x '[] = 'False
      Member x (x:xs) = 'True
      Member x (y:xs) = Member x xs
    

    and add another constraint:

    foo2 :: ( Subseq [Cleaned, Transformed] pipeline ~ True
            , Member Scaled pipeline ~ False)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo2 = undefined
    

    Then:

    > foo2 (Dataset [] [] "x" :: Dataset '[Transformed])
    ... Couldn't match type ‘'False’ with ‘'True’ ...
    > foo2 (Dataset [] [] "x" :: Dataset '[Cleaned, Scaled, Transformed])
    ... Couldn't match type ‘'False’ with ‘'True’ ...
    > foo2 (Dataset [] [] "x" :: Dataset '[Cleaned, Transformed])
    -- typechecks okay
    foo2 (Dataset [] [] "x" :: Dataset '[Cleaned, Transformed])
      :: Dataset '[ 'Scaled, 'Cleaned, 'Transformed]
    

    You can make it all a little friendlier, both in terms of constraint syntax and error messages, with some additional type aliases and type families:

    import Data.Kind
    import GHC.TypeLits
    
    type Require procs pipeline = Require1 (Subseq procs pipeline) procs pipeline
    type family Require1 b procs pipeline :: Constraint where
      Require1 True procs pipeline = ()
      Require1 False procs pipeline
        = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                     Text " lacks required processing " :<>: ShowType procs)
    type Forbid proc pipeline = Forbid1 (Member proc pipeline) proc pipeline
    type family Forbid1 b proc pipeline :: Constraint where
      Forbid1 False proc pipeline = ()
      Forbid1 True proc pipeline
        = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                     Text " must not include " :<>: ShowType proc)
    
    foo3 :: (Require [Cleaned, Transformed] pipeline, Forbid Scaled pipeline)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo3 = undefined
    

    which gives:

    > foo3 (Dataset [] [] "x" :: Dataset '[Transformed])
    ...The pipeline '[ 'Transformed] lacks required processing '[ 'Cleaned, 'Transformed]...
    > foo3 (Dataset [] [] "x" :: Dataset '[Cleaned, Scaled, Transformed])
    ...The pipeline '[ 'Cleaned, 'Scaled, 'Transformed] must not include 'Scaled...
    > foo3 (Dataset [] [] "x" :: Dataset '[Cleaned, Transformed])
    -- typechecks okay
    foo3 (Dataset [] [] "x" :: Dataset '[Cleaned, Transformed])
      :: Dataset '[ 'Scaled, 'Cleaned, 'Transformed]
    

    A full code sample:

    {-# LANGUAGE ConstraintKinds #-}
    {-# LANGUAGE DataKinds #-}
    {-# LANGUAGE PolyKinds #-}
    {-# LANGUAGE TypeFamilies #-}
    {-# LANGUAGE TypeOperators #-}
    {-# LANGUAGE UndecidableInstances #-}
    
    import Data.Kind
    import GHC.TypeLits
    
    data Process = Cleaned | Transformed | Scaled | Inspected | Analyzed
    
    data Dataset (pipeline :: [Process])
      = Dataset { x :: [Double]
                , y :: [Double]
                , name :: String }
    
    type family a || b where
      True  || b = True
      False || b = b
    
    type family Subseq xs ys where
      Subseq '[]      ys  = True
      Subseq nonempty '[] = False
      Subseq (x:xs) (x:ys) = Subseq xs ys || Subseq (x:xs) ys
      Subseq xs     (y:ys) = Subseq xs ys
    
    type family Member x xs where
      Member x '[] = False
      Member x (x:xs) = True
      Member x (y:xs) = Member x xs
    
    type Require procs pipeline = Require1 (Subseq procs pipeline) procs pipeline
    type family Require1 b procs pipeline :: Constraint where
      Require1 True procs pipeline = ()
      Require1 False procs pipeline
        = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                     Text " lacks required processing " :<>: ShowType procs)
    type Forbid proc pipeline = Forbid1 (Member proc pipeline) proc pipeline
    type family Forbid1 b proc pipeline :: Constraint where
      Forbid1 False proc pipeline = ()
      Forbid1 True proc pipeline
        = TypeError (Text "The pipeline " :<>: ShowType pipeline :<>:
                     Text " must not include " :<>: ShowType proc)
    
    
    foo1 :: (Subseq [Cleaned, Transformed] pipeline ~ True)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo1 = undefined
    
    foo2 :: ( Subseq [Cleaned, Transformed] pipeline ~ True
            , Member Scaled pipeline ~ False)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo2 = undefined
    
    foo3 :: (Require [Cleaned, Transformed] pipeline, Forbid Scaled pipeline)
         => Dataset pipeline -> Dataset (Scaled : pipeline)
    foo3 = undefined