I am currently trying to create a streaming DataSource plugin for Grafana. To do this, I have adapted the code from the official instructions so that data from an MQTT topic is passed to Grafana. Here is the official working demo code, which emits the frame on a timer:
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = options.targets.map(target => {
    const query = defaults(target, defaultQuery);
    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });
      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });
      const intervalId = setInterval(() => {
        frame.add({ time: Date.now(), value: Math.random() });
        subscriber.next({
          data: [frame],
          key: query.refId,
        });
      }, 100);
      return () => {
        clearInterval(intervalId);
      };
    });
  });
  return merge(...streams);
}
The connection to my Mosquitto broker works, and so does publishing and receiving the data. The problem is passing the received data to the Grafana dashboard through a subscriber that returns a CircularDataFrame.
As suggested in this answer, I use the rxjs bindCallback function to watch the message event handler. The console output inside the message callback is printed correctly for every incoming message, whereas the console output in the subscription of the clientOnObs instance is triggered only on the first message and never again. The subscription should also fire on every incoming message, so that I can extend the CircularDataFrame and then pass it to the Grafana dashboard via subscriber.next(). I already tried swapping the order of bindCallback() and this.mqttClient.on(), but then nothing was printed in the subscribe part at all. I also tried putting the subscriber.next() call directly into the this.mqttClient.on('message') callback, also without success.
The goal is to have subscriber.next() fire on every incoming message so that the new data reaches the dashboard. How do I need to modify my implementation of the query method to achieve this?
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = request.targets.map(target => {
    const query = defaults(target, defaultQuery);
    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });
      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });
      const clientOnObs = bindCallback(this.mqttClient.on).bind(this.mqttClient);
      this.mqttClient.on('message', (topic: string, message: any) => {
        // this is printed correctly with every message
        console.log(topic, JSON.parse(message.toString()));
      });
      return () => {
        clientOnObs('message').subscribe((payload: any[]) => {
          // const topic: string = payload[0];
          const message = JSON.parse(payload[1].toString());
          // This is only printed on the first incoming message
          console.log(message);
          frame.add({ time: Math.floor(message.time) * 1000, value: message.temperature });
          // The frame is not passed on at all
          subscriber.next({
            data: [frame],
            key: query.refId,
          });
        });
      };
    });
  });
  return merge(...streams);
}
bindCallback is a function that transforms a callback-based function into a function returning an Observable which emits once and then completes. This is probably the reason the subscription is triggered only for the first message.
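To make the difference concrete, here is a minimal sketch that does not use rxjs at all. TinyEmitter and onceOnly are hypothetical stand-ins: the first plays the role of the MQTT client, the second only mimics the emit-once-then-complete contract described above. It compares a one-shot wrapper with a plain persistent listener on the same event source.

```typescript
// Hypothetical stand-in for an event source such as an MQTT client.
type Listener = (topic: string, message: string) => void;

class TinyEmitter {
  private listeners: Listener[] = [];

  on(listener: Listener): void {
    this.listeners.push(listener);
  }

  emit(topic: string, message: string): void {
    // Iterate over a copy so the listener list can change during dispatch.
    this.listeners.slice().forEach(l => l(topic, message));
  }
}

// Hypothetical helper that mimics an emit-once-then-complete wrapper:
// it forwards the first callback invocation and ignores all later ones.
function onceOnly(register: (cb: Listener) => void, next: Listener): void {
  let done = false;
  register((topic, message) => {
    if (done) {
      return; // already "completed"
    }
    done = true;
    next(topic, message);
  });
}

const emitter = new TinyEmitter();
const oneShot: string[] = [];
const persistent: string[] = [];

// One-shot wrapper: sees only the first message.
onceOnly(cb => emitter.on(cb), (_topic, message) => oneShot.push(message));
// Plain listener: sees every message.
emitter.on((_topic, message) => persistent.push(message));

emitter.emit('sensors/temp', 'a');
emitter.emit('sensors/temp', 'b');
emitter.emit('sensors/temp', 'c');

console.log(oneShot.length, persistent.length); // prints "1 3"
```

The one-shot path matches the symptom in the question: the underlying handler keeps being called, but nothing downstream is notified after the first message.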
Looking at your code, I would try the following:
query(request: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = request.targets.map(target => {
    const query = defaults(target, defaultQuery);
    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });
      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });
      // Named handler so that the same function can be removed again on teardown
      const onMessage = (topic: string, message: any) => {
        // call next on the subscriber here, once per incoming message
        const parsed = JSON.parse(message.toString());
        frame.add({ time: Math.floor(parsed.time) * 1000, value: parsed.temperature });
        subscriber.next({
          data: [frame],
          key: query.refId,
        });
      };
      this.mqttClient.on('message', onMessage);
      return () => {
        // Place here any code that has to run to clean up when the Observable
        // completes or is unsubscribed. In the official Grafana example, for
        // instance, this is where the interval is cleared. Assuming
        // this.mqttClient is an MQTT.js client (an EventEmitter), the handler
        // can be removed like this:
        this.mqttClient.removeListener('message', onMessage);
      };
    });
  });
  return merge(...streams);
}
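The function returned from the Observable constructor runs when the subscription ends. If the message handler is not removed there, every re-run of query would leave another handler attached to the client. The following is a rough, dependency-free sketch of that register-and-teardown pattern; FakeClient, streamPoints and the topic name are hypothetical and only stand in for the real MQTT client and the Grafana subscriber.

```typescript
// Hypothetical minimal client exposing on/off, standing in for the MQTT client.
type MessageHandler = (topic: string, payload: string) => void;

class FakeClient {
  private handlers: MessageHandler[] = [];

  on(handler: MessageHandler): void {
    this.handlers.push(handler);
  }

  off(handler: MessageHandler): void {
    this.handlers = this.handlers.filter(h => h !== handler);
  }

  publish(topic: string, payload: string): void {
    this.handlers.slice().forEach(h => h(topic, payload));
  }

  handlerCount(): number {
    return this.handlers.length;
  }
}

interface Point {
  time: number; // milliseconds since the epoch
  value: number;
}

// Registers one handler and returns a teardown that removes exactly that handler.
function streamPoints(client: FakeClient, next: (p: Point) => void): () => void {
  const handler: MessageHandler = (_topic, payload) => {
    const parsed = JSON.parse(payload);
    // Same conversion as in the question: seconds to milliseconds.
    next({ time: Math.floor(parsed.time) * 1000, value: parsed.temperature });
  };
  client.on(handler);
  return () => client.off(handler);
}

const client = new FakeClient();
const received: Point[] = [];
const teardown = streamPoints(client, p => received.push(p));

client.publish('sensors/temp', JSON.stringify({ time: 1700000000.4, temperature: 21.5 }));
client.publish('sensors/temp', JSON.stringify({ time: 1700000001.9, temperature: 21.7 }));
teardown(); // from here on, the handler is gone
client.publish('sensors/temp', JSON.stringify({ time: 1700000002.1, temperature: 22.0 }));

console.log(received.length, client.handlerCount()); // prints "2 0"
```

Keeping a reference to the handler function is what makes the removal possible; an inline arrow function passed directly to on() could not be detached later.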