
Connecting Pipes with Consumers and Producers that return different values


I am writing a streaming function with the pipes ecosystem, and pipes-concurrency in particular. The function is based on the operational library, so that I can quickly write little program snippets in which I yield commands to a server over the network, or to the stdin/stdout of a shell command, and then read back the response. In this case the server is Asterisk, but the approach could be generalized to anything similar.

I initially wrote this with pipes in mind, but it doesn't typecheck. The reason the following code doesn't work is that astPipe has type Pipe _ _ IO a, whereas i and o, which both come from pipes-concurrency, have types Producer _ IO () and Consumer _ IO () respectively. I thought about having astPipe yield Maybe ByteString, and then making the output Consumer consume Maybe ByteString, but that still doesn't solve the problem of the Producer returning ().

I feel like I'm really close to a solution, but I can't quite eke it out. You should be able to just run stack on this file to reproduce the error.

#!/usr/bin/env stack
-- stack --resolver lts-6.20 runghc --package pipes  --package pipes-concurrency  --package operational --package process-streaming

{-# LANGUAGE OverloadedStrings, LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module West.Asterisk where

import System.Process.Streaming as PS
import Control.Monad.Operational as Op

import Pipes as P
import Pipes.Concurrent as PC

import qualified Data.ByteString.Char8 as B

import Control.Concurrent.Async

import GHC.IO.Exception (ExitCode)

data Version = Version String
data Channels = Channels

data AsteriskInstruction a where
    Login :: AsteriskInstruction (Maybe Version)
    CoreShowChannels :: AsteriskInstruction (Maybe Channels)

type Asterisk a = Program AsteriskInstruction a

runAsterisk :: forall a. Asterisk a -> IO a
runAsterisk m = 
  let

    runAsterisk' :: Producer B.ByteString {- TODO Response -} IO () -> Consumer B.ByteString IO () -> Asterisk a -> IO a
    runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o
      where
        astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        -- Interpret the program passed in, not the outer m', so recursion makes progress.
        astPipe prog =
          case Op.view prog of

            Return a -> return a

            Login :>>= k -> do
              yield logincmd
              resp <- await -- :: Response
              let v = undefined resp :: Maybe Version
              astPipe (k v)

            CoreShowChannels :>>= k -> do
              yield coreshowchannelscmd
              resp <- await
              let c = undefined resp :: Maybe Channels
              astPipe (k c)

  in do
    withSpawn unbounded $ \(out1, in1) -> do
        async $ asteriskManager (fromInput in1) (toOutput out1)
        runAsterisk' (fromInput in1) (toOutput out1) m 

asteriskManager :: Producer B.ByteString IO () -> Consumer B.ByteString IO () -> IO ExitCode
asteriskManager prod cons = do
  let ssh = shell "nc someserver 5038"
  execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode)


logincmd, coreshowchannelscmd :: B.ByteString
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n"
coreshowchannelscmd = "action: coreshowchannels\n\n"

The error:

  Blah.hs:38:45:
    Couldn't match type ‘a’ with ‘()’
      ‘a’ is a rigid type variable bound by
          the type signature for runAsterisk :: Asterisk a -> IO a
          at Blah.hs:33:23
    Expected type: Proxy () B.ByteString () B.ByteString IO ()
      Actual type: Pipe B.ByteString B.ByteString IO a
    Relevant bindings include
      astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a
        (bound at Blah.hs:41:9)
      m' :: Asterisk a (bound at Blah.hs:38:22)
      runAsterisk' :: Producer B.ByteString IO ()
                      -> Consumer B.ByteString IO () -> Asterisk a -> IO a
        (bound at Blah.hs:38:5)
      m :: Asterisk a (bound at Blah.hs:34:13)
      runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1)
    In the second argument of ‘(>->)’, namely ‘astPipe m'’
    In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’

Solution

  • Producers and Consumers that return () can stop by themselves. Producers and Consumers that are polymorphic in their return type never stop by themselves.

    (>->) requires every component to have the same return type, because the pipeline ends as soon as any one of them returns. To unify the return types in your case, use fmap to put them in different branches of an Either.

    runAsterisk' :: Producer B.ByteString IO () 
                 -> Consumer B.ByteString IO () 
                 -> Asterisk a 
                 -> IO (Either () a)
    runAsterisk' i o m' = runEffect $ fmap Left i >-> fmap Right (astPipe m') >-> fmap Left o
    

    Pattern matching on the resulting Either reveals what stopped the pipeline: Right a means astPipe finished, while Left () means the producer or the consumer stopped first.

    Also, you can use drain (from Pipes.Prelude) to turn a Consumer a IO () into a Consumer that never stops by itself:

    neverStop :: Consumer a IO () -> Consumer a IO r
    neverStop consumer = consumer *> drain
    

    All the inputs received after the original consumer stops will be discarded.
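
As a minimal, self-contained sketch of both suggestions, the example below swaps the Asterisk pieces for toy stand-ins. The names finiteSource, printOne, sumN, taggedRun and drainedRun are made up for illustration and do not appear in the question. It assumes only the pipes package, with drain imported from Pipes.Prelude.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for the input: stops by itself after three elements.
finiteSource :: Producer Int IO ()
finiteSource = each [1, 2, 3]

-- Hypothetical stand-in for the output: prints one element, then stops.
printOne :: Consumer Int IO ()
printOne = await >>= lift . print

-- Hypothetical stand-in for astPipe: forwards n inputs, then returns their sum.
sumN :: Monad m => Int -> Pipe Int Int m Int
sumN n = go n 0
  where
    go 0 acc = return acc
    go k acc = do
      x <- await
      yield x
      go (k - 1) (acc + x)

-- Once the wrapped consumer returns, keep accepting input and discard it.
neverStop :: Monad m => Consumer a m () -> Consumer a m r
neverStop consumer = consumer *> P.drain

-- All three components are tagged so that they share one return type.
taggedRun :: Int -> IO (Either () Int)
taggedRun n = runEffect $
  fmap Left finiteSource >-> fmap Right (sumN n) >-> fmap Left printOne

-- Same pipeline, but the consumer can no longer end it early.
drainedRun :: Int -> IO (Either () Int)
drainedRun n = runEffect $
  fmap Left finiteSource >-> fmap Right (sumN n) >-> neverStop printOne

main :: IO ()
main = do
  taggedRun 2 >>= report   -- Left (): printOne stops after one element
  drainedRun 2 >>= report  -- Right 3: the pipe gets to finish
  drainedRun 5 >>= report  -- Left (): the producer runs out first
  where
    report :: Either () Int -> IO ()
    report (Left ()) = putStrLn "an endpoint stopped first"
    report (Right total) = putStrLn ("the pipe returned " ++ show total)
```

In drainedRun the consumer side never returns, so a Left () there can only come from the producer.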