I'm learning about the reactive web, for a tutorial I wanted to get some twitter hashtag result search from my spring webflux rest service to an angular 6 client.
When hitting my localhost:8081/search/test
in chrome, I m getting the tweets in a json format in a reactive way (tweet by tweet and browser is showing every coming one).
so for more pleasure, I made a small angular search input and I would show in console tweets
the issue is when I search for java
tag I will get console logs then if I try to search for spring
tag I will have spring tweets logged in the console and the java ones are still coming
I did some research and I found that I should unsubscribe my consumer from the flux.
I tried to implement this but no success
Here is What I tried
Spring WebFlux Controller
private TwitHashLocalService localService;
private TwitHashRemoteService remoteService;
@GetMapping(value="search/{tag}",produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Tweet> getByTag(@PathVariable String tag) throws TwitterException{
return localService.findByTag(tag).mergeWith(remoteService.findByTag(tag).doOnNext(tweet -> localService.save(tweet)));
}
My services
local mongo db
private MongoService mongoService;
public Flux<Tweet> findByTag(String tag) {
return mongoService.findByTag(tag);
}
remote twitter stream flux
public Flux<Tweet> findByTag(String hashtag) throws TwitterException {
return Flux.create(sink -> {
TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
twitterStream.onStatus(status -> sink.next(Tweet.fromStatus(status,hashtag)));
twitterStream.onException(sink::error);
twitterStream.filter(hashtag);
sink.onCancel(twitterStream::shutdown);
});
}
ANGULAR
My reactive twitter search service
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { ITweet } from './itweet';
import { Observable, of } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class ReactiveTwitterService {
myTweets: ITweet[] = new Array();
tweetTag: string;
baseUrl = 'http://localhost:8081/search';
constructor(private http_client: HttpClient) { }
getTweetStream(tag): Observable<Array<ITweet>> {
this.myTweets = [];
const url = this.baseUrl + '/' + tag;
return Observable.create(observer => {
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
console.log('received event');
const json = JSON.parse(event.data);
console.log(json);
console.log(json.tweetData.name, json.tweetData.text, json.tag);
this.myTweets.push(new ITweet(json.tweetData.name, json.tweetData.text, json.tag));
observer.next(this.myTweets);
};
eventSource.onerror = (error) => {
// readyState === 0 (closed) means the remote source closed the connection,
// so we can safely treat it as a normal situation. Another way of detecting the end of the stream
// is to insert a special element in the stream of events, which the client can identify as the last one.
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
};
});
}
}
component search bar
import { Component, OnInit, HostListener } from '@angular/core';
import { ReactiveTwitterService } from '../reactive-twitter.service';
import { Observable, Subscription } from 'rxjs';
import { ITweet } from '../itweet';
@Component({
selector: 'app-serach-bar',
templateUrl: './serach-bar.component.html',
styleUrls: ['./serach-bar.component.css']
})
export class SerachBarComponent implements OnInit {
innerWidth: number;
subscription: Subscription = new Subscription();
placeholder = 'search';
styleClass = {
wide_screen: 'w3-input w3-light-grey',
break_point: 'w3-input w3-white'
};
tweets: Observable<ITweet[]>;
constructor(private twiterService: ReactiveTwitterService) { }
doSearch(tag) {
console.log('test' + tag);
this.subscription.unsubscribe();
this.tweets = this.twiterService.getTweetStream(tag);
this.subscription.add(this.tweets.subscribe());
}
ngOnInit() {
}
@HostListener('window:resize', ['$event'])
onResize(event) {
this.innerWidth = window.innerWidth;
}
getStyle() {
return (innerWidth > 769) ? this.styleClass.wide_screen : this.styleClass.break_point;
}
}
As you see in the search I m trying to unsubscribe before researching but this is not working
What should I do?
From the code I see, there is no cancelation handler on the Angular side.
Try to do the following, to react on unsubscribe at JS side:
Observable.create(observer => {
const eventSource = new EventSource(url);
// your code here
return () => eventSource.close(); // on cancel function
});
.log()
ingI would recommend adding additional logging to the Reactor's pipe, so it will be clear what signals are propagated through the pipe. Use .log()
operator for that purpose.
Use your browsers debug console to observe all opened connections to the server. Make sure, that after changing tag/ cleaning search request the connection is closed
All propagated events through the reactive pipe are an async and non-blocking, so there could be some delay between actual action and final cancelation on the server side