rxjs, grafana, rxjs6

Subscribing to a pipe containing takeUntil fails


I am trying to write a Grafana plugin using RxJS. All the logic that delivers my data stream to Grafana works, and so does the complete pipeline.

Now I need to stop the stream whenever any options are changed. Several blog posts and the documentation advise using takeUntil for this.

Therefore, I created an observable that emits an event when I want to abort the stream. But every attempt to subscribe to the stream now fails with:

You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

This problem only occurs once takeUntil is introduced. This is especially odd since I use TypeScript, so the return type should be safe.

To debug the problem, I pasted the example from the docs:

// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

//emit value every 1s
const source = interval(1000);
//after 5 seconds, emit value
const timer$ = timer(5000);
//when timer emits after 5s, complete source
const example = source.pipe(takeUntil(timer$));
//output: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));

The problem persists. I checked my installed version of rxjs, and yarn resolved it to 6.5.3. I also checked the rxjs GitHub issues but didn't find anything related.

I suppose the problem lies within my environment, so I have included all of my dependencies:

  "dependencies": {
    "@grafana/toolkit": "next",
    "@grafana/ui": "latest",
    "@types/grafana": "github:CorpGlory/types-grafana.git",
    "@stomp/rx-stomp": "latest",
    "rxjs-tslint-rules": "^4.25.0",
    "sockjs-client": "^1.4.0",
    "ts-loader": "^6.2.0",
    "typescript": "^3.6.4"
  }

Snippet of my own implementation:

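import { Observable, Subject } from 'rxjs';
import { finalize, map, takeUntil, takeWhile, throttleTime } from 'rxjs/operators';

// rxStomp, data and appendResponseToBufferedData are defined elsewhere in the plugin
class LiveStreams {
  // cache of active streams, keyed by streamId
  private streams;
  // emits the id of the stream that should be stopped
  private stop$: Subject<string>;

  constructor() {
    this.stop$ = new Subject<string>();
  }

  getStream(target): Observable<DataFrame[]> {
    const stop_id = this.stop$.pipe(takeWhile(id => id === target.streamId));
    const stream = this.rxStomp.watch(target.streamName).pipe(
      // remove the cached stream once it completes, errors, or is unsubscribed
      finalize(() => {
        delete this.streams[target.streamId];
      }),
      map((message: IMessage) => {
        const content = JSON.parse(message.body).content;
        appendResponseToBufferedData(content, data);
        return [data];
      }),
      throttleTime(target.throttleTime),
      takeUntil(stop_id)
    );
    this.streams[target.streamId] = stream;
    return stream;
  }
}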
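
A side note on the notifier in the snippet above, independent of the error message: takeWhile completes the notifier as soon as an id does not match, and a notifier that completes without emitting does not stop takeUntil. If the intent is to stop one stream by id, filter may be the closer fit. A minimal sketch, assuming a shared stop$ subject; the helper name untilStopped and the synchronous Subject standing in for rxStomp.watch(...) are hypothetical:

```typescript
import { Observable, Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';

// Hypothetical shared notifier: emits the id of the stream that should stop.
const stop$ = new Subject<string>();

// Hypothetical helper: completes `source$` once `stop$` emits the matching id.
function untilStopped<T>(source$: Observable<T>, streamId: string): Observable<T> {
  // filter() drops other ids; takeUntil() completes on the first matching one.
  const stopThis$ = stop$.pipe(filter(id => id === streamId));
  return source$.pipe(takeUntil(stopThis$));
}

// A synchronous Subject stands in for the real message stream (assumption).
const source$ = new Subject<number>();
const log: string[] = [];

untilStopped(source$, 'a').subscribe({
  next: value => log.push(`next:${value}`),
  complete: () => log.push('complete'),
});

source$.next(1);
stop$.next('b'); // different id: ignored
source$.next(2);
stop$.next('a'); // matching id: the stream completes
source$.next(3); // no longer delivered

console.log(log); // logs ["next:1", "next:2", "complete"]
```

This only illustrates the operator semantics; it does not explain the "invalid object" error, which also appears with the unmodified docs example.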

Any clues on which parts of my environment to check would be appreciated.


Solution

  • I didn't find a real answer, but I managed to avoid using takeUntil for Grafana.

    For anyone else coming here for Grafana: Grafana automatically unsubscribes when the requests no longer match. It is therefore only necessary to remove your own cached version of the stream:

    delete streams[streamId]
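
    The cache cleanup can also be tied to the unsubscribe itself, as the finalize call in the question's snippet already does. A minimal sketch, assuming Grafana unsubscribes as described above; the module-level streams cache, the getStream signature, and the Subject used as a source are hypothetical stand-ins:

```typescript
import { Observable, Subject } from 'rxjs';
import { finalize } from 'rxjs/operators';

// Hypothetical cache of live streams, keyed by stream id.
const streams: { [streamId: string]: Observable<number> } = {};

function getStream(streamId: string, source$: Observable<number>): Observable<number> {
  if (!streams[streamId]) {
    streams[streamId] = source$.pipe(
      // finalize() runs on completion, error, or unsubscribe.
      finalize(() => {
        delete streams[streamId];
      })
    );
  }
  return streams[streamId];
}

// A Subject stands in for the real message stream (assumption).
const source$ = new Subject<number>();
const subscription = getStream('a', source$).subscribe();
const cachedWhileSubscribed = 'a' in streams;

// Simulates the consumer (here: Grafana) dropping the query.
subscription.unsubscribe();
const cachedAfterUnsubscribe = 'a' in streams;

console.log(cachedWhileSubscribed, cachedAfterUnsubscribe); // logs true false
```

    Note that with several subscribers to the same cached stream, the first one to unsubscribe removes the entry, so this fits best when each stream has a single consumer.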