
Vertx RxJava Operator iteration not filtering the content


I am reading the contents of a file with the Vert.x file system. The file contains names separated by newlines, but I cannot get the filter to work: every time, all the data in the file is printed.

Here is the code snippet:

vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
                final AsyncFile asyncFile = handler.result();
                final Observable<Buffer> observable = asyncFile.toObservable();
                observable.subscribe(item -> {
                    final String[] split = item.toString().split("\n\r");
                    List<String> list = Arrays.asList(split);
                    final Observable<String> stringObservable = Observable.fromIterable(list);
                    stringObservable
                            .filter(name -> name.toString().startsWith("R"))
                            .take(2)
                            .subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));


                }, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
            });

I added the inner Observable after finding that the outer Observable delivers all of the file's data in one go.


Solution

  • This code snippet works fine for me if I split on "\n" instead of "\n\r". If it prints everything for you, my guess is that the first character of the file is "R" and that "\n\r" does not occur anywhere in the file (Windows line endings are "\r\n", not "\n\r"). The split then returns a single String containing the whole file, which passes the filter and gets printed in full.

    That said, you can simplify this code significantly by using rxOpen instead of open, avoiding the nesting of subscribes, and simplifying how you turn the Buffer into an Observable that emits each line in the Buffer.

    Also note that I used RecordParser to split the contents of the file into tokens, using "\n" as the delimiter. Turning the AsyncFile directly into an Observable<Buffer> with toObservable() could produce Buffers that are cut off partway through a line, which would break your parsing.

    So putting it all together:

    vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
        .flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
        .map(Buffer::toString)
        .filter(name -> name.startsWith("R"))
        .take(2)
        .subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));
    

    You go from 13 lines to 6.
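
The two points made in this answer (the delimiter that never matches, and chunks that end partway through a line) can be checked without Vert.x at all. Below is a minimal plain-Java sketch. The class name, the method names, and the sample names in the file content are all made up for illustration. joinChunksIntoLines is only a rough stand-in for what a delimiter-based parser does, not Vert.x's RecordParser implementation, and emitting a trailing line that has no final newline is this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class LineSplitDemo {

    // Mirrors the question's inner pipeline: split, keep names starting with "R", take two.
    static List<String> firstTwoR(String content, String delimiterRegex) {
        return Arrays.stream(content.split(delimiterRegex))
                .filter(name -> name.startsWith("R"))
                .limit(2)
                .collect(Collectors.toList());
    }

    // Rough stand-in for delimiter-based record parsing: text left over after the
    // last "\n" of a chunk is carried into the next chunk instead of being emitted.
    static List<String> joinChunksIntoLines(List<String> chunks) {
        List<String> lines = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        for (String chunk : chunks) {
            pending.append(chunk);
            int newline;
            while ((newline = pending.indexOf("\n")) >= 0) {
                lines.add(pending.substring(0, newline));
                pending.delete(0, newline + 1);
            }
        }
        if (pending.length() > 0) {
            lines.add(pending.toString()); // final line without a trailing newline
        }
        return lines;
    }

    public static void main(String[] args) {
        String content = "Ravi\nAmit\nRohan\nRita\n"; // hypothetical file content

        // "\n\r" never matches, so the single result is the entire file: prints 1
        System.out.println(firstTwoR(content, "\n\r").size());
        // Splitting on "\n" yields one name per element: prints [Ravi, Rohan]
        System.out.println(firstTwoR(content, "\n"));
        // Chunks cut mid-line are reassembled first: prints [Ravi, Amit, Rohan]
        System.out.println(joinChunksIntoLines(Arrays.asList("Ravi\nAm", "it\nRo", "han\n")));
    }
}
```

If the file might have been written on Windows, splitting on the regex "\\R" (Java 8 and later) handles both "\n" and "\r\n".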