I'm trying to set up distributed pub-sub across different cluster systems, but it isn't working no matter what I try.
All I'm trying to do is create a simple example where:
1) I create a topic, say "content".
2) One node, say JVM A, creates the topic, subscribes to it, and also has a publisher that publishes to it.
3) On a different node, say JVM B on a different port, I create a subscriber.
4) When I send a message to the topic from JVM A, I want the subscriber on JVM B to receive it too, as it's subscribed to the same topic.
Any help would be greatly appreciated, or a simple working example in Java of distributed pub-sub with subscribers and publishers in different cluster systems on different ports.
Here is the code for app1 and its config file.
public class App1{
    public static void main(String[] args) {
        System.setProperty("akka.remote.netty.tcp.port", "2551");
        ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
        ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
        ActorRef subcriber1=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
        clusterClientReceptionist1.registerSubscriber("content", subcriber1);
        ActorRef publisher1=clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
        clusterClientReceptionist1.registerSubscriber("content", publisher1);
        publisher1.tell("testMessage1", ActorRef.noSender());
    }
}
app1.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
    ]
    auto-down-unreachable-after = 10s
  }
  akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
    "akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
    name = distributedPubSubMediator
    role = ""
    routing-logic = random
    gossip-interval = 1s
    removed-time-to-live = 120s
    max-delta-elements = 3000
    use-dispatcher = ""
  }
  akka.cluster.client.receptionist {
    name = receptionist
    role = ""
    number-of-contacts = 3
    response-tunnel-receive-timeout = 30s
    use-dispatcher = ""
    heartbeat-interval = 2s
    acceptable-heartbeat-pause = 13s
    failure-detection-interval = 2s
  }
}
Here is the code for app2 and its config file.
public class App
{
    public static Set<ActorPath> initialContacts() {
        return new HashSet<ActorPath>(Arrays.asList(
            ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")));
    }
    public static void main( String[] args ) {
        System.setProperty("akka.remote.netty.tcp.port", "2553");
        ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
        ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
        final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
            clusterSystem).withInitialContacts(initialContacts())), "client");
        ActorRef subcriber2=clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
        clusterClientReceptionist2.registerSubscriber("content", subcriber2);
        ActorRef publisher2=clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
        publisher2.tell("testMessage2", ActorRef.noSender());
        clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);
    }
}
app2.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2553"
    ]
    auto-down-unreachable-after = 10s
  }
  akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
    "akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
    name = distributedPubSubMediator
    role = ""
    routing-logic = random
    gossip-interval = 1s
    removed-time-to-live = 120s
    max-delta-elements = 3000
    use-dispatcher = ""
  }
  akka.cluster.client.receptionist {
    name = receptionist
    role = ""
    number-of-contacts = 3
    response-tunnel-receive-timeout = 30s
    use-dispatcher = ""
    heartbeat-interval = 2s
    acceptable-heartbeat-pause = 13s
    failure-detection-interval = 2s
  }
}
The Publisher and Subscriber classes are the same for both applications and are given below.
Publisher:
public class Publisher extends UntypedActor {
    private final ActorRef mediator =
        DistributedPubSub.get(getContext().system()).mediator();
    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof String) {
            mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
        } else {
            unhandled(msg);
        }
    }
}
Subscriber:
public class Subscriber extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    public Subscriber(){
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());
    }
    public void onReceive(Object msg) throws Throwable {
        if (msg instanceof String) {
            log.info("Got: {}", msg);
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            log.info("subscribing");
        } else {
            unhandled(msg);
        }
    }
}
I got this error in the receiver-side app while running both apps (dead letters encountered):
[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
In the sender-side app, the log shows the message being sent successfully:
[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist
Using the ClusterClient in that way does not really make sense and has nothing to do with distributed pub-sub. As both your nodes are part of the cluster, you can just use the distributed pub-sub API directly.
Here is a simple main, including config, that creates a two-node cluster using your exact Publisher and Subscriber actors and works as expected:
public static void main(String[] args) throws Exception {
    final Config config = ConfigFactory.parseString(
        "akka.actor.provider=cluster\n" +
        "akka.remote.netty.tcp.port=2551\n" +
        "akka.cluster.seed-nodes = [ \"akka.tcp://ClusterSystem@127.0.0.1:2551\"]\n");
    ActorSystem node1 = ActorSystem.create("ClusterSystem", config);
    ActorSystem node2 = ActorSystem.create("ClusterSystem",
        ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
            .withFallback(config));
    // wait a bit for the cluster to form
    Thread.sleep(3000);
    ActorRef subscriber = node1.actorOf(
        Props.create(Subscriber.class),
        "subscriber");
    ActorRef publisher = node2.actorOf(
        Props.create(Publisher.class),
        "publisher");
    // wait a bit for the subscription to be gossiped
    Thread.sleep(3000);
    publisher.tell("testMessage1", ActorRef.noSender());
}
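If you would rather keep the settings in a file, as in the question, the inline config from the main above might look roughly like this as HOCON. This is only a sketch under assumptions: the file name node.conf is hypothetical, the hostname line is borrowed from the question's config rather than from this answer, and the seed-node address assumes the system name ClusterSystem on 127.0.0.1. The point it illustrates is that both nodes share the same actor system name and the same seed node, and differ only in their port.

```hocon
# node.conf (hypothetical file name), shared by both nodes
akka {
  actor.provider = cluster
  remote.netty.tcp {
    hostname = "127.0.0.1"   # assumption: taken from the question's config
    port = 2551              # the second node overrides this with 2552
  }
  # Same actor system name and same seed node for every node in the cluster.
  cluster.seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2551"]
}
```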
Note that distributed pub-sub does not give any delivery guarantees, so if you send a message before the mediators have gotten in contact with each other, the message will simply be lost (hence the Thread.sleep statements, which are of course not something you should do in actual code).
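To make the "simply lost" part concrete, here is a toy in-memory model of fire-and-forget delivery. It is not Akka code and does not model gossip between mediators. The class FireAndForgetTopic and its methods are hypothetical names invented for this illustration. It only shows that a message published before a subscription is known is dropped, with no error and no retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy topic with at-most-once, fire-and-forget delivery (illustration only, not Akka). */
public class FireAndForgetTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Registers a subscriber; only messages published after this call reach it. */
    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers to whoever is subscribed right now and returns how many received it. */
    public int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        FireAndForgetTopic topic = new FireAndForgetTopic();
        List<String> received = new ArrayList<>();

        topic.publish("too early");      // nobody is subscribed yet: silently dropped
        topic.subscribe(received::add);
        topic.publish("testMessage1");   // delivered, the subscription is now known

        System.out.println(received);    // prints [testMessage1]
    }
}
```

In the real cluster the window is longer, because a subscription has to be gossiped to the other mediators before a publisher on another node can reach it. If a message must not be lost, some acknowledgement and resend scheme on top of pub-sub is one option to consider.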