Search code examples
spring-cloud-streamspring-cloud-dataflow

mongodb sink issue while writing a json data from http source


My stream setup is http source -> mongodb sink. When I send test message (JSON) from http source, getting below error in mongodb sink.

An error occurred in message handler [bean 'mongoConsumerMessageHandler'; defined in: 'org.springframework.cloud.fn.consumer.mongo.MongoDbConsumerConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@1b11ef33'] on message [GenericMessage [payload=byte[24], headers={amqp_receivedDeliveryMode=PERSISTENT, content-length=24, amqp_receivedExchange=http-rabbit-stream-http-source, amqp_deliveryTag=1, amqp_redelivered=false, b3=a080d23cbf78b659-65e9906748c2b-1, host=http-src-http-source-v1.dt-u2.cf.test.net, connection=close, id=7fac727a-de9a-aea95-92c574a246d1, cache-control=no-cache, sourceData=(Body:' {"hello": "world !!"} ... 15:38:39.818: [APP/PROC/WEB.0] org.springframework.data.mapping.MappingException: Couldn't find PersistentEntity for type class [B! 15:38:39.818: [APP/PROC/WEB.0] at org.springframework.data.mapping.context.MappingContext.getRequiredPersistentEntity(MappingContext.java:79) ~[spring-data-commons-2.3.7.RELEASE.jar:2.3.7.RELEASE] 15:38:39.818: [APP/PROC/WEB.0] at org.springframework.data.mongodb.core.EntityOperations$AdaptibleMappedEntity.of(EntityOperations.java:652) ~[spring-data-mongodb-3.0.7.RELEASE.jar:3.0.7.RELEASE] 15:38:39.818: [APP/PROC/WEB.0] at org.springframework.data.mongodb.core.EntityOperations$AdaptibleMappedEntity.access$100(EntityOperations.java:632) ~[spring-data-mongodb-3.0.7.RELEASE.jar:3.0.7.RELEASE] 15:38:39.818: [APP/PROC/WEB.0] at org.springframework.data.mongodb.core.EntityOperations.forEntity(EntityOperations.java:108) ~[spring-data-mongodb-3.0.7.RELEASE.jar:3.0.7.RELEASE]


Solution

  • This seems to be an issue with the app. We will look into it and address it soon. In the meantime, if you want to fix it locally, try the following steps.

    1. Clone the repo: https://github.com/spring-cloud/stream-applications
    2. Change this function definition to <functionDefinition>byteArrayTextToString|mongodbConsumer</functionDefinition>
    3. Add the following as a dependency here.
    <dependency>
        <groupId>org.springframework.cloud.fn</groupId>
        <artifactId>payload-converter-function</artifactId>                            
    </dependency>
    
    1. From the root of the repo: ./mvnw clean install -pl :mongodb-sink
    2. cd applications/sink/mongodb-sink/apps/mongodb-sink-rabbit
    3. ./mvnw clean package.
    4. Take the resultant jar and install it in SCDF.

    That should work with a pipeline like HTTP | MongoDB.