Tags: java, reactive-programming, project-reactor

Reactive transform function not behaving as anticipated


A Reactor transform function I wrote gives a different result from the same steps extracted and executed individually.

I have a DTO record that contains a List of type Daily.

public record CarrierDto(MetaData metaData, List<Daily> dailySeries) {

    public record MetaData(String name, String information) {}
    public record Daily(String date, Integer price) {}
}

I want to extract from a Mono<CarrierDto> a Flux<Daily> and then transform that to Flux<Price>.

@Table("price")
public record Price (
        @Column("p_date") String date,
        @Column("p_price") Integer price) {
}

To do this transformation I have this function:

static Function<Mono<CarrierDto>, Flux<Price>> transform = carrierDtoMono ->
            carrierDtoMono.map(CarrierDto::dailySeries)
                    .flatMapMany(Flux::fromIterable)
                    .map(daily -> new Price(daily.date(), daily.price()));

However, when I apply this function with transform, only one price is emitted on subscription. If I extract the individual steps and execute them one by one, all prices are emitted. For example, with this test class:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TestMain {

    static Function<Mono<CarrierDto>, Flux<Price>> transform = carrierDtoMono ->
            carrierDtoMono.map(CarrierDto::dailySeries)
                    .flatMapMany(Flux::fromIterable)
                    .map(daily -> new Price(daily.date(), daily.price()));

    public static void main(String[] args) {
        final CarrierDto dto = new CarrierDto(new CarrierDto.MetaData("Test", "Test information"),
                Arrays.asList(new CarrierDto.Daily("2023-01-01", 100),
                        new CarrierDto.Daily("2023-01-02", 101),
                        new CarrierDto.Daily("2023-01-03", 102),
                        new CarrierDto.Daily("2023-01-04", 103)));

        System.out.println("Using Transform function.");
        Mono.just(dto)
                .transform(transform)
                .subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));

        System.out.println("Without transform function.");

        final Mono<List<CarrierDto.Daily>> monoDailySeries = Mono.just(dto).map(CarrierDto::dailySeries);
        final Flux<CarrierDto.Daily> dailyFlux = monoDailySeries.flatMapMany(Flux::fromIterable);
        final Flux<Price> priceFlux = dailyFlux.map(daily -> new Price(daily.date(), daily.price()));

        priceFlux.subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));
    }
}

Running it gives this output:

Using Transform function.
Date: 2023-01-01 Price: 100
Without transform function.
Date: 2023-01-01 Price: 100
Date: 2023-01-02 Price: 101
Date: 2023-01-03 Price: 102
Date: 2023-01-04 Price: 103

Why is the result different?


Solution

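  • That's because Mono.transform always returns a Mono. It wraps the Publisher returned by your function with Mono.from, which takes only the first item and then cancels the source. The Flux<Price> your function produces is therefore cut down to its first element.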

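    Internally it does roughly this:

    Mono<V> transform(...) {
       ...
       // Mono.from emits at most one item and cancels the source on the first onNext
       return from(transformer.apply(this));
    }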
    

    I suggest starting with a Flux (or converting the Mono to a Flux first, for example with flux()). For your example:

    static Function<Flux<CarrierDto>, Flux<Price>> transform = carrierDtoFlux ->
            carrierDtoFlux.map(CarrierDto::dailySeries)
                    .flatMap(Flux::fromIterable)
                    .map(daily -> new Price(daily.date(), daily.price()));
    
    Flux.just(dto)
            .transform(transform)
            .subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));
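
To see why wrapping the result in a Mono loses items, here is a minimal JDK-only sketch of the "take the first item, then cancel" behaviour described above. It uses java.util.concurrent.Flow instead of Reactor so it runs without dependencies. It is a simplified model, not Reactor's actual implementation, and the names FirstOnlyDemo, fromList, firstOnly and collect are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class FirstOnlyDemo {

    /** Synchronous publisher emitting every list element (simplified: ignores request counts). */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            boolean started;
            boolean cancelled;

            @Override public void request(long n) {
                if (started) return;
                started = true;
                for (T item : items) {
                    if (cancelled) return;      // stop emitting once downstream cancels
                    subscriber.onNext(item);
                }
                if (!cancelled) subscriber.onComplete();
            }

            @Override public void cancel() { cancelled = true; }
        });
    }

    /** Models an "at most one item" wrapper: forward the first item, cancel upstream, complete. */
    static <T> Flow.Publisher<T> firstOnly(Flow.Publisher<T> source) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            Flow.Subscription upstream;
            boolean done;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                downstream.onSubscribe(s);
            }

            @Override public void onNext(T item) {
                if (done) return;
                done = true;
                upstream.cancel();              // the remaining items are never requested
                downstream.onNext(item);
                downstream.onComplete();
            }

            @Override public void onError(Throwable t) { if (!done) downstream.onError(t); }
            @Override public void onComplete() { if (!done) downstream.onComplete(); }
        });
    }

    /** Subscribes with unbounded demand and gathers everything that is emitted. */
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static void main(String[] args) {
        List<String> dates = List.of("2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04");
        System.out.println(collect(fromList(dates)));            // all four dates
        System.out.println(collect(firstOnly(fromList(dates)))); // only the first date
    }
}
```

The second call mirrors the question's "Using Transform function" output: the multi-item source is intact, but the one-item wrapper around it cancels after the first element.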