Search code examples
javaspringrx-javanetflixhystrix

How to use Netflix ObservableResult and rxJava within Asynchronous mode


I was trying to use netflix observable however I managed to do so only synchronously:

This is how I define the remote call:

@Named
public class BroConsumerService {
..
@HystrixCommand(fallbackMethod = "stubbedMethod")
    public Observable<String> executeObservableBro(String name) {
        return new ObservableResult<String>() {
            @Override
            public String invoke() {
                return executeRemoteService(name);
            }
        };

    } 

 private String stubbedMethod(String name) {
        return "return stubbed";
    }

//here I am actually invoking (and observing this method)

  @RequestMapping("/executeObservableBro")
    public String executeObservableBro(@RequestParam(value = "name", required = false) String name) throws ExecutionException, InterruptedException {


         Observable<String> result= broConsumerService.executeObservableBro(name);


        result.subscribe(new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println("completed");

            }

            @Override
            public void onError(Throwable e) {
                System.out.printf(e.getMessage());

            }

            @Override
            public void onNext(String s) {
                System.out.println("on next..");

            }
        });
    }

But that works synchronously. I want to be able to "listen" to the executeObservableBro before I execute it. and each time it's being executed ill get notified.

Example would be highly appreciated.

Thanks, ray.


Solution

  • you have to provide schedulers in subscribeOn method like:

    public static void main(String[] args) throws InterruptedException {
    
        Observable<Integer> observable2 = Observable.create(subscriber->{
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Arrays.asList(1, 2, 3).forEach((value)-> subscriber.onNext(value));
            subscriber.onCompleted();
            subscriber.onError(new RuntimeException("error"));
        });
    
        System.out.println("Before");
    
        observable2
                .subscribeOn(Schedulers.io()).subscribe(
                    (next) -> log.info("Next element {}", next),
                    (error) -> log.error("Got exception", error),
                    () -> log.info("Finished")//on complete
        );
    
        System.out.println("After");
        //Thread.sleep(5000); //uncomment this to wait for subscriptions, otherwise main will quit
    }
    

    Its not async by default :)