java apache-kafka rx-java

How to loop and limit the number of items fetched each time in an Observable


I have the following Observable, which receives Kafka consumer records and inserts them into a database. It currently works: the consumer receives the data as expected, and I extract the records, map them, and put the results into a list. The data in this list is then inserted into the DB.

As written, it attempts to insert everything at once. A single batch of Kafka consumer records may hold between 100k and 1 million records. I am looking for a way to break this up so that I take only 1000 items from the consumer records, insert them into the DB, then repeat for the next 1000 items, and keep going until no records remain. Is this possible?

I tried variations of take and takeUntil combined with repeat, but they do not work. After I subscribe, the call simply ends and never even enters the Observable.

Could I get some advice on how to write this so that I fetch 1000 records from the Kafka records, insert them into the DB, and keep doing this until all Kafka records have been processed? Thanks.

Please note that I am using RxJava 1 and need to stick with this version.

private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();

public Observable<KafkaConsumerRecord<String, CustomObj>> dbInsert(KafkaConsumerRecords<String, CustomObj> records) {

    return Observable.just(records.getDelegate().records())
            // Attempting to loop based on the following counts. Not preferred, but I am unsure of a better way.
            // The figures captured here are correct.
            // This does not currently matter anyway, because I cannot get it to work using takeUntil and repeat.
            .doOnSubscribe(() -> {
                RECORD_COUNT.set(records.getDelegate().records().count());
                REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
                REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
            })
            .map(consumerRecords -> consumerRecords.records("Topic name"))
            .map(it -> {
                List<CustomRequest> requests = new ArrayList<>();
                it.forEach(r -> {
                    ConsumerRecord<String, SomeObj> record = (ConsumerRecord<String, SomeObj>) r;
                    CustomRequest request = new CustomRequest (
                        new String(record.headers().headers("id").iterator().next().value(), StandardCharsets.UTF_8),
                        Long.parseLong(new String(record.headers().headers("code").iterator().next().value(), StandardCharsets.UTF_8)),
                        record.value()
                    );
                    requests.add(request);
                });
                return requests;
            })
            // nothing happens if I uncomment these. 
            // .takeUntil(customRequests -> customRequests.size() == INSERT_COUNT.get())
            // .repeat(REPEAT_COUNT.get())
            .doOnNext(customRequests -> {
                // planning to do some db inserts here in a transaction of 1000 inserts at a time. 
            })
            .doOnCompleted(() -> System.out.println("Completed"));
}

Solution

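  • Use buffer(count), which collects the emitted items into lists of at most count items and emits each list downstream; the last list may be smaller. Note that Observable.just(...) in the question emits the whole record collection as a single item, so operators such as take and takeUntil only ever see that one item. To batch individual records, emit them one by one (for example with Observable.from(...)), map each record to a CustomRequest, apply buffer(1000), and insert each emitted list in doOnNext. The following should work with RxJava 1.3.8: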

    rx.Observable.from(List.of(1, 2, 3, 4, 5, 6))
      .buffer(2)
      .doOnNext(r -> System.out.println(r))
      .subscribe();
    

    The output was:

    [1, 2]
    [3, 4]
    [5, 6]
    

    I tested the code above with the following dependency:

    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxjava</artifactId>
      <version>1.3.8</version>
    </dependency>
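
To make the batching behaviour concrete, here is a minimal plain-Java sketch of what a fixed-size buffer does to a list of records. It deliberately avoids RxJava so that it runs with the JDK alone. The class BatchSketch and the helper partition are hypothetical names introduced for illustration, and the print statement only stands in for a real per-batch DB transaction.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {

    // Hypothetical helper: splits items into consecutive lists of at most
    // batchSize elements, similar in spirit to buffer(batchSize).
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            // The final batch may be shorter than batchSize.
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in data: 2500 integers instead of Kafka records.
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 2500; i++) {
            records.add(i);
        }
        for (List<Integer> batch : partition(records, 1000)) {
            // Placeholder for one DB transaction per batch.
            System.out.println("inserting " + batch.size() + " records");
        }
    }
}
```

With 2500 stand-in records and a batch size of 1000, this prints three lines reporting batches of 1000, 1000, and 500 records. The trailing partial batch is handled without the manual RECORD_COUNT, REMAINDER, and REPEAT_COUNT arithmetic from the question.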