
Can't receive the message in Akka Cluster


I'm trying to send a message from the SenderActor in Microservice A using "Classic Distributed Publish Subscribe in Cluster". I publish the message to the "content" topic, but it never arrives at the ReceiverActor in Microservice B. Do you have any insight into what might be causing this issue?

Thank you!

The config file in A:

akka {
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      enabled = on
      canonical {
        hostname = "localhost"
        port = 2551
      }
    }
  }
  cluster {
    seed-nodes = [
      "akka://cluster-system@localhost:2551"
    ]
  }
}

The SenderActor:

public class SenderActor extends AbstractActor {
    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        String.class,
                        in -> {
                            String out = in.toUpperCase();
                            mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());
                        })
                .build();
    }
}

The main in Microservice A:

    public static void main(String[] args) {
        SpringApplication.run(MicroserviceAaApplication.class, args);
        Config config = ConfigFactory.load("application.conf");

        ActorSystem system = ActorSystem.create("cluster-system", config);
        Cluster cluster = Cluster.get(system);
        ActorRef publisher = system.actorOf(Props.create(SenderActor.class), "publisher");
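        // No actor is sending this message, so use the explicit "no sender" reference instead of null.
        publisher.tell("hello from microservice A ", ActorRef.noSender());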
        
    }

The config file in B:

akka {
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      enabled = on
      canonical {
        hostname = "localhost"
        port = 2552
      }
    }
  }
  cluster {
    seed-nodes = [
      "akka://cluster-system@localhost:2551"
    ]
  }
}

The ReceiverActor class:

public class ReceiverActor extends AbstractActor {
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    public ReceiverActor() {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg -> {
                    log.info("Received message in receiver-actor-b: {}", msg);
                })
                .match(DistributedPubSubMediator.SubscribeAck.class,
                        msg -> log.info("SubscribeAck: {}", msg))
                .build();
    }
}

The main in Microservice B:

    public static void main(String[] args) {
        SpringApplication.run(MicroserviceBbApplication.class, args);
        Config config = ConfigFactory.load("application.conf");

        ActorSystem system = ActorSystem.create("cluster-system", config);
        Cluster cluster = Cluster.get(system);
        ActorRef subscriber = system.actorOf(Props.create(ReceiverActor.class), "subscriber");
    }

I expect to receive the message "hello from microservice A ", but I only get the following logs in B:

2023-08-20T15:53:14.067+02:00  INFO 7788 --- [           main] c.e.M.MicroserviceBbApplication          : Started MicroserviceBbApplication in 0.909 seconds (process running for 1.723)
2023-08-20T15:53:14.552+02:00  INFO 7788 --- [lt-dispatcher-5] akka.event.slf4j.Slf4jLogger             : Slf4jLogger started
2023-08-20T15:53:14.900+02:00  INFO 7788 --- [lt-dispatcher-5] akka.remote.artery.ArteryTransport       : Remoting started with transport [Artery tcp]; listening on address [akka://cluster-system@localhost:2552] with UID [6730186213375081245]
2023-08-20T15:53:14.925+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Starting up, Akka version [2.8.3] ...
2023-08-20T15:53:14.980+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Registered cluster JMX MBean [akka:type=Cluster]
2023-08-20T15:53:14.981+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Started up successfully
2023-08-20T15:53:15.006+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#downing
2023-08-20T15:53:15.624+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Received InitJoinAck message from [Actor[akka://cluster-system@localhost:2551/system/cluster/core/daemon#1745370250]] to [akka://cluster-system@localhost:2552]
2023-08-20T15:53:15.675+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Welcome from [akka://cluster-system@localhost:2551]
2023-08-20T15:53:16.077+02:00  INFO 7788 --- [lt-dispatcher-5] c.example.MicroserviceBB.ReceiverActor   : SubscribeAck: SubscribeAck(Subscribe(content,None,Actor[akka://cluster-system/user/subscriber1#-339648796]))
2023-08-20T16:24:19.549+02:00  INFO 7788 --- [t-dispatcher-40] akka.actor.CoordinatedShutdown           : Running CoordinatedShutdown with reason [JvmExitReason]

Process finished with exit code 130


Solution

  • It's important to remember that Distributed PubSub will only deliver messages to subscribers that happen to be subscribed at the time the mediator receives the published message (I'm simplifying a bit, but that's the intuition to have).

    In Microservice A's main, you start the cluster forming (the process of forming the cluster happens in the background) and then immediately spawn the SenderActor and send it a message, which gets forwarded to the mediator. Note also that the cluster can form with just one node (since Microservice A defines itself as a seed node), so you're sending the message to the mediator before the ReceiverActor in Microservice B has subscribed; as a consequence, the message is dropped.

    In this case, it may be useful to have an actor in Microservice A that subscribes to a second pubsub topic. Once the ReceiverActor has subscribed to the "content" topic, it publishes a notification to that second topic, and the subscribing actor in Microservice A then tells the SenderActor that it can start publishing to "content".

    As a side note, it's not clear if Microservices A and B are meant to be instances of the same microservice or are truly different microservices. If the latter, it's not recommended to use Akka Cluster for communication between microservices.
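
To make the timing issue and the suggested handshake concrete, here is a toy in-memory model in plain Java. It is not Akka code: `ToyMediator`, the "content-ready" topic name, and the method names are all hypothetical, and the model only captures the simplified intuition above (a publish reaches whoever is subscribed at that moment). In the real system, `subscribe` would correspond to sending `DistributedPubSubMediator.Subscribe`, `publish` to sending `DistributedPubSubMediator.Publish`, and the receiver would announce readiness once it has seen its `SubscribeAck`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PubSubTimingDemo {

    /** Toy stand-in for the mediator: delivers only to handlers subscribed at publish time. */
    static final class ToyMediator {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        void subscribe(String topic, Consumer<String> handler) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        /** Returns the number of handlers reached; 0 means the message was dropped. */
        int publish(String topic, String message) {
            // Copy the list so a handler may safely subscribe or publish while we iterate.
            List<Consumer<String>> current =
                    new ArrayList<>(subscribers.getOrDefault(topic, List.of()));
            current.forEach(handler -> handler.accept(message));
            return current.size();
        }
    }

    /** Mirrors the question: A publishes before B has subscribed. */
    public static List<String> publishTooEarly() {
        ToyMediator mediator = new ToyMediator();
        List<String> receivedByB = new ArrayList<>();
        mediator.publish("content", "HELLO FROM A");      // nobody is subscribed yet: dropped
        mediator.subscribe("content", receivedByB::add);  // B subscribes too late
        return receivedByB;
    }

    /** Mirrors the suggested fix: A publishes only after B announces that it is subscribed. */
    public static List<String> publishAfterHandshake() {
        ToyMediator mediator = new ToyMediator();
        List<String> receivedByB = new ArrayList<>();
        // Microservice A: listen on a second (hypothetical) topic and publish once B is ready.
        mediator.subscribe("content-ready",
                ready -> mediator.publish("content", "HELLO FROM A"));
        // Microservice B: subscribe to "content" first, then announce readiness.
        mediator.subscribe("content", receivedByB::add);
        mediator.publish("content-ready", "subscribed");
        return receivedByB;
    }

    public static void main(String[] args) {
        System.out.println("too early: " + publishTooEarly());
        System.out.println("after handshake: " + publishAfterHandshake());
    }
}
```

In this model the first scenario returns an empty list and the second returns the single message. One caveat when carrying the idea over to a real cluster: subscriptions take some time to become known on other nodes, so the readiness notification itself could be published before Microservice A's subscription is visible to Microservice B. Repeating the notification until the first "content" message arrives is one possible way to cover that case.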