Tags: java, sockets, kotlin, zeromq, jeromq

How to make ZMQ pub client socket buffer messages while sub server socket is down


Given two applications, where application A uses a publisher client socket to continuously stream data to application B, which has a subscriber server socket to accept that data, how can we configure the PUB client socket in A so that, while B is unavailable (for example, being redeployed or restarted), A buffers all pending messages, and once B becomes available again the buffered messages go through and the socket catches up with the real-time stream?

In a nutshell, how do we make a PUB client socket buffer messages, up to some limit, while the SUB server is unavailable?

The default behaviour of a PUB client is to drop messages in the mute state, but it would be great if we could change that to a size-limited buffer. Is that possible with ZMQ, or do I need to do it at the application level?

I've tried setting HWM and LINGER on my sockets, but if I'm not wrong they only cover the slow-consumer case, where my publisher is connected to a subscriber that is so slow that the publisher starts to buffer messages (HWM limits the number of those messages).

I'm using jeromq since I'm targeting the JVM platform.


Solution

  • I'm posting a quick update since the other two answers, though very informative, were actually wrong, and I don't want others to be misinformed by my accepted answer. Not only can you do this with ZMQ, it is actually the default behaviour.

    The trick is that if your publisher client has never connected to the subscriber server, it keeps dropping messages (which is why I thought it did not buffer them). But if your publisher connects to the subscriber and you then restart the subscriber, the publisher buffers messages until HWM is reached, which is exactly what I asked for. In short, the publisher wants to know there is someone on the other end accepting messages, and only after that will it buffer them.

    Here is some sample code which demonstrates this (you might need to do some basic edits to compile it).

    The only dependency I used is org.zeromq:jeromq:0.5.1.

    zmq-publisher.kt

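    import org.zeromq.SocketType
    import org.zeromq.ZContext

    // Msg, telegramMessage(), json(), log(), notInterrupted(), sleepInterruptible()
    // and 1.second are helpers that are not part of jeromq and are not shown here.
    fun main() {
       val uri = "tcp://localhost:3006"
       val context = ZContext(1)
       val socket = context.createSocket(SocketType.PUB)
    
       socket.hwm = 10000   // limit on queued messages
       socket.linger = 0    // discard pending messages when the socket is closed
       "connecting to $uri".log()
       socket.connect(uri)  // the publisher is the client side
    
       // Sends a two-frame message: the topic path, then the payload.
       fun publish(path: String, msg: Msg) {
          ">> $path | ${msg.json()}".log()
          socket.sendMore(path)
          socket.send(msg.toByteArray())
       }
    
       var count = 0
    
       // Publish one numbered message per second until interrupted.
       while (notInterrupted()) {
          val msg = telegramMessage("message : ${++count}")
          publish("/some/feed", msg)
          println()
    
          sleepInterruptible(1.second)
       }
    }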
    

    and of course zmq-subscriber.kt

    
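    import org.zeromq.SocketType
    import org.zeromq.ZContext

    // Msg, json() and log() are helpers that are not part of jeromq and are not shown here.
    fun main() {
       val uri = "tcp://localhost:3006"
       val context = ZContext(1)
       val socket = context.createSocket(SocketType.SUB)
    
       socket.hwm = 10000
       socket.receiveTimeOut = 250   // recv returns null after 250 ms without data
    
       "binding to $uri".log()
       socket.bind(uri)              // the subscriber is the server side
    
       socket.subscribe("/some/feed")
    
       while (true) {
          // First frame is the topic path; null means the receive timed out.
          val path = socket.recvStr() ?: continue
          val bytes = socket.recv() ?: continue
          val msg = Msg.parseFrom(bytes)
          "<< $path | ${msg.json()}".log()
       }
    }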
    

    Try running the publisher first, without the subscriber. When you then launch the subscriber, you will have missed all the messages sent so far. Now, without restarting the publisher, stop the subscriber, wait for some time, and start it again: the messages published in the meantime arrive once it is back.
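
    The sequence above can be mimicked with a toy model in plain Kotlin. This is only a sketch of the observed behaviour, not how jeromq is implemented: `ToyPublisher`, `subscriberStarted` and `subscriberStopped` are hypothetical names, and it assumes that new messages are dropped once the limit is reached.

    ```kotlin
    // Toy model only: mimics the behaviour described above, not jeromq's internals.
    class ToyPublisher(private val hwm: Int) {
        private val pending = ArrayDeque<String>()  // messages waiting for the subscriber
        private var everConnected = false           // has a subscriber been seen at least once?
        private var subscriberUp = false
        val delivered = mutableListOf<String>()
        var dropped = 0
            private set

        fun subscriberStarted() {
            everConnected = true
            subscriberUp = true
            // Flush everything buffered while the subscriber was away.
            while (pending.isNotEmpty()) delivered.add(pending.removeFirst())
        }

        fun subscriberStopped() {
            subscriberUp = false
        }

        fun publish(msg: String) {
            when {
                subscriberUp -> delivered.add(msg)           // normal delivery
                !everConnected -> dropped++                  // never connected: drop
                pending.size < hwm -> pending.addLast(msg)   // known peer is away: buffer
                else -> dropped++                            // limit reached: drop (assumption)
            }
        }
    }

    fun main() {
        val pub = ToyPublisher(hwm = 3)
        pub.publish("m1")                       // no subscriber seen yet
        pub.subscriberStarted()
        pub.publish("m2")
        pub.subscriberStopped()
        (3..7).forEach { pub.publish("m$it") }  // subscriber is restarting
        pub.subscriberStarted()
        println(pub.delivered)  // [m2, m3, m4, m5]
        println(pub.dropped)    // 3
    }
    ```

    Here m1 is lost because no subscriber had been seen yet, m3 to m5 are held and flushed after the restart, and m6 and m7 are lost because the toy limit of 3 was already reached.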

    Here is an example of one of my services actually benefiting from this. This is the structure: [current service] sub:server <= pub:client [service being restarted] sub:server <=* pub:client [multiple publishers]

    Because I restart the service in the middle, all the publishers start buffering their messages. The final service, which was observing ~200 messages per second, sees the rate drop to 0 (the 1 or 2 that remain are heartbeats), and then a sudden burst of 1000+ messages comes in, because all publishers flush their buffers (the restart took about 5 seconds). I am actually not losing a single message here.
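
    The size of that burst follows from simple arithmetic, sketched below. The function names are hypothetical, and the second call assumes a single publisher sending at the full ~200 messages per second with the HWM of 10000 from the sample code. It also ignores anything held in OS-level socket buffers, so treat it as a rough estimate.

    ```kotlin
    // Messages queued while the subscriber is away, at a steady publish rate.
    fun bufferedDuringOutage(ratePerSecond: Int, outageSeconds: Int): Int =
        ratePerSecond * outageSeconds

    // Longest outage (in whole seconds) that a given HWM can absorb at that rate.
    fun maxOutageSeconds(hwm: Int, ratePerSecond: Int): Int = hwm / ratePerSecond

    fun main() {
        println(bufferedDuringOutage(ratePerSecond = 200, outageSeconds = 5))  // 1000
        println(maxOutageSeconds(hwm = 10_000, ratePerSecond = 200))           // 50
    }
    ```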


    Note that you must have a subscriber:server <= publisher:client pair. This way the publisher knows "there is only one place I need to deliver these messages to". You can try binding on the publisher and connecting on the subscriber, but you will no longer see the publisher buffering messages, simply because it is unclear whether a subscriber that just disconnected did so because it no longer needs the data or because it failed.