Tags: spring-cloud-stream, spring-cloud-dataflow

Spring Cloud DataFlow: Getting payload as List<Map>


Using Spring Cloud DataFlow 1.3.0.M2 with Spring Cloud Stream Starters Celsius.M1.

I have two processors. The first produces a List<Map> that the second is supposed to consume. Here is the simplified code.

// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: originally returned Object; hoped switching to a specific
// List<> return type would help, but it made no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
    final List<Map<String, Object>> results = worker.doWork(payload);
    LOG.debug("Returning " + results.size() + " objects");
    return results;
}

// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
    LOG.debug("Received " + payload.size() + " objects");
    final List<Map<String, Object>> results = worker.moreWork(payload);
    return results;
}

I am deploying these two processors in a pipeline using the SCDF shell:

<source> | otherProcessors | processor1 | processor2 | log

The debug message from Processor 1 says the List contains 2 objects. Processor 2's debug message says it received 40 objects (each map has 20 key=value pairs) - it appears the two maps were flattened into a single list of key=value pairs.

I have debug logging enabled for org.springframework.integration, and the message appears to have the list-of-maps format (this is from Processor 2):

preSend on channel 'input', message: GenericMessage 
  [payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
   {"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]

I'd like Processor 2 to receive the 2 maps produced by Processor 1. I suspect this is related to generic types. Can someone point me to the configuration that makes this happen?

---- Update for Artem's comments ----

Processor 1 has this in its application.properties file:

spring.cloud.stream.bindings.output.content-type=application/json

I had also tried modifying the stream definition like this but it didn't seem to make a difference:

<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
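For reference, the per-application binding properties equivalent to those deployment-time inputType/outputType overrides (a sketch, assuming the default Processor channel names input and output) would be:

```properties
# Processor 1: serialize the outgoing List<Map> as JSON
spring.cloud.stream.bindings.output.content-type=application/json

# Processor 2: deserialize the incoming payload from JSON
spring.cloud.stream.bindings.input.content-type=application/json
```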

Solution

  • OK, you've actually stumbled upon a bug :)

    This has been fixed on the 2.0 branch, which is a bit unstable at the moment since it's a snapshot.

    Things should be better once we release in a few days.

    The team is discussing the path forward on backporting the fix to the 1.3 line.
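Until a fixed release is available, one possible workaround (not from the answer above; the class and method names here are illustrative) is to accept the raw JSON payload as a String in Processor 2 and deserialize it explicitly with Jackson, using a TypeReference so the full List<Map<String, Object>> generic type survives erasure:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parse the wire payload ourselves instead of
// relying on the framework's generic-type conversion.
public class PayloadDecoder {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses a JSON array of objects into a List<Map<String, Object>>.
    // The anonymous TypeReference subclass carries the generic type
    // information that would otherwise be erased at runtime.
    public static List<Map<String, Object>> toListOfMaps(String json) throws Exception {
        return MAPPER.readValue(json, new TypeReference<List<Map<String, Object>>>() {});
    }

    public static void main(String[] args) throws Exception {
        // Two maps, mirroring the preSend log output in the question
        String json = "[{\"m1key1\":\"val1\",\"m1key2\":\"val2\"},"
                    + "{\"m2key1\":\"val1\",\"m2key2\":\"val2\"}]";
        List<Map<String, Object>> maps = toListOfMaps(json);
        System.out.println(maps.size()); // prints 2: two maps, not flattened pairs
    }
}
```

Processor 2's @StreamListener method could then declare a String payload and call a helper like this before doing its work, sidestepping the framework's conversion entirely.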