I am trying to SFTP into a server, grab all the .csv files, split them into single-line objects, and transform them to JSON. Once they are transformed, I would like to send them to a Kafka topic. However, I am seeing a casting exception:
Can't convert value of class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Full Stack Trace:
023-06-13T14:15:44.196-05:00 ERROR 71399 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'publishToKafkaFlow.kafka:outbound-channel-adapter#0' for component 'EngageKafkaProducer'; defined in: 'class path resource [com/thrivent/enterprisemarketingchannelactivation/engage/config/AcousticEngageDataSftpToKafkaIntegrationFlow.class]'; from source: 'bean method publishToKafkaFlow'], failedMessage=GenericMessage [payload=SftpInputStreamAsync[ClientSessionImpl[cp.api@engage.thrivent@transfer-campaign-us-2.goacoustic.com/54.88.73.172:22]][/download/email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv], headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv","link":false,"modified":1686258599000,"permissions":"rw-rw-r--","remoteDirectory":"/download","size":7358023}, sequenceNumber=1, file_remoteDirectory=/download, sequenceSize=1, correlationId=79dc50e4-5bb3-8ff6-9edc-1c9ec8f9e88f, id=fa20238f-c641-a407-f8c9-8d318238b4c2, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@64b375da, file_remoteFile=email_metadata_UNICA_CUSTID_Recipient-Event-Bulk-Export_6748469_Email_Jun-08-2023-20-10-00_20-21.csv, timestamp=1686683743081}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:474)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:460)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1008)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:952)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1022)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:783)
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:754)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:564)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:528)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
... 20 more
Caused by: java.lang.ClassCastException: class org.apache.sshd.sftp.client.impl.SftpInputStreamAsync cannot be cast to class java.lang.String (org.apache.sshd.sftp.client.impl.SftpInputStreamAsync is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005)
... 28 more
Here is my full integration flow:
@Configuration
@RequiredArgsConstructor
@Slf4j
public class AcousticEngageDataSftpToKafkaIntegrationFlow {

    private static final String KAFKA_TOPIC = "my-topic";

    @Bean
    public QueueChannel inboundFilesMessageChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public QueueChannel kafkaPojoMessageChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public MessageChannel kafkaProducerErrorRecordChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> engageSftpSessionFactory,
                                                IntegrationFlowProperties engageProperties,
                                                MessageChannel inboundFilesMessageChannel) {
        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(engageSftpSessionFactory))
                                .filter(new SftpRegexPatternFileListFilter(engageProperties.getRemoteFilePattern()))
                                .remoteDirectory(engageProperties.getRemoteDirectory()),
                        e -> e.id("sftpInboundAdapter")
                                .autoStartup(true)
                                .poller(Pollers.fixedRate(5000)))
                .log(LoggingHandler.Level.DEBUG, "AcousticEngageDataSftpToKafkaIntegrationFlow",
                        "headers['file_remoteDirectory'] + + T(java.io.File).separator + headers['file_remoteFile']")
                .channel(inboundFilesMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {
        return IntegrationFlow.from(inboundFilesMessageChannel)
                .split(s -> s.delimiters("\n"))
                .log(LoggingHandler.Level.DEBUG,
                        "AcousticEngageDataSftpToKafkaIntegrationFlow",
                        m -> "Payload: " + m.getPayload())
                .channel(kafkaPojoMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlow publishToKafkaFlow(KafkaTemplate<String, String> kafkaTemplate,
                                              MessageChannel kafkaProducerErrorRecordChannel,
                                              QueueChannel kafkaPojoMessageChannel) {
        return IntegrationFlow.from(kafkaPojoMessageChannel)
                .log(LoggingHandler.Level.DEBUG,
                        "AcousticEngageDataSftpToKafkaIntegrationFlow", e -> "Payload: " + e.getPayload())
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                .topic(KAFKA_TOPIC),
                        e -> e.id("EngageKafkaProducer"))
                .routeByException(r -> r
                        .channelMapping(KafkaProducerException.class, kafkaProducerErrorRecordChannel)
                        .defaultOutputChannel("errorChannel"))
                .get();
    }

    @Bean
    @Transformer(inputChannel="inboundFilesMessageChannel", outputChannel="kafkaPojoMessageChannel")
    ObjectToJsonTransformer objectToJsonTransformer() {
        return new ObjectToJsonTransformer();
    }

    @Bean
    public IntegrationFlow logErrorsInErrorQueue() {
        return IntegrationFlow.from("errorChannel")
                .wireTap(f -> f.handle(m -> log.error("Error occurred in AcousticEngageDataSftpToKafkaIntegrationFlow: {}", m)))
                .channel("kafkaProducerErrorRecordChannel")
                .get();
    }

    @Bean
    public IntegrationFlow kafkaProducerErrorFlow(
            final MessageChannel kafkaProducerErrorRecordChannel) {
        return IntegrationFlow.from(kafkaProducerErrorRecordChannel)
                .handle("KafkaExceptionHandler",
                        "kafkaProducerErrorChannel")
                .get();
    }
}
I have tried using different transformers and a custom transformer. As of right now, my key and value serializers are both set to String. I feel like the problem is in my transformer, but it could also be in my serialization. The goal is to move to a JSON schema eventually; however, right now I just want to see some data on the topic. Any help would be appreciated, thanks.
Your problem is the combination of Sftp.inboundStreamingAdapter() and .split(s -> s.delimiters("\n")). The streaming adapter gives you an InputStream for each remote file, but then you try to split that InputStream by a line delimiter. The delimiters apply only to String payloads, so the default splitter cannot do anything with an InputStream and emits it as is. There are no transformers after that in the flow, so the InputStream payload is handed straight to the Kafka producer, where StringSerializer fails to cast it to a String.

Use a FileSplitter instead. It supports an InputStream as a payload and was designed to emit the content of a file line by line.
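A minimal sketch of what that change could look like in readCsvFileFlow, assuming Spring Integration 6.x with spring-integration-file on the classpath. Files.splitter() is the Java DSL factory for a FileSplitter. The class name ReadCsvFileFlowSketch is made up for illustration, while the bean and channel names are taken from the question. Newer versions may prefer a different split overload, so treat this as a starting point rather than the definitive configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.dsl.Files;
import org.springframework.messaging.MessageChannel;

// Hypothetical class name; only the splitter line differs from the flow in the question.
@Configuration
public class ReadCsvFileFlowSketch {

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {
        return IntegrationFlow.from(inboundFilesMessageChannel)
                // FileSplitter accepts the InputStream payload and emits one String message per line,
                // replacing .split(s -> s.delimiters("\n")), which passes an InputStream through untouched.
                .split(Files.splitter())
                .channel(kafkaPojoMessageChannel)
                .get();
    }
}
```

With this in place, each message arriving on kafkaPojoMessageChannel should carry a single line of the file as a String, which is the type StringSerializer expects.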
Docs are here: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter
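Once the lines arrive as Strings, the remaining step from the question is turning each CSV row into JSON. The sketch below uses only the JDK to show the idea: read an InputStream line by line (conceptually what the FileSplitter does) and map each row onto the header names. It assumes the first line is a header row, that fields contain no quoted commas or control characters, and that the file is UTF-8. The class and method names (CsvLinesToJson, toJsonLines, toJson) are hypothetical, and a real flow would more likely use a JSON library in a transformer step.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: converts CSV content from a stream into one JSON object per data row.
final class CsvLinesToJson {

    /** Reads the stream line by line; the first line is treated as the header row (assumption). */
    static List<String> toJsonLines(InputStream in) throws IOException {
        List<String> result = new ArrayList<>();
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                return result; // empty input, nothing to convert
            }
            String[] headers = headerLine.split(",", -1);
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    result.add(toJson(headers, line.split(",", -1)));
                }
            }
        }
        return result;
    }

    /** Builds a flat JSON object with string values; missing trailing fields become "". */
    static String toJson(String[] headers, String[] values) {
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < headers.length; i++) {
            if (i > 0) {
                json.append(',');
            }
            String value = i < values.length ? values[i] : "";
            json.append('"').append(escape(headers[i])).append("\":\"")
                .append(escape(value)).append('"');
        }
        return json.append('}').toString();
    }

    /** Escapes only backslashes and double quotes; control characters are not handled here. */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) throws IOException {
        byte[] csv = "id,name\n1,Ann\n2,Bob\n".getBytes(StandardCharsets.UTF_8);
        for (String json : toJsonLines(new ByteArrayInputStream(csv))) {
            System.out.println(json);
        }
    }
}
```

In the flow itself the header would need to be carried along with each line, for example by remembering the first line, since the splitter emits every line as a separate message.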