Search code examples
typescriptrxjsgrafanagrafana-plugin

rxjs bindCallback Subscription on mqtt-client message-event only triggers once and does not trigger DataQueryResponse subscriber.next()


I am currently trying to create a streaming DataSource plugin for Grafana. To do this, I have adapted the code from the official instructions so that data from an MQTT Topic is passed to Grafana. Here is the official working demo code with a timed output of the frame:

Code from the Grafana Guide

query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = options.targets.map(target => {
    const query = defaults(target, defaultQuery);

    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });

      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });

      const intervalId = setInterval(() => {
        frame.add({ time: Date.now(), value: Math.random() });

        subscriber.next({
          data: [frame],
          key: query.refId,
        });
      }, 100);

      return () => {
        clearInterval(intervalId);
      };
    });
  });

  return merge(...streams);
}

The connection to my Mosquitto broker and the submitting and receiving of the data works great. What is problematic is passing the received data to the Grafana dashboard through a subscriber that returns a CircularDataFrame.

As suggested in this answer, I use the rxjs bindCallback function to watch the message eventhandler. In doing so, the console output within the message callback function is printed correctly for each incoming message. Whereas the console output in the subscription of the clientOnObs instance is only triggered on the first message and no more after that. However, it should be the case that the console output in the subscription is also triggered on each incoming message, so that I can expand the CircularDataFrame and then pass it to the Grafana Dashboard via subscriber.next(). I already tried swapping the order of bindCallback() and this.mqttClient.on(), but then no message was output in the subscribe part at all. Additionally I tried to put the subscriber.next() call directly into the this.mqttClient.on('message') callback, also without success.

The goal is to have subscriber.next() fire on every incoming message to pass the new data to the dashboard. How do I need to modify my implementation of the query method to achieve this?

My adapted code

query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
    const streams = request.targets.map(target => {
      const query = defaults(target, defaultQuery);

      return new Observable<DataQueryResponse>(subscriber => {
        const frame = new CircularDataFrame({
          append: 'tail',
          capacity: 1000,
        });

        frame.refId = query.refId;
        frame.addField({ name: 'time', type: FieldType.time });
        frame.addField({ name: 'value', type: FieldType.number });

        const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);

        this.mqttClient.on('message', (topic: string, message: any) => {
          // this is printed correctly with every message
          console.log(topic, JSON.parse(message.toString()));
        });

        return () => {
          clientOnObs('message').subscribe((payload: any[]) => {
            // const topic: string = payload[0];
            const message = JSON.parse(payload[1].toString());
            
            // This is only printed on the first incoming message
            console.log(message);

            frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
            
            // The frame is not passed on at all
            subscriber.next({
              data: [frame],
              key: query.refId,
            });
          });
        };
      });
    });

    return merge(...streams);
  }

Solution

  • bindCallback is a function used to transform a callback-based function into an Obaservable which emits once and then completes. So this is probably the reason the subscription is triggered only for the first message.

    If I look at your code, what I would try to do is the following

    query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
        const streams = request.targets.map(target => {
          const query = defaults(target, defaultQuery);
    
          return new Observable<DataQueryResponse>(subscriber => {
            const frame = new CircularDataFrame({
              append: 'tail',
              capacity: 1000,
            });
    
            frame.refId = query.refId;
            frame.addField({ name: 'time', type: FieldType.time });
            frame.addField({ name: 'value', type: FieldType.number });
    
            this.mqttClient.on('message', (topic: string, message: any) => {
              // call next on the subscriber here
              const message = JSON.parse(payload[1].toString());
              frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
              subscriber.next({
                data: [frame],
                key: query.refId,
              });
            });
    
            return () => {
              // place here any code that has to run to clean up when the Observable
              // completes. In the graphana official example, for instance, this is 
              // where the interval is cleared
            };
          });
        });
    
        return merge(...streams);
      }