Tags: java, json, mongodb, reactive-programming, micronaut

Stream large response in Micronaut controller without going out of memory


We're using Micronaut with MongoDB to expose data through some controllers. Because the response entities keep growing in size, our apps sometimes run out of memory. So we’re investigating switching to the async Mongo driver and using reactive responses to stream the data to clients. Unfortunately, we can’t change the API response structures or the content types (all application/json).

One of our APIs returned entities structured like this:

[
  { "field": "value" },
  { "field": "value" },
  ...
  { "field": "value" }
]

We got this working with the following controller, where the dataStore returns a Publisher<Example>:

    @Get("all")
    Flowable<Example> getAllExamples() {
        return Flowable.fromPublisher(dataStore.find()).map(SomeMapper::toPublic);
    }

This works nicely: the huge list of examples doesn’t have to be fully loaded into memory before it is streamed out to the client.

Other APIs return the (imo more sensible) structure:

{
  "list": [
    { "field": "value" },
    { "field": "value" },
    ...
    { "field": "value" }
  ],
  "meta": {
    ...
  }
}

Can we apply a similar publisher/flowable pattern to entities like this, or are we stuck loading the data for such responses into memory before sending them out?

We tried signatures like:

    @Get("all/dev")
    Single<ExamplesWrapper> getAllDev() {
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .map(mapper::map)
                .collect((Callable<ArrayList<Example>>) ArrayList::new, ArrayList::add)
                .map(ExamplesWrapper::new);
    }

Here the wrapper would add some metadata. But this again loads everything into memory before sending it out, crashing the app.

We also tried putting the Flowable into the response wrapper:


    public class ExamplesWrapper {

        private final Flowable<Example> examples;

        @ConstructorProperties({"examples"})
        public ExamplesWrapper(Flowable<Example> examples) {
            this.examples = examples;
        }

        public Flowable<Example> getExamples() {
            return examples;
        }
    }

This also fails, with a Jackson mapping exception.

Metadata is not dependent on the actual example data (it adds some static company info). Can we somehow implement such an endpoint without having to load all data into memory?


Solution

  • From documentation:

    6.20 Writing Response Data

    Reactively Writing Response Data

    Micronaut’s HTTP server supports writing chunks of response data by returning a Publisher that emits objects that can be encoded to the HTTP response.

    The following table summarizes example return type signatures and the behaviour the server exhibits to handle them:

    Return Type          Description
    Flowable<byte[]>     A Flowable that emits each chunk of content as a byte[] without blocking
    Flux<ByteBuf>        A Reactor Flux that emits each chunk as a Netty ByteBuf
    Publisher<String>    A Publisher that emits each chunk of content as a String
    Flowable<Book>       When emitting a POJO, each emitted object is encoded as JSON by default without blocking

    When returning a reactive type, the server uses a Transfer-Encoding of chunked and keeps writing data until the Publisher onComplete method is called.

    As I understand it, if you want Micronaut's own mechanism to stream the response, the signature needs to be Flowable<item>, Flux<item> or Publisher<item>, where item is one chunk of the response rather than the whole response. Micronaut then writes out the chunks as they are emitted by the Flowable or its equivalent.
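
    For context on what "chunked" means on the wire: in HTTP/1.1 each chunk is sent as its size in hexadecimal, a CRLF, the data, and another CRLF, and the body ends with a zero-size chunk. This is why the server does not need to know the total length up front. The sketch below only illustrates that framing in plain Java; the class name ChunkedFraming is hypothetical, and the server does this for you.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of HTTP/1.1 chunk framing; not a Micronaut API. */
final class ChunkedFraming {

    private ChunkedFraming() {}

    /** Frames one piece of body data: hex size, CRLF, data, CRLF. */
    static String frame(String data) {
        // The size counts bytes, not characters, so encode before measuring.
        int sizeInBytes = data.getBytes(StandardCharsets.UTF_8).length;
        return Integer.toHexString(sizeInBytes) + "\r\n" + data + "\r\n";
    }

    /** The zero-size chunk that terminates the body (no trailers). */
    static String lastChunk() {
        return "0\r\n\r\n";
    }

    public static void main(String[] args) {
        // Each emitted item can be written as soon as it is available.
        System.out.print(frame("{\"list\": ["));
        System.out.print(frame("{\"field\":\"value\"}"));
        System.out.print(frame("]}"));
        System.out.print(lastChunk());
    }
}
```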

    In this case, one option is to split the response into suitable chunks yourself. That way, large responses can be streamed without buffering them in memory.

    So something like this:

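    @Get("all")
    public Flowable<String> getAllExamples() {
        // Ideally inject a shared ObjectMapper instead of creating one per request.
        ObjectMapper objectMapper = new ObjectMapper();
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .map(mapper::map)
                // Emit a "," before every item, then drop the very first one,
                // so the array never ends with a trailing comma (invalid JSON).
                .concatMap(item -> Flowable.just(",", objectMapper.writeValueAsString(item)))
                .skip(1)
                .startWith("{\"list\": [")
                .concatWith(Flowable.just("],\"meta\":\"whatever\"}"));
    }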
    

    It is hacky, but it seems to work for this case.
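
    The chunking idea itself does not depend on RxJava or Micronaut. As a minimal sketch in plain Java, assuming the items are already serialized to JSON strings: the helper below (EnvelopeChunks is a hypothetical name, and the meta value is a placeholder) lazily produces the opening of the envelope, then one chunk per item with commas only between items, then the closing part with the metadata. Only one item is held at a time.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical helper; not part of Micronaut, RxJava or Jackson. */
final class EnvelopeChunks {

    private EnvelopeChunks() {}

    /**
     * Lazily wraps already-serialized JSON items in {"list":[...],"meta":...}.
     * Items are pulled one at a time; nothing is collected into a list.
     */
    static Iterator<String> wrap(Iterator<String> jsonItems, String metaJson) {
        return new Iterator<String>() {
            private int state = 0;        // 0 = prefix pending, 1 = items/suffix, 2 = done
            private boolean first = true; // no comma before the first item

            @Override
            public boolean hasNext() {
                return state != 2;
            }

            @Override
            public String next() {
                if (state == 0) {
                    state = 1;
                    return "{\"list\":[";
                }
                if (state == 1) {
                    if (jsonItems.hasNext()) {
                        String chunk = (first ? "" : ",") + jsonItems.next();
                        first = false;
                        return chunk;
                    }
                    state = 2;
                    return "],\"meta\":" + metaJson + "}";
                }
                throw new NoSuchElementException();
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> items =
                Arrays.asList("{\"field\":\"a\"}", "{\"field\":\"b\"}").iterator();
        StringBuilder body = new StringBuilder();
        // "whatever" stands in for the static metadata, as in the snippet above.
        wrap(items, "\"whatever\"").forEachRemaining(body::append);
        System.out.println(body);
        // prints {"list":[{"field":"a"},{"field":"b"}],"meta":"whatever"}
    }
}
```

    An empty source yields {"list":[],"meta":...}, so the result stays valid JSON in that case as well.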


    Some approaches that didn't work:

    I did test writing directly to a JsonGenerator in a custom Jackson mapper, flushing objects as they are written, as outlined in the Jackson streaming API. However, Micronaut's RoutingInboundHandler does not seem to flush the response to the client; it buffers it instead, which again results in running out of memory. The same approach works with Spring Boot, so this is possibly a missing feature in Micronaut.

    The same buffering happened when I used Micronaut Writeable (blocking) responses and tried to flush data as it was written. I opened an issue about that with Micronaut core.