Search code examples
clojurecore.async

Clojure Recipe for Dynamic Channels/pub-sub


I'm looking for a way to dynamically build up subscribers to publications using core.async (or anything that will work).

Problem: I have messages that I need to process based on the :sender of the message. Each message will be operated on by the same function, but the point is that each sender's messages will be processed in order, one at a time -- multiple topics based on the :sender key with one consumer each. I also need a way to limit the number of active consumers across all subscriptions to keep resource utilization down.

My thought is that I would have a pub channel:

(def in-chan (chan))
(def publication (pub in-chan :sender))

But I want to be able to ensure that there is always a subscriber as new senders are brought online. I'm open to better facilities as long as the code stays small and simple.

Question: Is there an idiomatic way of ensuring there is a subscriber for a specific publication before sending the message? How do I coordinate all the consumers of each subscription to use a shared thread pool?

EDIT: I’ve figured out how to coordinate the work using a thread pool and single consumer per topic. I think for checking if a sub exists I will use a ref of a map to store the topic name to the sub. If the ref doesn’t have an entry for a subscriber, I’d create one and add it to the map; next I’ll register the subscriber to the publication and publish the message. The purpose of this question is to see if there’s a better way to spin up and keep track of subscribers for dynamically created topics.


Solution

  • The solution I came up with is to use a ref:

    (def registration (ref {}))
    

    This registration is used by the thread that writes to the publication channel just before writing:

    (defn register
      [registration topic-name]
      (dosync
        (let [r (ensure registration)
              something-to-track :tracked] ;; In my case, I'm keeping track of a channel
          (when-not (get r topic-name)
            (alter registration assoc topic-name something-to-track)
            something-to-track))))
    

    Whenever we need to publish a message we can use this function to "register" a new subscriber. If one did not previously exist, it will return something-to-track. In my case, it is a channel that I will subsequently call sub on. If it is nil, I can ignore it. To truly not miss messages in a concurrent environment I would need to do something in the transaction (need to understand if ensure would protect me by giving exclusive access to the registration across threads) but my pipeline is so small that I can write to the pub single-threaded.