Search code examples
angularspring-webfluxproject-reactorangular2-observablesunsubscribe

Why I m still getting data from my webflux spring boot even if I unsubscribe?


I'm learning about the reactive web, for a tutorial I wanted to get some twitter hashtag result search from my spring webflux rest service to an angular 6 client.

When hitting my localhost:8081/search/test in chrome, I m getting the tweets in a json format in a reactive way (tweet by tweet and browser is showing every coming one).

so for more pleasure, I made a small angular search input and I would show in console tweets

the issue is when I search for java tag I will get console logs then if I try to search for spring tag I will have spring tweets logged in the console and the java ones are still coming

I did some research and I found that I should unsubscribe my consumer from the flux.

I tried to implement this but no success

Here is What I tried

Spring WebFlux Controller

private TwitHashLocalService localService;
    private TwitHashRemoteService remoteService;

    @GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE) 
    public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
        return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
    }

My services

local mongo db

private MongoService mongoService;

    public Flux<Tweet> findByTag(String tag) {

        return mongoService.findByTag(tag);
    }

remote twitter stream flux

public Flux<Tweet> findByTag(String hashtag) throws TwitterException {

    return Flux.create(sink -> {
        TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
        twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
        twitterStream.onException(sink::error);
        twitterStream.filter(hashtag);
        sink.onCancel(twitterStream::shutdown);
    }); 

}

ANGULAR

My reactive twitter search service

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';


@Injectable({
    providedIn: 'root'
})
export class ReactiveTwitterService {

    myTweets: ITweet[] = new Array();

    tweetTag: string;

    baseUrl = 'http://localhost:8081/search';

    constructor(private http_client: HttpClient) { }

    getTweetStream(tag): Observable<Array<ITweet>> {

        this.myTweets = [];
        const url = this.baseUrl + '/' + tag;

        return Observable.create(observer => {
            const eventSource = new EventSource(url);
            eventSource.onmessage = (event) => {
                console.log('received event');
                const json = JSON.parse(event.data);
                console.log(json);
                console.log(json.tweetData.name, json.tweetData.text, json.tag);
                this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
                observer.next(this.myTweets);
            };
            eventSource.onerror = (error) => {
                // readyState === 0 (closed) means the remote source closed the connection,
                // so we can safely treat it as a normal situation. Another way of detecting the end of the stream
                // is to insert a special element in the stream of events, which the client can identify as the last one.
                if (eventSource.readyState === 0) {
                    console.log('The stream has been closed by the server.');
                    eventSource.close();
                    observer.complete();
                } else {
                    observer.error('EventSource error: ' + error);
                }
            };
        });   
    }   
}

component search bar

import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';

@Component({
    selector: 'app-serach-bar',
    templateUrl: './serach-bar.component.html',
    styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
    innerWidth: number;


    subscription: Subscription = new Subscription();
    placeholder = 'search';

    styleClass = {
        wide_screen: 'w3-input w3-light-grey',
        break_point: 'w3-input w3-white'
    };

    tweets: Observable<ITweet[]>;

    constructor(private twiterService: ReactiveTwitterService) { }

    doSearch(tag) {
        console.log('test' + tag);
        this.subscription.unsubscribe();
        this.tweets = this.twiterService.getTweetStream(tag);
        this.subscription.add(this.tweets.subscribe());
    }


    ngOnInit() {
    }

    @HostListener('window:resize', ['$event'])
    onResize(event) {
        this.innerWidth = window.innerWidth;
    }

    getStyle() {
        return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
    }
}

As you see in the search I m trying to unsubscribe before researching but this is not working

What should I do?


Solution

  • 1) Make sure the cancelation is propagated in RxJs

    From the code I see, there is no cancelation handler on the Angular side.

    Try to do the following, to react on unsubscribe at JS side:

    Observable.create(observer => {
            const eventSource = new EventSource(url);
    
            // your code here 
    
            return () => eventSource.close(); // on cancel function
        }); 
    

    2) Add .log()ing

    I would recommend adding additional logging to the Reactor's pipe, so it will be clear what signals are propagated through the pipe. Use .log() operator for that purpose.

    3) Make sure the EventSource is really closed on the browser side.

    Use your browsers debug console to observe all opened connections to the server. Make sure, that after changing tag/ cleaning search request the connection is closed

    4) Remember about eventual consistency

    All propagated events through the reactive pipe are an async and non-blocking, so there could be some delay between actual action and final cancelation on the server side