I am reading content from a file. I put some names into the file, one per line, but when I read the file with the Vert.x file system I am not able to apply the required filter. Every time, it prints all the data in the file.
Here is the code snippet:
vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
    final AsyncFile asyncFile = handler.result();
    final Observable<Buffer> observable = asyncFile.toObservable();
    observable.subscribe(item -> {
        final String[] split = item.toString().split("\n\r");
        List<String> list = Arrays.asList(split);
        final Observable<String> stringObservable = Observable.fromIterable(list);
        stringObservable
            .filter(name -> name.toString().startsWith("R"))
            .take(2)
            .subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));
    }, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
});
I added the inner observable after I found out that the outer observable delivers all the data in the file in one go.
This code snippet actually works fine for me if I split on "\n" instead of "\n\r". If you're having issues with it printing everything, my guess is that the first character in the whole file is "R", and then you don't actually have "\n\r" anywhere in the file. So when you try to split, you end up with just one large String that has the whole file in it.
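To make the splitting issue concrete, here is a minimal plain-Java sketch with no Vert.x involved. The class name SplitDemo, the helper firstTwoStartingWithR, and the sample names are made up for illustration, and the sample assumes the file uses "\n" line endings. It mirrors the filter and take(2) steps with a stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class: shows how the split pattern changes the result.
class SplitDemo {

    // Splits the content, keeps entries starting with "R", and takes at most two.
    static List<String> firstTwoStartingWithR(String content, String separatorRegex) {
        return Arrays.stream(content.split(separatorRegex))
                .filter(name -> name.startsWith("R"))
                .limit(2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed file content with "\n" line endings.
        String content = "Ravi\nSam\nRita\nRon\n";

        // "\n\r" never occurs, so split returns the whole content as one element.
        // That element starts with "R", so it passes the filter and everything is printed.
        System.out.println(firstTwoStartingWithR(content, "\n\r").size()); // prints 1

        // Splitting on "\n" gives one element per line, so the filter works per name.
        System.out.println(firstTwoStartingWithR(content, "\n")); // prints [Ravi, Rita]
    }
}
```

If the line endings are not known in advance, the regex "\\R" (Java 8 and later) matches "\n", "\r\n", and "\r" alike, so it can be passed as the separator instead.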
That said, you can simplify this code significantly by using rxOpen instead of open, avoiding the nesting of subscribes, and simplifying how you turn the Buffer into an Observable that emits each line in the Buffer.
Also note that I used RecordParser to split the contents of the file into tokens using the "\n" character as a delimiter. Just directly turning AsyncFile into an Observable<Buffer> using toObservable() could result in Buffers that cut off partway through a line, which would screw up your parsing.
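The chunk-boundary problem can be illustrated without Vert.x. The following plain-Java sketch is only a simplified stand-in for what a delimiter-based parser does, not Vert.x's actual implementation. The class ChunkDemo, both helper methods, and the chunk contents are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: compares per-chunk splitting with buffered splitting.
class ChunkDemo {

    // Naive approach: split every chunk on its own, ignoring chunk boundaries.
    static List<String> splitEachChunk(List<String> chunks) {
        List<String> lines = new ArrayList<>();
        for (String chunk : chunks) {
            lines.addAll(Arrays.asList(chunk.split("\n")));
        }
        return lines;
    }

    // Buffered approach: carry the unfinished tail of one chunk into the next.
    // Text after the final "\n" stays in the buffer and is not emitted here.
    static List<String> splitAcrossChunks(List<String> chunks) {
        List<String> lines = new ArrayList<>();
        StringBuilder pending = new StringBuilder();
        for (String chunk : chunks) {
            pending.append(chunk);
            int newline;
            while ((newline = pending.indexOf("\n")) >= 0) {
                lines.add(pending.substring(0, newline));
                pending.delete(0, newline + 1);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Assumed chunks: the name "Rita" is cut in half by the chunk boundary.
        List<String> chunks = Arrays.asList("Ravi\nRi", "ta\nSam\n");

        System.out.println(splitEachChunk(chunks));    // prints [Ravi, Ri, ta, Sam]
        System.out.println(splitAcrossChunks(chunks)); // prints [Ravi, Rita, Sam]
    }
}
```

With per-chunk splitting, the fragment "Ri" would pass a startsWith("R") filter even though it is not a real name in the file, which is the kind of parsing error a delimiter-aware parser avoids.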
So putting it all together:
vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
    .flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
    .map(Buffer::toString)
    .filter(name -> name.startsWith("R"))
    .take(2)
    .subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));
You go from 13 lines to 6.