I have several thousand output files that can't all be processed at once, so I want a function that processes a chunk of n files at a time. I decided to use TBQueue.
The idea of the implementation is to initially fill the queue with n dummy values; the loop then tries to read the next dummy value from the queue. If there is a value on the queue, an IO action is performed, and when the IO action is done a new value is written back to the queue. Otherwise readTBQueue will block until one of the processes finishes (at least, that is what I hope).
My questions are: 1. When there are no more files to process, will the main thread wait until all children finish? 2. What happens if one async crashes? Will the dummy value still be written to the queue?
import Control.Concurrent.Async (Async, async, wait, waitCatch, withAsync)
import Control.Concurrent.STM
import Control.Monad (join, replicateM_)

processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
    tbQ <- atomically $ newTBQueue n
    atomically $ replicateM_ n $ writeTBQueue tbQ ()
    loop fs tbQ
  where
    loop :: [FilePath] -> TBQueue () -> IO ()
    loop files queue
        | null files = return ()
        | otherwise = do
            join . atomically $ do
                readTBQueue queue
                let file = head files
                return $ withAsync (fun file) $ \a -> do
                    wait a
                    atomically $ writeTBQueue queue ()
            loop (tail files) queue
Following the suggestion of MathematicalOrchid (thanks!), I wrote a new implementation:
processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
    tbQ <- atomically $ newTBQueue n
    loop fs tbQ
  where
    loop :: [FilePath] -> TBQueue FilePath -> IO ()
    loop files queue
        | null files = return ()
        | otherwise = do
            join . atomically $ do
                writeTBQueue queue (head files)
                let actionSTM = atomically $ readTBQueue queue
                return $ withAsync actionSTM $ \a -> do
                    file <- wait a
                    async (fun file) >>= doSomethingOnException
            loop (tail files) queue
doSomethingOnException :: Async () -> IO ()
doSomethingOnException a = do
    r <- waitCatch a
    case r of
        Left exception -> undefined
        Right _        -> return ()
But I'm still not sure whether, when the loop function returns, it must wait for the pending jobs or not.
You seem to have two separate concerns here: synchronisation and reliability.
STM is all about letting multiple threads access mutable data without corrupting it. TBQueue
should handle that just fine. If you want "crashed" operations to get restarted... you need to build extra infrastructure for that.
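That extra infrastructure can be as small as a retry wrapper around the IO action. The sketch below (the name `withRetries` is mine, not from any library) re-runs an action up to a given number of attempts and rethrows the last exception if every attempt fails:

```haskell
import Control.Exception (SomeException, throwIO, try)

-- Run an IO action, retrying up to k attempts if it throws.
-- Catches SomeException, so use with care (it also catches async exceptions).
withRetries :: Int -> IO a -> IO a
withRetries k action = try action >>= handle
  where
    handle (Right x) = return x
    handle (Left e)
        | k > 1     = withRetries (k - 1) action      -- try again
        | otherwise = throwIO (e :: SomeException)    -- out of attempts: rethrow
```

Each worker would then call `withRetries k (fun file)` instead of `fun file`.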
Is there a specific reason why you fill the queue with "dummy values" rather than, say, the actual filenames to be processed? If it were me, the main thread's job would be to fill a queue with filenames (when the queue gets too full, the main thread blocks while the worker threads do their work). If you want to recover from "crashed" threads, the top-level code of each worker thread catches exceptions and retries the operation, or something along those lines. At least, that's how I'd do it...
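To make that concrete, here is a minimal sketch of the design I'm describing (the names `processFilesPool`, `feeder`, and `worker` are mine): the main thread feeds filenames into a bounded TBQueue, n worker threads drain it, each worker catches exceptions from the action so one bad file doesn't kill it, and a `Nothing` per worker signals shutdown.

```haskell
import Control.Concurrent.Async (mapConcurrently_)
import Control.Concurrent.STM
import Control.Exception (SomeException, try)
import Control.Monad (forM_, replicateM_)

processFilesPool :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFilesPool n files fun = do
    -- Bounded queue: the feeder blocks when it is full, limiting backlog.
    queue <- newTBQueueIO (fromIntegral n)
    let worker = do
            next <- atomically (readTBQueue queue)
            case next of
                Nothing   -> return ()          -- shutdown signal
                Just file -> do
                    -- Catch crashes so one bad file doesn't kill the worker;
                    -- this is where you'd log or retry instead.
                    _ <- try (fun file) :: IO (Either SomeException ())
                    worker
        feeder = do
            forM_ files (atomically . writeTBQueue queue . Just)
            -- One Nothing per worker so every worker eventually exits.
            replicateM_ n (atomically (writeTBQueue queue Nothing))
    -- mapConcurrently_ returns only after the feeder and all workers finish,
    -- so pending jobs are always waited for.
    mapConcurrently_ id (feeder : replicate n worker)
```

Because `mapConcurrently_` waits for every thread, this also answers your question 1: the call doesn't return until all in-flight files are done.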