Search code examples
clojurechannelcore.async

How to avoid dropping items when using core.async pub/sub?


I have a channel acting as a publisher:

(def publisher (async/chan))
(def publication (async/pub publisher :topic))

Because of the nature of sub/pub, when I do this:

(async/put! publisher {:topic :foo})

The message gets consumed by the publication, and since there are no subscribers, it will get dropped.

If I try to subscribe to the :foo topic:

(def reader (async/chan))
(async/sub publication :foo reader)
(async/go (println "got val " (async/<! reader)))

I will see nothing printed. But if I put more items in the publisher:

(async/put! c1 {:topic :foo :msg "after"})
==> got val {:topic :foo :msg "after"}

Is there a way to not lose, say, the last n items produced by the publisher even if the subscriber has not subscribed yet?


Solution

  • pub accepts a buf-fn function which is given the topic. This function should return a buffer. Such as dropping-buffer or sliding-buffer. So if you want your :foo topic to be buffered:

    (pub pub-ch :topic #(if (= % :foo) (sliding-buffer 10) nil))
    

    Also see the relevant code section.