A reactor transform function I created does not give the same result when I extract the individual steps and execute them separately.
I have a DTO record that contains a List of type Daily.
public record CarrierDto(MetaData metaData, List<Daily> dailySeries) {
    public record MetaData(String name, String information) {}
    public record Daily(String date, Integer price) {}
}
I want to extract a Flux<Daily> from a Mono<CarrierDto> and then transform that into a Flux<Price>:
public record Price(
        @Column("p_date") String date,
        @Column("p_price") Integer price) {
}
To do this transformation I have this function:
static Function<Mono<CarrierDto>, Flux<Price>> transform = carrierDtoMono ->
        carrierDtoMono.map(CarrierDto::dailySeries)
                .flatMapMany(Flux::fromIterable)
                .map(daily -> new Price(daily.date(), daily.price()));
However, when I use this function, only one price is emitted upon subscription. If I extract each individual step and execute the steps separately, all prices are emitted. For example, with this test class:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
public class TestMain {
    static Function<Mono<CarrierDto>, Flux<Price>> transform = carrierDtoMono ->
            carrierDtoMono.map(CarrierDto::dailySeries)
                    .flatMapMany(Flux::fromIterable)
                    .map(daily -> new Price(daily.date(), daily.price()));

    public static void main(String[] args) {
        final CarrierDto dto = new CarrierDto(new CarrierDto.MetaData("Test", "Test information"),
                Arrays.asList(new CarrierDto.Daily("2023-01-01", 100),
                        new CarrierDto.Daily("2023-01-02", 101),
                        new CarrierDto.Daily("2023-01-03", 102),
                        new CarrierDto.Daily("2023-01-04", 103)));

        System.out.println("Using Transform function.");
        Mono.just(dto).transform(transform)
                .subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));

        System.out.println("Without transform function.");
        final Mono<List<CarrierDto.Daily>> monoDailySeries = Mono.just(dto).map(CarrierDto::dailySeries);
        final Flux<CarrierDto.Daily> dailyFlux = monoDailySeries.flatMapMany(Flux::fromIterable);
        final Flux<Price> priceFlux = dailyFlux.map(daily -> new Price(daily.date(), daily.price()));
        priceFlux.subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));
    }
}
Gives the result:
Using Transform function.
Date: 2023-01-01 Price: 100
Without transform function.
Date: 2023-01-01 Price: 100
Date: 2023-01-02 Price: 101
Date: 2023-01-03 Price: 102
Date: 2023-01-04 Price: 103
Why is the result different?
That's because Mono.transform produces a new Mono, i.e., it requests only the first item from the Publisher that your function returns.
Internally it has (simplified):
Mono<V> transform(...) {
    return onAssembly(from(transformer.apply(this)));
}
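To see the truncation in isolation, here is a minimal sketch, assuming reactor-core is on the classpath. The class name MonoFromDemo, the EXPAND function, and the helper methods are made up for illustration. It shows that Mono.from keeps only the first element of a multi-element Publisher, and that Mono.transform therefore yields at most one element even when the function returns a Flux.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Function;

public class MonoFromDemo {

    // Hypothetical function with the same shape as the one in the question:
    // it takes a Mono and returns a Flux with several elements.
    static final Function<Mono<List<Integer>>, Flux<Integer>> EXPAND =
            mono -> mono.flatMapMany(Flux::fromIterable);

    // Mono.from adapts any Publisher to a Mono by keeping only its first element.
    static List<Integer> viaMonoFrom() {
        return Mono.from(Flux.just(1, 2, 3)).flux().collectList().block();
    }

    // Mono.transform returns a Mono, so the Flux produced by EXPAND is cut down to one element.
    static List<Integer> viaTransform() {
        Mono<Integer> result = Mono.just(List.of(1, 2, 3)).transform(EXPAND);
        return result.flux().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(viaMonoFrom());  // prints [1]
        System.out.println(viaTransform()); // prints [1]
    }
}
```

Both calls report a single element, which matches the single price printed under "Using Transform function." in the question.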
I suggest starting with a Flux (or converting the Mono to a Flux). For your example:
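static Function<Flux<CarrierDto>, Flux<Price>> transform = carrierDtoFlux ->
        carrierDtoFlux.flatMapIterable(CarrierDto::dailySeries)
                .map(daily -> new Price(daily.date(), daily.price()));

// Flux.transform returns a Flux, so every price is emitted.
Flux.just(dto).transform(transform) // or: Mono.just(dto).flux().transform(transform)
        .subscribe(price -> System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));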
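If you would rather keep the Function<Mono<CarrierDto>, Flux<Price>> signature from the question, another option may be Mono.as, which passes the Mono to the function and returns the function's result as-is instead of wrapping it in a Mono. The following is a sketch under assumptions: the class name MonoAsDemo and the method pricesViaAs are hypothetical, and the records are trimmed-down stand-ins for the ones in the question (no MetaData, no @Column annotations).

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.function.Function;

public class MonoAsDemo {

    // Simplified stand-ins for the records in the question.
    record Daily(String date, Integer price) {}
    record CarrierDto(List<Daily> dailySeries) {}
    record Price(String date, Integer price) {}

    // Same shape as the question's function: Mono in, Flux out.
    static final Function<Mono<CarrierDto>, Flux<Price>> TRANSFORM = carrierDtoMono ->
            carrierDtoMono.map(CarrierDto::dailySeries)
                    .flatMapMany(Flux::fromIterable)
                    .map(daily -> new Price(daily.date(), daily.price()));

    static List<Price> pricesViaAs() {
        CarrierDto dto = new CarrierDto(List.of(
                new Daily("2023-01-01", 100),
                new Daily("2023-01-02", 101)));
        // Mono.as returns exactly what the function returns, here a Flux<Price>.
        Flux<Price> prices = Mono.just(dto).as(TRANSFORM);
        return prices.collectList().block();
    }

    public static void main(String[] args) {
        pricesViaAs().forEach(price ->
                System.out.println(String.format("Date: %s Price: %d", price.date(), price.price())));
    }
}
```

The trade-off is that as is a plain type conversion: unlike transform, its result is not passed through Reactor's assembly hooks, which normally does not matter for a case like this.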