java, spring, apache-kafka, spring-integration, spring-integration-dsl

Can't convert .csv file to JSON object and send to Kafka due to cast exception


I am trying to SFTP into a server, grab all the .csv files, split them into single-line messages, and transform each line to JSON. Once transformed to JSON, I would like to send them to a Kafka topic. However, I am seeing a casting exception:

Can't convert value of class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Full Stack Trace:

2023-06-13T14:15:44.196-05:00 ERROR 71399 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'publishToKafkaFlow.kafka:outbound-channel-adapter#0' for component 'EngageKafkaProducer'; defined in: 'class path resource [com/thrivent/enterprisemarketingchannelactivation/engage/config/AcousticEngageDataSftpToKafkaIntegrationFlow.class]'; from source: 'bean method publishToKafkaFlow'], failedMessage=GenericMessage [payload=SftpInputStreamAsync[ClientSessionImpl[[email protected]@transfer-campaign-us-2.goacoustic.com/54.88.73.172:22]][/download/email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv], headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv","link":false,"modified":1686258599000,"permissions":"rw-rw-r--","remoteDirectory":"/download","size":7358023}, sequenceNumber=1, file_remoteDirectory=/download, sequenceSize=1, correlationId=79dc50e4-5bb3-8ff6-9edc-1c9ec8f9e88f, id=fa20238f-c641-a407-f8c9-8d318238b4c2, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@64b375da, file_remoteFile=email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv, timestamp=1686683743081}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1008)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:952)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1022)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783)
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:564)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:528)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    ... 20 more
Caused by: java.lang.ClassCastException: class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync cannot be cast to class java.lang.String (org.apache.sshd.sftp.client.impl.SftpInputStreamAsync is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005)
    ... 28 more

Here is my full integration flow:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class AcousticEngageDataSftpToKafkaIntegrationFlow {

    private static final String KAFKA_TOPIC = "my-topic";

    @Bean
    public QueueChannel inboundFilesMessageChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public QueueChannel kafkaPojoMessageChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public MessageChannel kafkaProducerErrorRecordChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> engageSftpSessionFactory,
                                                IntegrationFlowProperties engageProperties,
                                                MessageChannel inboundFilesMessageChannel) {

        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(engageSftpSessionFactory))
                          .filter(new SftpRegexPatternFileListFilter(engageProperties.getRemoteFilePattern()))
                          .remoteDirectory(engageProperties.getRemoteDirectory()),
                      e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
                .log(LoggingHandler.Level.DEBUG, "AcousticEngageDataSftpToKafkaIntegrationFlow",
                     "headers['file_remoteDirectory'] + T(java.io.File).separator + headers['file_remoteFile']")
                .channel(inboundFilesMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {

        return IntegrationFlow.from(inboundFilesMessageChannel)
                              .split(s -> s.delimiters("\n"))
                              .log(LoggingHandler.Level.DEBUG,
                                   "AcousticEngageDataSftpToKafkaIntegrationFlow",
                                   m -> "Payload: " + m.getPayload())
                              .channel(kafkaPojoMessageChannel)
                              .get();
    }

    @Bean
    public IntegrationFlow publishToKafkaFlow(KafkaTemplate<String, String> kafkaTemplate,
                                              MessageChannel kafkaProducerErrorRecordChannel,
                                              QueueChannel kafkaPojoMessageChannel) {

        return IntegrationFlow.from(kafkaPojoMessageChannel)
                .log(LoggingHandler.Level.DEBUG,
                     "AcousticEngageDataSftpToKafkaIntegrationFlow", e -> "Payload: " + e.getPayload())
                              .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                           .topic(KAFKA_TOPIC),
                                      e -> e.id("EngageKafkaProducer"))
                              .routeByException(r -> r
                                      .channelMapping(KafkaProducerException.class, kafkaProducerErrorRecordChannel)
                                      .defaultOutputChannel("errorChannel"))
                              .get();
    }

    @Bean
    @Transformer(inputChannel="inboundFilesMessageChannel", outputChannel="kafkaPojoMessageChannel")
    ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }

    @Bean
    public IntegrationFlow logErrorsInErrorQueue() {
        return IntegrationFlow.from("errorChannel")
                              .wireTap(f -> f.handle(m -> log.error("Error occurred in AcousticEngageDataSftpToKafkaIntegrationFlow: {}", m)))
                              .channel("kafkaProducerErrorRecordChannel")
                              .get();
    }

    @Bean
    public IntegrationFlow kafkaProducerErrorFlow(
            final MessageChannel kafkaProducerErrorRecordChannel) {

        return IntegrationFlow.from(kafkaProducerErrorRecordChannel)
                               .handle("KafkaExceptionHandler",
                                       "kafkaProducerErrorChannel")
                               .get();
    }
}

I have tried using different transformers and a custom transformer. As of right now, my value and key serializers are set to StringSerializer. I feel like the problem is in my transformer, but it could also be in my serialization. The goal is to move to a JSON schema; however, right now I just want to see some data on the topic. Any help would be appreciated, thanks.


Solution

  • Your problem is here: Sftp.inboundStreamingAdapter(), and here: .split(s -> s.delimiters("\n")). You are asking for an InputStream for the file, but then you try to split that InputStream by a line delimiter. Since the splitter cannot find anything to split in an InputStream payload, it emits the input as-is. And because there are no transformers after that, the raw InputStream payload is dumped straight into the Kafka producer.

    See the FileSplitter instead. It supports an InputStream as a payload and was designed exactly for emitting a file's contents line by line; see the sketch after the docs link below.

    Docs are here: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter
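
    As an illustration only, here is a minimal, untested sketch of readCsvFileFlow rewritten around a FileSplitter. The channel beans and logger category are reused from your question; Files.splitter() is the Java DSL factory from spring-integration-file:

import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                       QueueChannel kafkaPojoMessageChannel) {

    return IntegrationFlow.from(inboundFilesMessageChannel)
                          // Files.splitter() builds a FileSplitter, which understands
                          // File and InputStream payloads and emits one message per line
                          .split(Files.splitter())
                          .log(LoggingHandler.Level.DEBUG,
                               "AcousticEngageDataSftpToKafkaIntegrationFlow",
                               m -> "Payload: " + m.getPayload())
                          .channel(kafkaPojoMessageChannel)
                          .get();
}

    With that change, each message arriving at the Kafka outbound adapter carries a String payload, which is what your configured StringSerializer expects.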