mqtt, akka-stream, alpakka

How to retrieve messages from Alpakka Mqtt Streaming client?


I was following the documentation to write an MQTT subscriber client using Alpakka:

https://doc.akka.io/docs/alpakka/3.0.4/mqtt-streaming.html?_ga=2.247958340.274298740.1642514263-524322027.1627936487

After the code marked in bold, I’m not sure how I can retrieve or interact with the subscribed messages. Any leads?

Pair<SourceQueueWithComplete<Command>, CompletionStage> run =
    Source.<Command>queue(3, OverflowStrategy.fail())
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .toMat(Sink.head(), Keep.both())
        .run(system);

SourceQueueWithComplete<Command> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
    new Command<>(
        new Publish(
            ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
            topic,
            ByteString.fromString("ohi"))));

// for shutting down properly
commands.complete();
commands.watchCompletion().thenAccept(done -> session.shutdown());

Also, the following example shows how to subscribe with the client, but not how to receive messages after subscribing.

https://github.com/pbernet/akka_streams_tutorial/blob/master/src/main/scala/alpakka/mqtt/MqttEcho.scala

I would be grateful if anyone knows the solution or can point to a resource that uses the same connector as an MQTT client and retrieves messages.


Solution

  • The code to retrieve messages for the subscriber is hidden in the client method, which is used for both the publisher and the subscriber:

        ...
        // Only the Publish events are interesting for the subscriber
        .collect { case Right(Event(p: Publish, _)) => p }
        .wireTap(event => logger.info(s"Client: $connectionId received: ${event.payload.utf8String}"))
        .toMat(Sink.ignore)(Keep.both)
        .run()
    

    https://github.com/pbernet/akka_streams_tutorial/blob/3e4484c5356e55522366e65e42e1741c18830a18/src/main/scala/alpakka/mqtt/MqttEcho.scala#L136

    I struggled with this connector, so I also tried an example with the connector based on Eclipse Paho, which in the end looks better:

    https://github.com/pbernet/akka_streams_tutorial/blob/3e4484c5356e55522366e65e42e1741c18830a18/src/main/scala/alpakka/mqtt/MqttPahoEcho.scala#L41

    Paul
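
To make the difference between the two snippets concrete: the code in the question ends in Sink.head(), which yields only the first collected Publish, while the Scala snippet above taps every Publish before Sink.ignore. Below is a minimal plain-JDK sketch of that difference. It is not Alpakka code: SubscriberSinkModel, Event, Publish and Ack are hypothetical stand-ins invented for illustration. In the real Java stream, the equivalent change would presumably be to replace Sink.head() with a sink that sees every element, such as Sink.foreach(...) from the Akka Streams Java DSL, in which case the materialized value would no longer be a single Publish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Plain-JDK model, NOT Alpakka code: every type below is a hypothetical stand-in.
final class SubscriberSinkModel {

  // Stand-in for an event coming out of the MQTT flow.
  interface Event {}

  // Stand-in for a received PUBLISH packet.
  static final class Publish implements Event {
    final String topic;
    final String payload;

    Publish(String topic, String payload) {
      this.topic = topic;
      this.payload = payload;
    }
  }

  // Stand-in for acknowledgements (for example ConnAck or SubAck).
  static final class Ack implements Event {}

  // Mirrors the collect step: keep only the Publish events, in order.
  static List<Publish> collectPublishes(List<Event> events) {
    List<Publish> out = new ArrayList<>();
    for (Event e : events) {
      if (e instanceof Publish) {
        out.add((Publish) e);
      }
    }
    return out;
  }

  // Models a head-style sink: only the first Publish is ever seen.
  static Optional<Publish> head(List<Event> events) {
    return collectPublishes(events).stream().findFirst();
  }

  // Models a foreach-style sink: the handler is called for every Publish.
  // Returns how many messages were handed to the handler.
  static int forEach(List<Event> events, Consumer<Publish> handler) {
    List<Publish> publishes = collectPublishes(events);
    publishes.forEach(handler);
    return publishes.size();
  }

  public static void main(String[] args) {
    List<Event> events =
        Arrays.asList(new Ack(), new Publish("t", "one"), new Ack(), new Publish("t", "two"));

    System.out.println("head sees: " + head(events).map(p -> p.payload).orElse("<none>"));
    forEach(events, p -> System.out.println("foreach sees: " + p.payload));
  }
}
```

In this model the handler passed to forEach is where the subscriber logic (logging, forwarding, parsing the payload) would go; with the head-style sink there is no place to put it beyond the first message.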