Search code examples
multithreadinghaskelltypessignals-slots

Storing arbitrary function calls across threads


I'm trying to write a library aiming to reproduce Qt's threading semantics: signals can be connected to slots, and all slots execute in a known thread, so that slots tied to the same thread are threadsafe with regards to each other.

I have the following API:

data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

The problem is getting from emit to execute. The argument needs to be stored at runtime somehow, and then an IO action performed, but I can't seem to get past the type checker.

The things I need:

  1. Type safety: signals shouldn't be connected to slots expecting a different type.
  2. Type-independence: there can be more than one slots for any given type (Perhaps this can be relaxed with newtype and/or TH).
  3. Ease of use: since this is a library, signals and slots should be easy to create.

The things I've tried:

  • Data.Dynamic: makes the whole thing really fragile, and I haven't found a way to perform a correctly-typed IO action on a Dynamic. There's dynApply, but it's pure.
  • Existential types: I need to execute the function passed to mkSlot, as opposed to an arbitrary function based on the type.
  • Data.HList: I'm not smart enough to figure it out.

What am I missing?


Solution

  • Firstly, are you sure Slots really want to execute in a specific thread? It's easy to write thread-safe code in Haskell, and threads are very lightweight in GHC, so you're not gaining much by tying all event-handler execution to a specific Haskell thread.

    Also, mkSlot's callback doesn't need to be given the Slot itself: you can use recursive do-notation to bind the slot in its callback without adding the concern of tying the knot to mkSlot.

    Anyway, you don't need anything as complicated as those solutions. I expect when you talk about existential types, you're thinking about sending something like (a -> IO (), a) through a TChan (which you mentioned using in the comments) and applying it on the other end, but you want the TChan to accept values of this type for any a, rather than just one specific a. The key insight here is that if you have (a -> IO (), a) and don't know what a is, the only thing you can do is apply the function to the value, giving you an IO () — so we can just send those through the channel instead!

    Here's an example:

    import Data.Unique
    import Control.Applicative
    import Control.Monad
    import Control.Concurrent
    import Control.Concurrent.STM
    
    newtype SlotGroup = SlotGroup (IO () -> IO ())
    
    data Signal a = Signal Unique (TVar [Slot a])
    data Slot a = Slot Unique SlotGroup (a -> IO ())
    
    -- When executed, this produces a function taking an IO action and returning
    -- an IO action that writes that action to the internal TChan. The advantage
    -- of this approach is that it's impossible for clients of newSlotGroup to
    -- misuse the internals by reading the TChan or similar, and the interface is
    -- kept abstract.
    newSlotGroup :: IO SlotGroup
    newSlotGroup = do
      chan <- newTChanIO
      _ <- forkIO . forever . join . atomically . readTChan $ chan
      return $ SlotGroup (atomically . writeTChan chan)
    
    mkSignal :: IO (Signal a)
    mkSignal = Signal <$> newUnique <*> newTVarIO []
    
    mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
    mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f
    
    connect :: Signal a -> Slot a -> IO ()
    connect (Signal _ v) slot = atomically $ do
      slots <- readTVar v
      writeTVar v (slot:slots)
    
    emit :: Signal a -> a -> IO ()
    emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)
    
    execute :: Slot a -> a -> IO ()
    execute (Slot _ (SlotGroup send) f) a = send (f a)
    

    This uses a TChan to send actions to the worker thread each slot is tied to.

    Note that I'm not very familiar with Qt, so I may have missed some subtlety of the model. You can also disconnect Slots with this:

    disconnect :: Signal a -> Slot a -> IO ()
    disconnect (Signal _ v) (Slot u _ _) = atomically $ do
      slots <- readTVar v
      writeTVar v $ filter keep slots
      where keep (Slot u' _) = u' /= u
    

    You might want something like Map Unique (Slot a) instead of [Slot a] if this is likely to be a bottleneck.

    So, the solution here is to (a) recognise that you have something that's fundamentally based upon mutable state, and use a mutable variable to structure it; (b) realise that functions and IO actions are first-class just like everything else, so you don't have to do anything special to construct them at runtime :)

    By the way, I suggest keeping the implementations of Signal and Slot abstract by not exporting their constructors from the module defining them; there are many ways to tackle this approach without changing the API, after all.