How to set up multiple concurrent subscribers for a multicast channel?


I am exploring Aeron and have a cluster of three nodes on my machine. I am trying to set up a simple model: one publisher on Node 1, and one subscriber each on Node 2 and Node 3.

All three nodes communicate via a multicast address, as below:

  • Node 1: Broadcast to aeron:udp?endpoint=239.255.255.1:4300|interface=192.168.64.6|ttl=16
  • Node 2: Starting Receive... channel: aeron:udp?endpoint=239.255.255.1:4300|interface=192.168.64.5|ttl=16
  • Node 3: Starting Receive... channel: aeron:udp?endpoint=239.255.255.1:4300|interface=192.168.64.4|ttl=16

I don't understand the result shown in the attached image. The Node 1 publisher publishes to the channel as fast as possible, but Node 2 and Node 3 do not consume at the same time; only one of them consumes data at any given moment. In the screenshot, Node 2 always keeps up with the publisher, but Node 3 receives a few hundred messages and then hangs until Node 2 has finished receiving all 1 million messages. Is there anything I missed? However, if I reduce the publishing rate to, for example, 1 message per second, both subscribers receive messages at the same time.

The implementation:

The publisher code: https://github.com/cseblog/aeron-demos/blob/main/aeron-multicast-ping-demo/src/main/java/org/multicast/Main.java

The subscriber code: https://github.com/cseblog/aeron-demos/blob/main/aeron-multicast-pong-demo/src/main/java/org/multicast/Main.java

I would like to understand what causes this behavior.


Solution

  • This is most probably flow control. The default flow control strategy for multicast is max, which paces the publisher by the fastest receiver and so allows slower receivers to fall behind and drop off. If you want all receivers to see all data, try min, which paces the publisher by the slowest receiver. You can configure this in a few ways:

    Channel URI configuration:

    channel: aeron:udp?endpoint=239.255.255.1:4300|interface=192.168.64.5|ttl=16|fc=min
    

    Media Driver Code configuration:

    import io.aeron.driver.MediaDriver;
    import io.aeron.driver.MinMulticastFlowControlSupplier;
    MediaDriver.Context ctx = new MediaDriver.Context()
        .multicastFlowControlSupplier(new MinMulticastFlowControlSupplier());
    

    Media Driver property configuration:

    aeron.multicast.flow.control.strategy=min
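
To see why the choice between max and min matters here, the following is a rough, self-contained sketch of the idea. It is a toy model, not Aeron's actual implementation: the class name FlowControlSketch, the methods senderLimit and worstLag, and the rates and window size are all made up for illustration. It assumes the sender may only publish up to a reference receiver position plus a fixed window, where the reference is the fastest receiver under max and the slowest under min.

```java
import java.util.Arrays;

/** Toy model of sender-side multicast flow control (hypothetical, not Aeron code). */
final class FlowControlSketch {

    enum Strategy { MAX, MIN }

    /** Highest position the sender may publish up to, given receiver positions and a window. */
    static long senderLimit(Strategy strategy, long[] receiverPositions, long window) {
        long reference = strategy == Strategy.MAX
            ? Arrays.stream(receiverPositions).max().orElse(0L)
            : Arrays.stream(receiverPositions).min().orElse(0L);
        return reference + window;
    }

    /** Runs the toy simulation and returns how far the slowest receiver trails the sender. */
    static long worstLag(Strategy strategy, long[] ratesPerTick, long senderRate, long window, int ticks) {
        long senderPosition = 0;
        long[] positions = new long[ratesPerTick.length];
        for (int t = 0; t < ticks; t++) {
            // The sender advances at its own rate, capped by the flow control limit.
            long limit = senderLimit(strategy, positions, window);
            senderPosition = Math.min(senderPosition + senderRate, limit);
            // Each receiver consumes at its own rate, never past what was sent.
            for (int i = 0; i < positions.length; i++) {
                positions[i] = Math.min(positions[i] + ratesPerTick[i], senderPosition);
            }
        }
        long slowest = Arrays.stream(positions).min().orElse(0L);
        return senderPosition - slowest;
    }

    public static void main(String[] args) {
        long[] rates = {100, 10}; // one fast receiver, one slow receiver (made-up numbers)
        System.out.println("max lag: " + worstLag(Strategy.MAX, rates, 100, 128, 1000));
        System.out.println("min lag: " + worstLag(Strategy.MIN, rates, 100, 128, 1000));
    }
}
```

In this model the slow receiver's lag keeps growing under max, while under min it stays within the window because the sender is held back. In a real system a receiver that falls too far behind loses data rather than simply lagging, which may be what the stalled subscriber in the question is experiencing.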