spring-cloud spring-webflux spring-cloud-stream

Issue with Spring WebFlux stream in Spring Cloud Data Flow


I have a custom Spring Cloud Data Flow source application that connects to a REST endpoint, buffers the data, and sends it to a custom sink. I am using Spring WebFlux for this. Everything works if I call block(), collect all the available data, and send it to the message channel in one go. However, I need to send the data in chunks, so I want to buffer the stream into chunks of, say, 100K records and send each chunk to the message channel. When I buffer the stream and send the chunks to the message channel, it fails.

Working Code (Blocking call to collect all data and send at once)

@InboundChannelAdapter(
    value = Source.OUTPUT,
    poller = @Poller(fixedDelay = "120000", maxMessagesPerPoll = "100000"))
public MessageSource<List<MyDataDTO>> sendEdpiMessageToChannel() {
    // Blocks until the whole response body has been collected into one list.
    List<MyDataDTO> list = WebClient.create()
        .get()
        .uri(builder -> builder.scheme("https")
            .host("server_name")
            .path("/api/endpoint")
            .build())
        .header("Cookie", cookie)
        .accept(MediaType.APPLICATION_JSON)
        .exchange()
        .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class))
        .collectList()
        .block();
    return () -> MessageBuilder.withPayload(list).build();
}

Now I want to buffer the data and send it in chunks instead of collecting everything with a blocking call, but this fails.

SOURCE CODE

@Scheduled(fixedDelay = 120000)
public void sendEdpiMessageToChannel() {
    WebClient.create()
        .get()
        .uri(builder -> builder.scheme("https")
            .host("server_name")
            .path("/api/endpoint")
            .build())
        .header("Cookie", cookie)
        .accept(MediaType.APPLICATION_JSON)
        .exchange()
        .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class)) //.subscribeOn(Schedulers.parallel())
        // Emit the records as lists of up to 100000 elements.
        .buffer(100000)
        // Hand the chunks over one at a time, in order.
        .concatMap(streamRecordsList -> Mono.fromRunnable(() -> sendToChannel(streamRecordsList)))
        .subscribe();
}

@InboundChannelAdapter(
    value = Source.OUTPUT
)
public MessageSource<List<MyDataDTO>> sendToChannel(List<MyDataDTO> list)
{
    return () -> MessageBuilder.withPayload(list).build();
}

STACK TRACE

2019-09-16T02:27:47.860-04:00 [APP/PROC/WEB/0] [OUT] 2019-09-16 06:27:47.859 ERROR 9 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:117)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:246)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:230)
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:114)
    ... 18 more
2019-09-16T02:27:48.533-04:00 [CELL/0] [OUT] Container became healthy
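
The "Caused by" frames show the exception coming out of a reflective Method.invoke call made from MethodInvokingMessageSource. The following is a minimal sketch in plain Java, without Spring, of how calling a one-parameter method reflectively with no arguments ends in an IllegalArgumentException. The class names PollerInvocationSketch and Adapter are hypothetical stand-ins, and the exact exception message can differ between JDK versions.

```java
import java.lang.reflect.Method;
import java.util.List;

public class PollerInvocationSketch {

    // Hypothetical stand-in for an adapter method that expects a payload argument.
    public static class Adapter {
        public String sendToChannel(List<String> list) {
            return "sent " + list.size();
        }
    }

    // Mimics a caller that invokes the method reflectively without any arguments.
    public static String invokeWithoutArguments() {
        try {
            Method method = Adapter.class.getMethod("sendToChannel", List.class);
            // No argument is supplied for the List parameter.
            return String.valueOf(method.invoke(new Adapter()));
        } catch (IllegalArgumentException e) {
            // Argument count does not match the method signature.
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeWithoutArguments());
    }
}
```

In this sketch the method itself is never entered; the mismatch is rejected by the reflection layer, which matches the position of the "Caused by" frames in the trace above.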


Solution

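  • I got this working with the code below. The poller behind @InboundChannelAdapter invokes the annotated method with no arguments, so sendToChannel(List<MyDataDTO>) failed with "wrong number of arguments". Removing the annotation and sending each chunk directly to the output channel avoids that.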

    @Scheduled(fixedDelay = 120000)
    public void sendEdpiMessageToChannel() {
        WebClient.create()
            .get()
            .uri(builder -> builder.scheme("https")
                .host("server_name")
                .path("/api/endpoint")
                .build())
            .header("Cookie", cookie)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMapMany(clientResponse -> clientResponse.bodyToFlux(MyDataDTO.class)) //.subscribeOn(Schedulers.parallel())
            // Emit the records as lists of up to 100000 elements.
            .buffer(100000)
            // Send the chunks one at a time, in order.
            .concatMap(streamRecordsList -> Mono.fromRunnable(() -> sendToChannel(streamRecordsList)))
            // Nothing runs until the Flux is subscribed.
            .subscribe();
    }

    // Plain method, no @InboundChannelAdapter: it is only called from the pipeline above.
    public void sendToChannel(List<MyDataDTO> list)
    {
        // "source" is assumed to be the injected Source binding; its declaration is not shown here.
        this.source.output().send(MessageBuilder.withPayload(list).build());
    }
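
The shape of this solution (split the records into fixed-size chunks, then hand each chunk to a sender in order) can be sketched in plain Java without Reactor or Spring. This is only an analogue of buffer(n) followed by a sequential send, not the reactive implementation; ChunkedSendSketch and sendInChunks are hypothetical names, and the Consumer stands in for sendToChannel.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class ChunkedSendSketch {

    // Drains the iterator, passing each full chunk (and a final partial one) to the sender.
    // Returns the number of chunks that were sent.
    public static <T> int sendInChunks(Iterator<T> records, int chunkSize, Consumer<List<T>> sender) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int chunks = 0;
        List<T> chunk = new ArrayList<>();
        while (records.hasNext()) {
            chunk.add(records.next());
            if (chunk.size() == chunkSize) {
                sender.accept(chunk);
                chunk = new ArrayList<>(); // start a fresh list so sent chunks are not mutated
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {
            sender.accept(chunk); // trailing chunk smaller than chunkSize
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        int chunks = sendInChunks(IntStream.range(0, 250).iterator(), 100, c -> sizes.add(c.size()));
        System.out.println(chunks + " chunks, sizes " + sizes); // prints "3 chunks, sizes [100, 100, 50]"
    }
}
```

Only one chunk is held in memory at a time, which is the point of buffering instead of collecting the whole response with collectList().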