Search code examples
javaamazon-web-servicesamazon-dynamodbreactive-programmingdynamodb-queries

Limiting the records per page and the number of pages in DynamoDB


I am using DynamoDBEnchancedAsyncClient to query DynamoDB using GSI and pagination. Below is the code that I am using to achieve the same. I am tying to limit the number of items per page and number of pages sent to the subscriber of the Flux

DynamoDBAsyncIndex<Customer> secindex= dynamodbasyncenhancedclient.table(Customer, Customer_Schema).index(GSI_INDEX_NAME)

SdkPublisher<Page<Customer>> query = secindex.query(QueryEnhancedRequest.builder().queryConditional(queryconditional)).limit(2).build()

Flux.from(PagePublisher.create(query).items().limit(5))

Few observations that I made are

The limit(2) in the QueryEnhancedRequest limits the number of items per page to 2. I checked it by subscribing to the SDKPublisher.

  1. However I want to limit the pages that are sent to the subscriber of the Flux above , but limit(5) is not giving the desired result. Is there any way to limit the number of pages that are emitted from the above flux?

  2. Is there any way to limit the number of pages that are fetched from the DDB for a particular query request?


Solution

  • When calling

    SdkPublisher<Page<Customer>> query = secindex.query(QueryEnhancedRequest.builder().queryConditional(queryconditional)).limit(2).build()
    

    the limit(2) is being called on the PagePublisher being the result of DynamoDbAsyncIndex#query result. In order to provide the QueryEnhancedRequest#limit you should call the limit on the QueryEnhancedRequest.Builder:

    secindex.query(QueryEnhancedRequest.builder().queryConditional(queryconditional).limit(2).build())
    

    On the other hand, the PagePublisher.create(query) result will be a publisher emitting flattened items from the query result pages, i.e., emitting all pages items as a sequence of results. To set the limit of emitted Pages, you should set the limit over the query result SdkPublisher<Page<Customer>>:

    Flux.from(PagePublisher.create(query.limit(5)).items())