I am trying to write a Grafana plugin using RxJS. All the logic to get my data stream into Grafana is in place, and the complete pipeline works.
Now I need to stop the stream in case any options are changed.
In several blog posts and in the documentation I found the advice to use takeUntil.
Therefore, I created an observable that emits an event when I want to abort the stream. But every attempt to subscribe to the stream now fails with
You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
This problem occurs only when takeUntil is introduced. That is especially odd since I use TypeScript, so the return type should be safe.
To debug the problem I pasted the example from the docs
/// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
The problem persists. I checked my installed version of rxjs and yarn resolved it to 6.5.3. I checked the GitHub Issues of rxjs but didn't find any related issues.
I suppose the problem lies within my environment, so here are all of my dependencies:
"dependencies": {
  "@grafana/toolkit": "next",
  "@grafana/ui": "latest",
  "@types/grafana": "github:CorpGlory/types-grafana.git",
  "@stomp/rx-stomp": "latest",
  "rxjs-tslint-rules": "^4.25.0",
  "sockjs-client": "^1.4.0",
  "ts-loader": "^6.2.0",
  "typescript": "^3.6.4"
}
Snippet of my own implementation:
class LiveStreams {
  // Cache of running streams, keyed by streamId
  private streams = {};
  private stop$: Subject<string>;
  constructor() {
    this.stop$ = new Subject<string>();
  }
  getStream(target): Observable<DataFrame[]> {
    // Notifier intended to fire when this stream's id is sent on stop$
    const stop_id = this.stop$.pipe(takeWhile(id => id == target.streamId));
    const stream = this.rxStomp.watch(target.streamName).pipe(
      finalize(() => {
        delete this.streams[target.streamId];
      }),
      map((message: IMessage) => {
        const content = JSON.parse(message.body).content;
        appendResponseToBufferedData(content, data);
        return [data];
      }),
      throttleTime(target.throttleTime),
      takeUntil(stop_id)
    );
    this.streams[target.streamId] = stream;
    return stream;
  }
}
Any clues on which parts of my environment to check would be appreciated.
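A side note on the snippet above, separate from the "invalid object" error and not an explanation for it: takeWhile completes the notifier as soon as a non-matching id arrives, and takeUntil does not stop the source when its notifier completes without emitting. If the intent is "stop this stream once its own id is sent", filter is probably the closer fit. A minimal sketch, assuming RxJS 6; the stoppable helper and the test subjects are hypothetical names, not part of my plugin:

```typescript
import { Observable, Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for the stop$ subject in LiveStreams.
const stop$ = new Subject<string>();

// Hypothetical helper: completes source$ once streamId is sent on stop$.
function stoppable<T>(source$: Observable<T>, streamId: string): Observable<T> {
  // filter keeps listening after non-matching ids arrive;
  // takeWhile would complete the notifier on the first non-matching id.
  const stopThis$ = stop$.pipe(filter(id => id === streamId));
  return source$.pipe(takeUntil(stopThis$));
}

// Usage with a synchronous Subject standing in for the STOMP stream.
const source$ = new Subject<number>();
const log: string[] = [];
stoppable(source$, 'a').subscribe({
  next: v => log.push(`next:${v}`),
  complete: () => log.push('complete'),
});

source$.next(1);
stop$.next('b'); // id of another stream: no effect
source$.next(2);
stop$.next('a'); // matching id: the stream completes here
source$.next(3); // no longer delivered
```

With takeWhile in place of filter, the stop$.next('b') call would complete the notifier and the later stop$.next('a') would have no effect.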
I didn't find a real answer, but I managed to avoid using takeUntil for Grafana altogether.
For anyone else coming here for Grafana: Grafana automatically unsubscribes if the requests no longer match. Therefore it is only necessary to remove your own cached version of the stream.
delete streams[streamId]
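To make the workaround a bit more concrete, here is a minimal sketch of such a cache in plain TypeScript. StreamCache, getOrCreate, and remove are hypothetical names chosen for illustration, and the strings stand in for the real observables; the only assumption taken from above is that Grafana handles the unsubscribe itself, so the plugin just has to forget its cached entry.

```typescript
// Hypothetical cache of streams keyed by streamId (illustrative names only).
class StreamCache<T> {
  private streams: { [streamId: string]: T } = {};

  // Return the cached stream for this id, creating it on first use.
  getOrCreate(streamId: string, create: () => T): T {
    if (!(streamId in this.streams)) {
      this.streams[streamId] = create();
    }
    return this.streams[streamId];
  }

  // Forget the cached stream; unsubscribing is left to the consumer.
  remove(streamId: string): void {
    delete this.streams[streamId];
  }
}

// Usage: strings stand in for the real stream objects.
const cache = new StreamCache<string>();
let created = 0;
const make = () => `stream-${++created}`;

const first = cache.getOrCreate('A', make);
const again = cache.getOrCreate('A', make); // served from the cache
cache.remove('A');
const fresh = cache.getOrCreate('A', make); // rebuilt after removal
```

After remove, the next request for the same id builds a new stream instead of reusing the stale one, which is all the workaround needs.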