I was following the documentation for writing an MQTT client subscriber using Alpakka.
After the code marked in bold, I'm not sure how I can retrieve or interact with the subscribed messages. Any leads?
Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
    Source.<Command<Object>>queue(3, OverflowStrategy.fail())
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .toMat(Sink.head(), Keep.both())
        .run(system);

SourceQueueWithComplete<Command<Object>> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
    new Command<>(
        new Publish(
            ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
            topic,
            ByteString.fromString("ohi"))));

// for shutting down properly
commands.complete();
commands.watchCompletion().thenAccept(done -> session.shutdown());
Also, the following example shows how to subscribe the client, but nothing about how to receive messages after subscribing.
I would be grateful if anyone knows the solution or can point to a resource that uses the same connector as an MQTT client and retrieves messages.
The code to retrieve messages for the subscriber is hidden in the client
method, which is used for both the publisher and the subscriber:
...
  // Only the Publish events are interesting for the subscriber
  .collect { case Right(Event(p: Publish, _)) => p }
  .wireTap(event => logger.info(s"Client: $connectionId received: ${event.payload.utf8String}"))
  .toMat(Sink.ignore)(Keep.both)
  .run()
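For what it's worth, the main difference from the Java snippet in the question seems to be the sink: `Sink.head()` completes after the first `Publish`, while `wireTap` plus `Sink.ignore` (or `Sink.foreach` in the Java DSL) sees every message. Here is a minimal plain-Java sketch of that difference. It deliberately does not use Akka or Alpakka: `Event`, `ConnAck`, `SubAck` and `Publish` below are hypothetical stand-ins for the connector's types, and `findFirst`/`forEach` on a `java.util.stream.Stream` only play the roles of `Sink.head()` and a per-element sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SubscriberSketch {
    // Hypothetical stand-ins for the connector's event types (NOT the Alpakka classes).
    interface Event {}
    static final class ConnAck implements Event {}
    static final class SubAck implements Event {}
    static final class Publish implements Event {
        final String topic;
        final String payload;
        Publish(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // Analogue of collect(...) followed by Sink.head(): only the first Publish is kept.
    static Optional<Publish> firstPublish(List<Event> events) {
        return events.stream()
                .filter(e -> e instanceof Publish)
                .map(e -> (Publish) e)
                .findFirst();
    }

    // Analogue of collect(...) followed by a per-element handler: every Publish is seen.
    static List<String> allPayloads(List<Event> events) {
        List<String> received = new ArrayList<>();
        events.stream()
                .filter(e -> e instanceof Publish)
                .map(e -> (Publish) e)
                .forEach(p -> received.add(p.payload));
        return received;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new ConnAck(),
                new SubAck(),
                new Publish("demo/topic", "ohi"),
                new Publish("demo/topic", "again"));

        // Prints "ohi": the head-style consumer stops at the first message.
        System.out.println(firstPublish(events).map(p -> p.payload).orElse("none"));
        // Prints "[ohi, again]": the per-element consumer handles both messages.
        System.out.println(allPayloads(events));
    }
}
```

In the stream from the question, the corresponding change would presumably be to swap `Sink.head()` for a sink that handles each element, and to call `commands.complete()` only once you actually want to stop receiving.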
I was struggling with this connector and then tried an example with the one based on Eclipse Paho, which in the end looks better:
Paul