java reactive-programming rx-java2 rx-java3

RxJava valve use case


Is there an operator in RxJava, an external library, or some other way I'm missing to create a Flowable/Observable that receives a function controlling the emission of data, like a valve?

I have a huge JSON file to process, but I have to read one portion of the file (a list of entities), process it, and only then read the next portion. I have tried window() and buffer(), but the BiFunction I pass to Flowable.generate() keeps executing after I receive the first list, before I have finished processing it. I also tried FlowableTransformers.valve() from hu.akarnokd.rxjava3.operators, but it just piles up the items ahead of the flatMap() function that processes the list.

private Flowable<T> flowable(InputStream inputStream) {

    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        final var token = jsonParser.nextToken();

        if (token == null) {
            // End of input: complete the stream and skip the remaining checks.
            emitter.onComplete();
            return jsonParser;
        }

        if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
            return jsonParser;
        }

        if (JsonToken.START_OBJECT.equals(token)) {
            emitter.onNext(reader.readValue(jsonParser));
        }

        return jsonParser;
    }, JsonParser::close);
}
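
The "read a portion, process it, then read the next portion" requirement is a pull relationship: the reader should advance only when the consumer asks for more. Flowable.generate() is backpressure-aware, so it produces in response to downstream requests; the sketch below models that same relationship in plain Java, without Rx. PageIterator is a hypothetical name, and the source Iterator is only a stand-in for the streaming JSON parser.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Pulls fixed-size pages from a source iterator, one page per next() call. */
final class PageIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source; // stand-in for the streaming JSON parser (assumption)
    private final int pageSize;
    private int itemsRead = 0;        // how many items have been pulled from the source so far

    PageIterator(Iterator<T> source, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.source = source;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    /** Reads at most pageSize items; nothing is read ahead of this call. */
    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> page = new ArrayList<>(pageSize);
        while (page.size() < pageSize && source.hasNext()) {
            page.add(source.next());
            itemsRead++;
        }
        return page;
    }

    int itemsRead() {
        return itemsRead;
    }
}
```

A consumer would call next(), process the page to completion, and only then call next() again, so at most one page is held in memory at a time.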

Edit: I need to control the emission of items so that I don't overload memory or the function that processes the data, because that function reads from and writes to a database. The processing also needs to be sequential. The function that processes the data is not entirely mine; it is written in RxJava, and I am expected to use Rx.

I managed to solve it like this, but if there is another way, please let me know:

public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        if (booleanSupplier.get()) {
            final var token = jsonParser.nextToken();

            if (token == null) {
                // End of input: complete the stream and skip the remaining checks.
                emitter.onComplete();
                return jsonParser;
            }

            if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
                return jsonParser;
            }

            if (JsonToken.START_OBJECT.equals(token)) {
                emitter.onNext(reader.readValue(jsonParser));
            }

        }
        
        return jsonParser;
    }, JsonParser::close);
}

Edit 2: This is one of the ways I'm currently consuming the function:

public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
    final var atomicInteger = new AtomicInteger(0);
    final var atomicBoolean = new AtomicBoolean(true);

    return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
            .buffer(pageSize)
            .flatMapSingle(list -> {

                final var counter = atomicInteger.addAndGet(1);

                if (counter == numberOfPages) {
                    atomicBoolean.set(false);
                }

                return function.apply(list)
                        .doFinally(() -> {
                            if (atomicInteger.get() == numberOfPages) {
                                atomicInteger.set(0);
                                atomicBoolean.set(true);
                            }
                        });
            });
}
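
The counter and flag in paging() can be read as a gate that limits how many pages are being processed at once. A minimal sketch of that idea in plain Java, assuming a single in-flight counter is acceptable: InFlightGate and its methods are hypothetical names, not part of RxJava, and the behaviour is not identical to the snippet above (this gate reopens whenever the in-flight count drops below the limit, rather than resetting in batches).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Open while fewer than `limit` pages are being processed. */
final class InFlightGate {
    private final int limit;
    private final AtomicInteger inFlight = new AtomicInteger(0);

    InFlightGate(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** The producer checks this before reading the next item. */
    boolean isOpen() {
        return inFlight.get() < limit;
    }

    /** Call when a page is handed to the processing function. */
    void pageStarted() {
        inFlight.incrementAndGet();
    }

    /** Call when processing of a page ends, successfully or not. */
    void pageFinished() {
        inFlight.decrementAndGet();
    }
}
```

Under this assumption, gate::isOpen would take the place of atomicBoolean::get, with pageStarted() called before function.apply(list) and pageFinished() called inside doFinally.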

Solution

  • I managed to solve it like this:

    public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
        // defer() postpones reading the next token until this Flowable is subscribed to.
        return Flowable.defer(() -> {
            final var token = jsonParser.nextToken();

            if (token == null) {
                // End of input: close the parser, log any failure, and complete.
                return Completable.fromAction(jsonParser::close)
                        .doOnError(Throwable::printStackTrace)
                        .onErrorComplete()
                        .andThen(Flowable.empty());
            }

            if (JsonToken.START_OBJECT.equals(token)) {
                // Emit one value through the valve, then continue with the rest of the input.
                final var value = reader.readValue(jsonParser);
                final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
                return Flowable.concat(just, flowable(jsonParser, reader, valve));
            }

            // Any other token (for example array start/end): skip it and keep reading.
            return flowable(jsonParser, reader, valve);
        });
    }
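
    Because concat() subscribes to the recursive flowable(...) only after the gated item has completed, the parser appears to advance one object at a time. To illustrate the gating itself, here is a simplified model of a valve in plain Java. SimpleValve is a hypothetical class that only models the open/closed relay semantics; it is not how FlowableTransformers.valve is implemented. It also shows why items pile up when the producer keeps offering while the valve is closed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Relays items while open; queues them while closed. */
final class SimpleValve<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<T> downstream;
    private boolean open;

    SimpleValve(Consumer<T> downstream, boolean initiallyOpen) {
        this.downstream = downstream;
        this.open = initiallyOpen;
    }

    /** Called by the producer for every item. */
    synchronized void offer(T item) {
        if (open && pending.isEmpty()) {
            downstream.accept(item);
        } else {
            pending.add(item); // closed: the item waits here
        }
    }

    /** Opens or closes the valve; opening drains queued items in order. */
    synchronized void setOpen(boolean open) {
        this.open = open;
        // Re-check the field each time so downstream can close the valve mid-drain.
        while (this.open && !pending.isEmpty()) {
            downstream.accept(pending.poll());
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

    In this model a closed valve does not stop the producer, it only delays delivery, so memory is bounded only if the producer itself waits for the valve, which is what the defer/concat structure above seems to achieve.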