Tags: spring-integration, spring-cloud-dataflow

Why does the aggregator app encode messages in base64?


I'm trying to use the aggregator processor 2021.1.x with SCDF 2.9.6 and Kafka.

It aggregates the messages, except that I get a list of base64-encoded messages instead of JSON messages. Something like:

[
  "base64encodedString",
  "base64encodedString"
]

Instead of:

[
  { "id": 1, "bar": "foo" },
  { "id": 2, "bar": "foo" }
]

I only set the Redis store properties and left the default settings for aggregation, correlation, and release.
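
Concretely, the Redis store configuration is just the standard Spring Boot Redis connection properties scoped to the aggregator app, something like this (host and port values are placeholders):

app.aggregator.spring.redis.host=<redis-host>
app.aggregator.spring.redis.port=6379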

The split messages all have the contentType header set to "application/json".

Why is this happening, and how can I fix it?

EDIT: Here is an example.
The DSL: test-aggregator = http | splitter | aggregator | log
Deployed with these properties:

version.http=3.2.1
version.splitter=3.2.1
version.aggregator=2021.1.x
version.log=3.2.1

app.http.server.port=8181
app.splitter.splitter.expression=#jsonPath(payload, '$.store.book')
app.aggregator.spring.cloud.stream.kafka.default.consumer.standard-headers=both
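
For completeness, the stream is created and deployed from the SCDF shell roughly like this (assuming the deployment properties above are saved to a file, here hypothetically named deploy.properties):

stream create --name test-aggregator --definition "http | splitter | aggregator | log"
stream deploy --name test-aggregator --propertiesFile deploy.properties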

Then I post this JSON file to the http source:

{ "store": {
    "book": [
        {
            "author": "Nigel Rees",
            "title": "Sayings of the Century"
        },
        {
            "author": "Evelyn Waugh",
            "title": "Sword of Honour"
        },
        {
            "author": "Herman Melville",
            "title": "Moby Dick"
        },
        {
            "author": "J. R. R. Tolkien",
            "title": "The Lord of the Rings"
        }
    ]
}}

To do that, I use the SCDF shell like this:

http post --target http://<ip-http-source>:8181 --file data/test.json --contentType "application/json; charset=utf-8"

When I check the Kafka messages with Kowl after the splitter, I see the 4 book messages as JSON with the correct contentType header. But after the aggregator, here is the result in the log sink and also in Kowl:

[
  "eyJhdXRob3IiOiJOaWdlbCBSZWVzIiwidGl0bGUiOiJTYXlpbmdzIG9mIHRoZSBDZW50dXJ5In0=",
  "eyJhdXRob3IiOiJFdmVseW4gV2F1Z2giLCJ0aXRsZSI6IlN3b3JkIG9mIEhvbm91ciJ9",
  "eyJhdXRob3IiOiJIZXJtYW4gTWVsdmlsbGUiLCJ0aXRsZSI6Ik1vYnkgRGljayJ9",
  "eyJhdXRob3IiOiJKLiBSLiBSLiBUb2xraWVuIiwidGl0bGUiOiJUaGUgTG9yZCBvZiB0aGUgUmluZ3MifQ=="
]

Solution

  • So, here is an answer for the current aggregator-processor version:

    configure this property:

    aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
    

    It means to deserialize the byte[] payload of each grouped message into a Map, which is essentially a good Java representation of any JSON.

    This way the aggregator will produce a reply as a list of maps, which will in the end be serialized as JSON by the binder producer.
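
    Applied to the stream above, the property would be scoped to the aggregator app at deployment time, presumably like this (the doubled prefix, app name followed by the app's own aggregator property prefix, is an assumption worth verifying against the app's documentation):

    app.aggregator.aggregator.aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]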

    We are thinking about providing something out of the box, a deserialization from byte array to map on the aggregator input, but there is doubt whether the input is always JSON. So we still need to come up with a robust, common solution.
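
    To make the mechanics concrete, here is a rough Java sketch of what that SpEL projection evaluates to (illustrative only: the class and method names are hypothetical, and the real evaluation happens inside the aggregator over the grouped messages):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class AggregationSketch {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        // '#this' is the list of grouped messages; the '![...]' projection maps
        // each element through the expression, mimicked here with a plain loop.
        static List<Map<?, ?>> aggregate(List<byte[]> jsonPayloads) throws Exception {
            List<Map<?, ?>> result = new ArrayList<>();
            for (byte[] payload : jsonPayloads) {
                // T(java.util.Map) in the SpEL corresponds to Map.class here
                result.add(MAPPER.readValue(payload, Map.class));
            }
            return result; // the binder producer serializes this list back to JSON
        }
    }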