
Async and TBQueue


I have several thousand output files that can't all be processed concurrently, so I want a function that processes n files at a time. I decided to use a TBQueue.

The idea is to fill the queue initially with n dummy values; the loop then tries to read the next dummy value from the queue. If there is a value on the queue, an IO action is performed, and when that action finishes a new value is written back to the queue. Otherwise, readTBQueue blocks until one of the running jobs finishes (at least, that is what I hope).

My questions are:

1. When there are no more files to process, will the main thread wait until all the children finish?
2. What happens if one async crashes? Will the dummy value still be written back to the queue?

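import Control.Concurrent.Async (wait, withAsync)
import Control.Concurrent.STM
import Control.Monad (join, replicateM_)

processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
  -- newTBQueue takes a Natural in recent versions of stm, hence fromIntegral
  tbQ <- atomically $ newTBQueue (fromIntegral n)
  -- fill the queue with n dummy values
  atomically $ replicateM_ n $ writeTBQueue tbQ ()
  loop fs tbQ
  where
    loop :: [FilePath] -> TBQueue () -> IO ()
    loop []           _     = return ()
    loop (file:files) queue = do
      -- take a dummy value (blocks while the queue is empty), then run the
      -- returned action; join runs it in this thread, so `wait a` blocks
      -- the loop until this file is done
      join . atomically $ do
        readTBQueue queue
        return $ withAsync (fun file) $ \a -> do
          wait a
          atomically $ writeTBQueue queue ()  -- put the dummy value back
      loop files queue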
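Because `join` runs the `withAsync ... wait a` action in the calling thread, the loop above waits for each file before it reads the next dummy value, so the files end up being processed one at a time. A minimal sketch of the same dummy-token idea in which up to n jobs really overlap might look like this. `processFilesTokens` is a hypothetical name, and the sketch assumes n >= 1 (with n = 0 the first read would block forever).

```haskell
import Control.Concurrent.Async (Async, async, wait)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (replicateM_)

-- Hypothetical name; assumes n >= 1.
-- Runs `fun` on every file with at most n calls in flight at once.
processFilesTokens :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFilesTokens n fs fun = do
  tokens <- newTBQueueIO (fromIntegral n)
  atomically $ replicateM_ n (writeTBQueue tokens ())  -- n dummy tokens
  jobs <- mapM (start tokens) fs
  -- block until every job has finished; the first failure in list order is
  -- rethrown here (jobs still running at that point are not cancelled)
  mapM_ wait jobs
  where
    start :: TBQueue () -> FilePath -> IO (Async ())
    start tokens file = do
      atomically $ readTBQueue tokens  -- waits while n jobs are running
      -- the token goes back even if `fun file` throws
      async (fun file `finally` atomically (writeTBQueue tokens ()))
```

The token is taken in the main thread but returned by the worker itself, which is what lets several workers run at once; `finally` is what keeps a crashing job from permanently losing a token.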

Following MathematicalOrchid's suggestion (thanks!), I wrote a new implementation:

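import Control.Concurrent.Async (Async, async, wait, waitCatch, withAsync)
import Control.Concurrent.STM
import Control.Monad (join)

processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
  tbQ <- atomically $ newTBQueue (fromIntegral n)  -- Natural in recent stm
  loop fs tbQ
  where
    loop :: [FilePath] -> TBQueue FilePath -> IO ()
    loop []           _     = return ()
    loop (file:files) queue = do
      join . atomically $ do
        writeTBQueue queue file
        let actionSTM = atomically $ readTBQueue queue
        -- join runs the returned action in the calling thread, so the loop
        -- waits here until `fun` has finished with this file
        return $ withAsync actionSTM $ \a -> do
          file' <- wait a
          async (fun file') >>= doSomethingOnException
      loop files queue

    doSomethingOnException :: Async () -> IO ()
    doSomethingOnException a = do
      r <- waitCatch a
      case r of
        Left _exception -> undefined  -- placeholder: error handling not written yet
        Right _         -> return ()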
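Both versions depend on how the async library reports a crashed child. `wait` rethrows the child's exception in the thread that is waiting, so any code after `wait a` (such as writing the dummy value back in the first version) is skipped and the exception propagates out of the caller, while `waitCatch` hands the outcome back as an `Either`. The small sketch below checks both behaviours; `reachesCodeAfterWait` and `jobOutcome` are hypothetical helper names.

```haskell
import Control.Concurrent.Async (wait, waitCatch, withAsync)
import Control.Exception (SomeException, try)

-- Hypothetical helper: True if the statement after `wait a` got to run.
reachesCodeAfterWait :: IO () -> IO Bool
reachesCodeAfterWait job = do
  r <- try (withAsync job (\a -> wait a >> return True)) :: IO (Either SomeException Bool)
  -- a crash in `job` surfaces here as Left, because `wait` rethrew it
  return (either (const False) id r)

-- Hypothetical helper: `waitCatch` returns the outcome instead of throwing.
jobOutcome :: IO () -> IO (Either SomeException ())
jobOutcome job = withAsync job waitCatch
```

So in the second version a crash does reach the `Left` branch of `doSomethingOnException`, where `undefined` would then throw in the main thread.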

But I'm still not sure whether, when the loop function returns, it must wait for the pending jobs.
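One way to make the function return only after every pending job is done, sketched with two tools other than TBQueue: a `QSem` from base as the n-slot limiter, and `forConcurrently_` from the async package, which waits for all of its threads and, if one of them throws, cancels the others and rethrows. `processFilesSem` is a hypothetical name, and the sketch assumes n >= 1. It forks one lightweight thread per file, but only n of them run `fun` at a time.

```haskell
import Control.Concurrent.Async (forConcurrently_)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- Hypothetical name; assumes n >= 1.
processFilesSem :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFilesSem n fs fun = do
  sem <- newQSem n
  -- forConcurrently_ returns only when every file's thread has finished
  forConcurrently_ fs $ \file ->
    -- hold one of the n slots only while `fun file` runs;
    -- bracket_ gives the slot back even if `fun file` throws
    bracket_ (waitQSem sem) (signalQSem sem) (fun file)
```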


Solution

  • You seem to have two separate concerns here: synchronisation and reliability.

    STM is all about letting multiple threads access mutable data without corrupting it. TBQueue should handle that just fine. If you want "crashed" operations to get restarted... you need to build extra infrastructure for that.

    Is there a specific reason why you fill the queue with "dummy values" rather than, say, the actual filenames to be processed? If it were me, the main thread's job would be to fill a queue with filenames (when the queue gets too full, the main thread blocks while the worker threads do their work). If you want to recover from "crashed" threads, the top-level code of each worker thread should catch exceptions and retry the operation, or something along those lines. Or, that's how I'd do it...
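The producer/consumer layout described above could be sketched roughly as follows. This is not code from the answer: `processFilesPool`, the `Nothing` stop signal, the limit of 3 attempts, and catching only `IOException` are all assumptions made for illustration. The main thread fills a bounded TBQueue with filenames (blocking while it is full), n workers take filenames from it, and a file that fails with an `IOException` is retried a few times before being skipped.

```haskell
import Control.Concurrent.Async (concurrently_, replicateConcurrently_)
import Control.Concurrent.STM
import Control.Exception (IOException, try)
import Control.Monad (replicateM_)

-- Hypothetical name. Assumes n >= 1 and that `fun` signals an ordinary
-- failure by throwing an IOException.
processFilesPool :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFilesPool n fs fun = do
  queue <- newTBQueueIO (fromIntegral n)
  -- returns only after the producer and all n workers have finished
  concurrently_ (producer queue) (replicateConcurrently_ n (worker queue))
  where
    maxAttempts = 3 :: Int  -- arbitrary retry limit for this sketch

    -- main job: feed filenames; writeTBQueue blocks while the queue is full
    producer queue = do
      mapM_ (atomically . writeTBQueue queue . Just) fs
      -- one Nothing per worker tells each worker to stop
      replicateM_ n (atomically $ writeTBQueue queue Nothing)

    worker queue = do
      item <- atomically $ readTBQueue queue
      case item of
        Nothing   -> return ()
        Just file -> attempt maxAttempts file >> worker queue

    -- catches only IOException, so cancellation and other exceptions still
    -- propagate (concurrently_ then cancels the rest and rethrows)
    attempt k file = do
      r <- try (fun file) :: IO (Either IOException ())
      case r of
        Left _ | k > 1 -> attempt (k - 1) file
        _              -> return ()  -- success, or out of attempts: give up
```

Because `concurrently_` waits for both sides, the function returns only once every worker has read its `Nothing`, that is, after all files have been handled. Real code would probably log a file that runs out of attempts rather than silently skipping it.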