Search code examples
haskellhaskell-stackstmconduit

Can't pass data via stdin to process spawned with conduit-extra


In my program I am starting external process and communicate with it via stdin and stdout. I'm feeding the input through conduit (producer) started from STMs TQueue. It worked like a charm until I've decided to bump lts version. It worked great with lts <= 8.24.

Here is the minimized program that reproduces my problem:

#!/usr/bin/env stack
-- stack --resolver lts-10.4 --install-ghc runghc --package conduit-extra --package stm-conduit
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Monad.STM
import Control.Concurrent.STM.TQueue

import           Data.Conduit
import qualified Data.Conduit.Binary       as CB
import qualified Data.Conduit.List         as CL
import           Data.Conduit.Process     (CreateProcess (..),
                                           proc, sourceProcessWithStreams)
import qualified Data.Conduit.TQueue       as CTQ

import qualified Data.ByteString.Char8     as BS
import           Data.Monoid              ((<>))

main :: IO ()
main = do
  putStrLn "Enter \"exit\" to exit."

  q <- open
  putStrLn "connection opened"

  loop q
  where loop q = do
          s <- BS.getLine
          case s of
            "exit" -> return ()
            req -> do
              atomically $ writeTQueue q req
              loop q

open :: IO (TQueue BS.ByteString)
open = do
  req <- atomically newTQueue
  let chat :: CreateProcess
      chat = proc "cat" []

      input :: Producer IO BS.ByteString
      input = toProducer
            $ CTQ.sourceTQueue req
           -- .| CL.mapM_ (\bs -> BS.putStrLn (("queue: " :: BS.ByteString) <> bs))

      output :: Consumer BS.ByteString IO ()
      output = toConsumer
             $ CL.mapM_ BS.putStrLn

  _ <- forkIO (sourceProcessWithStreams chat input output output >> pure ())
  pure req

With newer lts it seems like the problem is not with communication via TQueue, as uncommenting the line which prints content from input conduit gives shows data from the queue. It looks like the spawned process never receives anything on it's stdin.

Furthermore writing to spawned cat stdin from console, like so:

echo "test" > /proc/<pid of spawned cat>/fd/0

produces output in my program.

Am I missing something that changed between versions?


Solution

  • So the issue was that default behaviour of sinkHandle was changed to not flush after every chunk of data.

    I've fixed the issue by first porting to Data.Conduit.Process.Typed and then rolling my own variant of createSink that is using sinkHandleFlush instead of sinkHandle.