Search code examples
rakucro

Raku Cro service subscribing to data "in the background" general guidance


I am attempting to put together a Cro service that has a react/whenever block consuming data "in the background" So unlike many examples of websocket usage with Cro, this has nothing to do with routes that may be accessed via the browser.

My use case is to consume message received via an MQTT topic and do some processing with them. At a later stage in development I might create a supply out of this data, but for now, when data is received it will be stored in a variable and dependant on certain conditions, be sent to another service via a http post.

My thought was to include a provider() in the Cro::HTTP::Server setup like so:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

And in the DataProvider.pm6:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

This throws a bunch of errors:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

To be perfectly honest, I am totally guessing that this is how I would approach the need to subscribe to data in the background of a Cro service, but I was not able to find any information on what might be considered the recommended approach.

Initially I had my react/whenever block in the main service.pm6 file, but that did not seem right. And needed to be wrapped in a start{} block because as I have just learned, react is blocking :) and cro was not able to actually start.

But following the pattern of how Routes are implemented seemed logical, but I am missing something. The error speaks about setting up a new method, but I'm not convinced that is the root cause. Routes.pm6 does not have a constructor.

Can anyone point me in the right direction please?


Solution

  • You seem to be fine now but when I first saw this I made this https://github.com/jonathanstowe/Cro-MQTT which turns the MQTT client into a first class Cro service.

    I haven't released it yet but it may be instructive.