Using Spring Cloud Data Flow 1.3.0.M2 with Spring Cloud Stream Starters Celsius.M1.
I have two processors; the first produces a List<Map>
that is supposed to be consumed by the second. Here is the simplified code.
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: the return type was originally Object; I hoped that using a
// specific type (List<>) would help, but it made no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
I am deploying these two processors in a pipeline using the SCDF shell:
<source> | otherProcessors | processor1 | processor2 | log
Processor 1's debug message says it has 2 objects in the List, but Processor 2's debug message says it received 40 objects (each map has 20 key=value pairs). It appears the two maps were flattened into one list of key=value pairs.
I have debug logging enabled for org.springframework.integration,
and the message appears to have the list-of-maps format (this excerpt is from Processor 2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
I'd like Processor 2 to receive the two maps produced by Processor 1. I wonder if this is related to generic types. Can someone point me to the configuration that would make this happen?
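As a sanity check, the payload shown in the log excerpt above does deserialize into two maps when read with an explicit Jackson TypeReference, which is the shape Processor 2 should see. A standalone sketch (the class name and abbreviated JSON are illustrative, not from the stream):

```java
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PayloadShapeCheck {

    // Deserialize a JSON array of objects into a List of Maps, keeping
    // each object as its own Map instead of flattening the entries.
    static List<Map<String, Object>> parse(final String json) throws Exception {
        return new ObjectMapper()
                .readValue(json, new TypeReference<List<Map<String, Object>>>() {});
    }

    public static void main(String[] args) throws Exception {
        // Abbreviated version of the payload from the log excerpt above
        final String json = "[{\"m1key1\":\"val1\",\"m1key2\":\"val2\"},"
                          + "{\"m2key1\":\"val1\",\"m2key2\":\"val2\"}]";
        final List<Map<String, Object>> payload = parse(json);
        System.out.println(payload.size()); // prints 2: two maps, not 4 flat entries
    }
}
```

The same manual conversion could serve as an interim workaround, i.e. declaring the @StreamListener parameter as String and converting it with Jackson rather than relying on the framework's content-type conversion.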
---- Update for Artem's comments ----
Processor 1 has this in its application.properties file:
spring.cloud.stream.bindings.output.content-type=application/json
I also tried modifying the stream definition like this, but it didn't seem to make a difference:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
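For reference, my understanding is that those deploy-time --inputType/--outputType overrides correspond to the binding-level content-type properties, so the equivalent settings in each app's application.properties would be (a sketch of the equivalent configuration, not a confirmed fix):

```
# Processor 1
spring.cloud.stream.bindings.output.content-type=application/json
# Processor 2
spring.cloud.stream.bindings.input.content-type=application/json
```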
OK, you've actually stumbled upon a bug :)
This has been fixed on the 2.0 branch, which is a bit unstable at the moment since it's a snapshot;
things should be better once we release in a few days.
The team is discussing the path forward on back-porting the fix to the 1.3 line.