Search code examples
multithreadinghaskellstm

Using TChan with Timeout


I have a TChan as input for a thread which should behave like this:

If sombody writes to the TChan within a specific time, the content should be retrieved. If there is nothing written within the specified time, it should unblock and continue with Nothing.

My attempt on this was to use the timeout function from System.Timeout like this:

timeout 1000000 $ atomically $ readTChan pktChannel

This seemed to work but now I discovered, that I am sometimes loosing packets (they are written to the channel, but not read on the other side. In the log I get this:

2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439

Where "Pushing Recorded Packet" is the writing from the one thread and "Sending Recorded Packet" is the reading from the TChan in the sender thread. The line with Sending Recorded Packet 2 1439 is missing, which would indicate a successful read from the TChan.

It seems that if the timeout is received at the wrong point in time, the channel looses the packet. I suspect that the threadKill function used inside timeout and STM don't play well together.

Is this correct? Does somebody have another solution that does not loose the packet?


Solution

  • Use registerDelay, an STM function, to signal a TVar when the timeout is reached. You can then use the orElse function or the Alternative operator <|> to select between the next TChan value or the timeout.

    import Control.Applicative
    import Control.Monad
    import Control.Concurrent
    import Control.Concurrent.STM
    import System.Random
    
    -- write random values after a random delay
    packetWriter :: Int -> TChan Int -> IO ()
    packetWriter maxDelay chan = do
      let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
      forM_ xs $ \ x -> do
        threadDelay x
        atomically $ writeTChan chan x
    
    -- block (retry) until the delay TVar is set to True
    fini :: TVar Bool -> STM ()
    fini = check <=< readTVar
    
    -- Read the next value from a TChan or timeout
    readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
    readTChanTimeout timeoutAfter pktChannel = do
      delay <- registerDelay timeoutAfter
      atomically $
            Just <$> readTChan pktChannel
        <|> Nothing <$ fini delay
    
    -- | Print packets until a timeout is reached
    readLoop :: Show a => Int -> TChan a -> IO ()
    readLoop timeoutAfter pktChannel = do
      res <- readTChanTimeout timeoutAfter pktChannel
      case res of
        Nothing -> putStrLn "timeout"
        Just val -> do
          putStrLn $ "packet: " ++ show val
          readLoop timeoutAfter pktChannel
    
    main :: IO ()
    main = do
      let timeoutAfter = 1000000
    
      -- spin up a packet writer simulation
      pktChannel <- newTChanIO
      tid <- forkIO $ packetWriter timeoutAfter pktChannel
    
      readLoop timeoutAfter pktChannel
    
      killThread tid