Tags: java, reactive-programming, spring-webflux, project-reactor

Avoid GC pressure with Reactor Publisher


I am looking for a way to use Publishers from Project Reactor correctly without producing needless GC pressure by instantiating the whole pipeline on each call.

In our code, a typical handler function answering inter-service HTTP requests looks like this:

final List<Function<ChangeEvent, Mono<Void>>> triggerOtherMicroservices;

@PostMapping("/handle")
public Mono<Void> handle(@RequestBody ChangeEvent changeEvent) {
    return Mono
            .defer(() -> someService.callToAnotherMicroServiceToFetchData(changeEvent))
            .subscribeOn(Schedulers.parallel())
            .map(this::mapping)
            .flatMap(data -> databaseService.save(data))
            .thenMany(Flux.fromIterable(triggerOtherMicroservices).flatMap(t -> t.apply(changeEvent)))
            .then();
}

If I understand correctly, this means that on each invocation of handle the whole pipeline (which normally produces huge stack traces) has to be instantiated, and later garbage-collected.

My question is: Can I somehow "prepare" the whole flow once and reuse it later?

I was thinking about something like Mono.create(...). Or am I completely wrong, and there is no need to optimize anything here?

EDIT:

Thinking further, I could do:

final List<Function<ChangeEvent, Mono<Void>>> triggerOtherMicroservices;

final Mono<Void> mono = Mono
        .defer(() -> Mono
                .subscriberContext()
                .<ChangeEvent>map(context -> context.get("event"))
                .flatMap(event -> someService.callToAnotherMicroServiceToFetchData(event))
        )
        .subscribeOn(Schedulers.parallel())
        .map(this::mapping)
        .flatMap(data -> databaseService.save(data))
        .thenMany(Mono
                .subscriberContext()
                .<ChangeEvent>map(context -> context.get("event"))
                // Mono -> Flux needs flatMapMany, not flatMap
                .flatMapMany(event -> Flux
                        .fromIterable(triggerOtherMicroservices)
                        .flatMap(t -> t.apply(event)))
        )
        .then();

@PostMapping("/handle")
public Mono<Void> handle(@RequestBody @Validated ChangeEvent changeEvent) throws NoSuchElementException {
    return mono.subscriberContext(context -> context.put("event", changeEvent));
}

Anyway, I doubt this is what subscriberContext is meant for.
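The idea of building the flow once and feeding per-call data into it can be illustrated without Reactor by plain java.util.function composition. This is only a loose analogy under stated assumptions: PreparedPipeline, fetch and duplicate are hypothetical stand-ins, and a real Reactor Mono additionally creates per-operator subscriber objects on every subscription, so a prebuilt Mono is not allocation-free per call.

```java
import java.util.function.Function;

/** Plain-Java analogy (no Reactor): compose a pipeline once vs. on every call. */
public class PreparedPipeline {

    // Hypothetical stand-ins for fetching data and transforming it.
    static long fetch(long timestamp) { return timestamp; }
    static long duplicate(long a) { return a * 2; }

    // Built once: the composed Function objects are allocated a single time.
    static final Function<Long, Long> PREPARED =
            ((Function<Long, Long>) PreparedPipeline::fetch)
                    .andThen(PreparedPipeline::duplicate)
                    .andThen(PreparedPipeline::duplicate);

    // Built per call: a new chain of Function objects is assembled each time.
    static long onTheFly(long timestamp) {
        Function<Long, Long> chain = ((Function<Long, Long>) PreparedPipeline::fetch)
                .andThen(PreparedPipeline::duplicate)
                .andThen(PreparedPipeline::duplicate);
        return chain.apply(timestamp);
    }

    public static void main(String[] args) {
        System.out.println(PREPARED.apply(3L)); // prints 12
        System.out.println(onTheFly(3L));       // prints 12
    }
}
```

Both variants compute the same result; they differ only in how often the chain objects are created, which is exactly the cost the question is worried about.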


Solution

  • Note: There are many JVM implementations, and this answer doesn't claim to have tested all of them, nor is it meant as a general statement about all possible situations.

    According to https://www.bettercodebytes.com/the-cost-of-object-creation-in-java-including-garbage-collection/, object creation may have no overhead at all when objects live only within a method. This is because the JIT compiler can use escape analysis to prove that such an object never leaves the method; it can then skip the heap allocation entirely (scalar replacement) and execute the object's methods directly on its fields. Hence, no garbage collection is required later on either.
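    A minimal sketch of the escape-analysis effect described above, using a hypothetical Point class in place of the answer's Data. Whether HotSpot actually removes the allocation depends on the JIT having compiled and inlined the method; comparing GC logs (`-Xlog:gc`, JDK 9+) of runs with and without the HotSpot flag `-XX:-DoEscapeAnalysis` is one way to check.

    ```java
    /** Sketch: only an object that never leaves its method is a candidate for scalar replacement. */
    public class EscapeAnalysisDemo {

        // Small value holder, analogous to Data in the answer (hypothetical).
        static final class Point {
            final long x, y;
            Point(long x, long y) { this.x = x; this.y = y; }
            long sum() { return x + y; }
        }

        // Publishing to a static field makes an object escape.
        static Point lastEscaped;

        // The Point never leaves this method: C2 may replace it with two local longs.
        static long nonEscaping(long i) {
            Point p = new Point(i, i + 1);
            return p.sum();
        }

        // The Point is stored in a field, so it must really be allocated on the heap.
        static long escaping(long i) {
            Point p = new Point(i, i + 1);
            lastEscaped = p;
            return p.sum();
        }

        public static void main(String[] args) {
            long total = 0;
            // Enough iterations for the JIT to compile and inline both methods.
            for (long i = 0; i < 1_000_000L; i++) {
                total += nonEscaping(i) + escaping(i);
            }
            System.out.println(total);
        }
    }
    ```

    Note that this optimization only applies to objects that do not leave the method, so it is not guaranteed to cover an assembled Mono that a controller returns to the framework.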

    A benchmark that applies this to the setup from the question can be implemented as follows:

    Controller:

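    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    import org.springframework.validation.annotation.Validated;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    @RestController
    public class EventController { // class name is illustrative; only the body was given

        // Stand-ins for the real downstream calls: each completes immediately.
        final List<Function<Event, Mono<Void>>> triggerOtherMicroservices = Arrays.asList(
                event -> Mono.empty(),
                event -> Mono.empty(),
                event -> Mono.empty()
        );

        // Assembled once; the per-request Event is read from the subscriber context.
        // Mono.subscriberContext() is the Reactor 3.3 API (deprecated in 3.4 in favor of
        // Mono.deferContextual / contextWrite).
        final Mono<Void> mono = Mono
                .defer(() -> Mono
                        .subscriberContext()
                        .<Event>map(context -> context.get("event"))
                        .flatMap(this::fetch)
                )
                .subscribeOn(Schedulers.parallel())
                .flatMap(this::duplicate)
                .flatMap(this::duplicate)
                .flatMap(this::duplicate)
                .flatMap(this::duplicate)
                .thenMany(Mono
                        .subscriberContext()
                        .<Event>map(context -> context.get("event"))
                        .flatMapMany(event -> Flux
                                .fromIterable(triggerOtherMicroservices)
                                .flatMap(t -> t.apply(event))
                        )
                )
                .then();

        @PostMapping("/event-prepared")
        public Mono<Void> handle(@RequestBody @Validated Event event) {
            return mono.subscriberContext(context -> context.put("event", event));
        }

        // Assembles the whole pipeline anew on every request.
        @PostMapping("/event-on-the-fly")
        public Mono<Void> handleOld(@RequestBody @Validated Event event) {
            return Mono
                    .defer(() -> fetch(event))
                    .subscribeOn(Schedulers.parallel())
                    .flatMap(this::duplicate)
                    .flatMap(this::duplicate)
                    .flatMap(this::duplicate)
                    .flatMap(this::duplicate)
                    .thenMany(Flux.fromIterable(triggerOtherMicroservices).flatMap(t -> t.apply(event)))
                    .then();
        }

        private Mono<Data> fetch(Event event) {
            return Mono.just(new Data(event.timestamp));
        }

        private Mono<Data> duplicate(Data data) {
            return Mono.just(new Data(data.a * 2));
        }
    }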
    

    Data:

    public class Data {
        long a; // package-private: read directly by the controller's duplicate()

        public Data(long a) {
            this.a = a;
        }

        @Override
        public String toString() {
            return "Data{" +
                    "a=" + a +
                    '}';
        }
    }
    

    Event:

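     import java.util.UUID;

     import com.fasterxml.jackson.annotation.JsonCreator;
     import com.fasterxml.jackson.annotation.JsonProperty;
     import com.fasterxml.jackson.databind.annotation.JsonSerialize;

     @JsonSerialize(using = EventSerializer.class)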
     public class Event {
         UUID source;
         long timestamp;
    
         @JsonCreator
         public Event(@JsonProperty("source") UUID source, @JsonProperty("timestamp") long timestamp) {
             this.source = source;
             this.timestamp = timestamp;
         }
    
         @Override
         public String toString() {
             return "Event{" +
                     "source=" + source +
                     ", timestamp=" + timestamp +
                     '}';
         }
     }
    

    EventSerializer:

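     import java.io.IOException;

     import com.fasterxml.jackson.core.JsonGenerator;
     import com.fasterxml.jackson.databind.SerializerProvider;
     import com.fasterxml.jackson.databind.ser.std.StdSerializer;

     public class EventSerializer extends StdSerializer<Event> {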
    
         public EventSerializer() {
             this(null);
         }
    
         public EventSerializer(Class<Event> t) {
             super(t);
         }
    
         @Override
         public void serialize(Event value, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException {
             jsonGenerator.writeStartObject();
             jsonGenerator.writeStringField("source", value.source.toString());
             jsonGenerator.writeNumberField("timestamp", value.timestamp);
             jsonGenerator.writeEndObject();
         }
     }
    

    And finally, the test itself:

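     import java.io.IOException;
     import java.lang.management.GarbageCollectorMXBean;
     import java.lang.management.ManagementFactory;
     import java.nio.file.Files;
     import java.nio.file.Paths;
     import java.util.UUID;
     import java.util.stream.IntStream;
     import java.util.stream.LongStream;

     import org.junit.jupiter.api.Test;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
     import org.springframework.boot.test.context.SpringBootTest;
     import org.springframework.http.MediaType;
     import org.springframework.test.web.reactive.server.WebTestClient;

     @SpringBootTest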
     @AutoConfigureWebTestClient
     class MonoAssemblyTimeTest {
    
         @Autowired
         private WebTestClient webTestClient;
    
         final int number_of_requests = 500000;
    
         @Test
         void measureExecutionTime() throws IOException {
             measureExecutionTime("on-the-fly");
             measureExecutionTime("prepared");
         }
    
         private void measureExecutionTime(String testCase) throws IOException {
             warmUp("/event-" + testCase);
    
             final GCStatisticsDifferential gcStatistics = new GCStatisticsDifferential();
             long[] duration = benchmark("/event-" + testCase);
    
             StringBuilder output = new StringBuilder();
             int plotPointsInterval = (int) Math.ceil((float) number_of_requests / 1000);
    
             for (int i = 0; i < number_of_requests; i++) {
                 if (i % plotPointsInterval == 0) {
                     output.append(String.format("%d , %d %n", i, duration[i]));
                 }
             }
    
             Files.writeString(Paths.get(testCase + ".txt"), output.toString());
    
             long totalDuration = LongStream.of(duration).sum();
             System.out.println(testCase + " duration: " + totalDuration / 1000000 + " ms.");
             System.out.println(testCase + " average: " + totalDuration / number_of_requests + " ns.");
             System.out.println(testCase + ": " + gcStatistics.get());
         }
    
         private void warmUp(String path) {
             UUID source = UUID.randomUUID();
             IntStream.range(0, number_of_requests).forEach(i -> call(new Event(source, i), path));
             System.out.println("done with warm-up for path: " + path);
         }
    
         private long[] benchmark(String path) {
             long[] duration = new long[number_of_requests];
    
             UUID source = UUID.randomUUID();
             IntStream.range(0, number_of_requests).forEach(i -> {
                 long start = System.nanoTime();
                 call(new Event(source, i), path).returnResult().getResponseBody();
                 duration[i] = System.nanoTime() - start;
             });
             System.out.println("done with benchmark for path: " + path);
             return duration;
         }
    
         private WebTestClient.BodySpec<Void, ?> call(Event event, String path) {
             return webTestClient
                     .post()
                     .uri(path)
                     .contentType(MediaType.APPLICATION_JSON)
                     .bodyValue(event)
                     .exchange()
                     .expectBody(Void.class);
         }
    
         private static class GCStatisticsDifferential extends GCStatistics {
    
             GCStatistics old = new GCStatistics(0, 0);
    
             public GCStatisticsDifferential() {
                 super(0, 0);
                 calculateIncrementalGCStats();
             }
    
             public GCStatistics get() {
                 calculateIncrementalGCStats();
                 return this;
             }
    
             private void calculateIncrementalGCStats() {
                 long timeNew = 0;
                 long countNew = 0;
    
                 for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
    
                     long count = gc.getCollectionCount();
    
                     if (count >= 0) {
                         countNew += count;
                     }
    
                     long time = gc.getCollectionTime();
    
                     if (time >= 0) {
                         timeNew += time;
                     }
                 }
    
                 time = timeNew - old.time;
                 count = countNew - old.count;
    
                 old = new GCStatistics(countNew, timeNew); // constructor order is (count, time)
             }
    
         }
    
         private static class GCStatistics {
             long count, time;
    
             public GCStatistics(long count, long time) {
                 this.count = count;
                 this.time = time;
             }
    
             @Override
             public String toString() {
                 return "GCStatistics{" +
                         "count=" + count +
                         ", time=" + time +
                         '}';
             }
         }
    
     }
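    The GCStatisticsDifferential helper counts collections, which reflects allocation only indirectly. As a hedged alternative (not what this answer used), HotSpot's com.sun.management.ThreadMXBean extension can report how many heap bytes each live thread has allocated. The hypothetical AllocationProbe class below snapshots GC totals plus the bytes allocated by all live threads and diffs two snapshots. Bytes from threads that exit between snapshots are lost, and on JVMs without the extension allocatedBytes is -1.

    ```java
    import java.lang.management.GarbageCollectorMXBean;
    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadMXBean;

    /** Sketch: snapshot GC totals and heap bytes allocated by live threads, then diff. */
    public final class AllocationProbe {

        final long gcCount;
        final long gcTimeMs;
        final long allocatedBytes; // -1 when the JVM cannot report it

        private AllocationProbe(long gcCount, long gcTimeMs, long allocatedBytes) {
            this.gcCount = gcCount;
            this.gcTimeMs = gcTimeMs;
            this.allocatedBytes = allocatedBytes;
        }

        static AllocationProbe snapshot() {
            long count = 0;
            long time = 0;
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                // Both getters return -1 when the value is undefined for this collector.
                count += Math.max(0, gc.getCollectionCount());
                time += Math.max(0, gc.getCollectionTime());
            }
            return new AllocationProbe(count, time, allocatedByLiveThreads());
        }

        private static long allocatedByLiveThreads() {
            ThreadMXBean threads = ManagementFactory.getThreadMXBean();
            // HotSpot-specific extension; other JVMs may not implement it.
            if (!(threads instanceof com.sun.management.ThreadMXBean)) {
                return -1;
            }
            com.sun.management.ThreadMXBean hotspot = (com.sun.management.ThreadMXBean) threads;
            if (!hotspot.isThreadAllocatedMemorySupported() || !hotspot.isThreadAllocatedMemoryEnabled()) {
                return -1;
            }
            long total = 0;
            for (long bytes : hotspot.getThreadAllocatedBytes(threads.getAllThreadIds())) {
                if (bytes > 0) {
                    total += bytes; // -1 marks a thread that exited in the meantime
                }
            }
            return total;
        }

        /** Difference between this snapshot and an earlier one. */
        AllocationProbe since(AllocationProbe before) {
            long allocated = (allocatedBytes < 0 || before.allocatedBytes < 0)
                    ? -1 : allocatedBytes - before.allocatedBytes;
            return new AllocationProbe(gcCount - before.gcCount, gcTimeMs - before.gcTimeMs, allocated);
        }

        @Override
        public String toString() {
            return "AllocationProbe{gcCount=" + gcCount + ", gcTimeMs=" + gcTimeMs
                    + ", allocatedBytes=" + allocatedBytes + '}';
        }
    }
    ```

    Taking a snapshot before benchmark(...) and calling since(before) on a second snapshot afterwards would give one allocation figure per test case. The worker threads of Schedulers.parallel() are long-lived, so their allocations should still be counted in the second snapshot.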
    

    The results are not always the same, but the "on-the-fly" method consistently outperforms the "prepared" method. In addition, the "on-the-fly" method triggers far fewer garbage collections.
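    Because the results vary between runs, comparing only the total and the mean can hide differences in tail latency. A small, hypothetical DurationSummary helper (not part of the original test) could summarize the duration array returned by benchmark(...) using nearest-rank percentiles:

    ```java
    import java.util.Arrays;

    /** Hypothetical helper: percentile summary for per-request durations in nanoseconds. */
    public final class DurationSummary {

        private DurationSummary() {}

        /** Nearest-rank percentile of an already sorted array, 0 < p <= 100. */
        static long percentileOfSorted(long[] sorted, double p) {
            if (sorted.length == 0 || p <= 0 || p > 100) {
                throw new IllegalArgumentException("need data and 0 < p <= 100");
            }
            int rank = (int) Math.ceil(p / 100.0 * sorted.length); // 1-based rank
            return sorted[rank - 1];
        }

        /** Sorts a copy, so the caller's array (used for the plot output) stays in request order. */
        static String summarize(String label, long[] durations) {
            long[] sorted = durations.clone();
            Arrays.sort(sorted);
            return String.format("%s: p50=%d ns, p99=%d ns, max=%d ns",
                    label,
                    percentileOfSorted(sorted, 50),
                    percentileOfSorted(sorted, 99),
                    percentileOfSorted(sorted, 100));
        }

        public static void main(String[] args) {
            // prints: example: p50=3 ns, p99=5 ns, max=5 ns
            System.out.println(summarize("example", new long[] {5, 1, 4, 2, 3}));
        }
    }
    ```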

    A typical result looks like:

    done with warm-up for path: /event-on-the-fly
    done with benchmark for path: /event-on-the-fly
    on-the-fly duration: 42679 ms.
    on-the-fly average: 85358 ns.
    on-the-fly: GCStatistics{count=29, time=128}
    done with warm-up for path: /event-prepared
    done with benchmark for path: /event-prepared
    prepared duration: 44678 ms.
    prepared average: 89357 ns.
    prepared: GCStatistics{count=86, time=67}

    These results were obtained on a MacBook Pro (16-inch, 2019) with a 2.4 GHz 8-core Intel Core i9 and 64 GB of 2667 MHz DDR4 RAM.
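    For scale, the averages of this single typical run follow from the totals (500,000 requests per case). The small mismatch in the prepared average comes from the printed total being truncated to whole milliseconds:

    ```latex
    \begin{aligned}
    \bar t_{\text{on-the-fly}} &= \frac{42\,679\ \text{ms}}{500\,000} = 85\,358\ \text{ns} \\
    \bar t_{\text{prepared}} &\approx \frac{44\,678\ \text{ms}}{500\,000} \approx 89\,356\ \text{ns} \quad (\text{reported: } 89\,357\ \text{ns}) \\
    \frac{\bar t_{\text{prepared}} - \bar t_{\text{on-the-fly}}}{\bar t_{\text{on-the-fly}}} &= \frac{89\,357 - 85\,358}{85\,358} = \frac{3\,999}{85\,358} \approx 4.7\,\%
    \end{aligned}
    ```

    So in this run the prepared variant was roughly 4.7 % slower per request; a single run does not establish how stable that gap is.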

    Note: Comments, better answers, or ... are still very welcome.