Search code examples
javaspringspring-integration

Transform InputStream into Object, then Object into Json


I am reading in an InputStream from an Sftp.inboundStreamingAdapter that stream is passed to a message channel that this flow reads from and tries to convert the stream (which is a .csv) into an object, and then for the secondary transformation it should convert that object into json. The problem is the header file (I think). The csv looks like this the screenshot below. My object uses the headers to map to my object, but since I am splitting the file up by each line in the file I believe that causes the error. I'm assuming this isn't the correct approach to this problem, any help would be much appreciated.

Sample Csv

@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                       QueueChannel kafkaPojoMessageChannel) {

    return IntegrationFlow.from(inboundFilesMessageChannel)
                          .split(Files.splitter())
                          .transform(new StreamToMyObject) // TODO: Turn InputStream to MyObject Object
                          .transform(new ObjectToJsonTransformer())
                          .log(LoggingHandler.Level.DEBUG,
                               "AcousticEngageDataSftpToKafkaIntegrationFlow",
                               m -> "Payload: " + m.getPayload())
                          .channel(kafkaPojoMessageChannel)
                          .get();
}

I have tried creating my own custom InputStreamToObject Transformers, but I have came up short, my latest attempt I used an ObjectInputStream and have tried to use the ois.readFrom(inputStream) method to convert it into my ObjectClass using something like this, and adding the StreamToObject class in the first transformer.

public class StreamToMyObjectConverter implements GenericTransformer<InputStream, MyObject> {
    @Override
    public MyObject transform(InputStream inputStream) {
        try (ObjectInputStream ois = new ObjectInputStream(inputStream)){
            return (MyObject) ois.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

    }

Object: Which is generated by jsonschema2pojo.

I think the issue is I can't map to the object from the stream because of the headers. I'm not sure if this is the best approach. Any suggestions would be much appreciated.

@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder({"email", "RECIPIENT_ID", "ENCODED_RECIPIENT_ID", "contactId", "code", "messageId", "userAgent", "messageName", "mailingTemplateId", "subjectLine", "docType", "reportId", "sendType", "bounceType", "urlDescription", "clickUrl", "optOutDetails", "messageGroupId", "programId", "timestamp", "originatedFrom", "eventId", "externalSystemName", "externalSystemReferenceId", "trackingCode"})
public class MyObject {
    @JsonProperty("email")
    private String email;
    @JsonProperty("RECIPIENT_ID")
    private String recipientId;
    @JsonProperty("ENCODED_RECIPIENT_ID")
    private String encodedRecipientId;
    @JsonProperty("contactId")
    private String contactId;
    @JsonProperty("code")
    private String code;
    @JsonProperty("messageId")
    private String messageId;
    @JsonProperty("userAgent")
    private String userAgent;
    @JsonProperty("messageName")
    private String messageName;
    @JsonProperty("mailingTemplateId")
    private String mailingTemplateId;
    @JsonProperty("subjectLine")
    private String subjectLine;
    @JsonProperty("docType")
    private String docType;
    @JsonProperty("reportId")
    private String reportId;
    @JsonProperty("sendType")
    private String sendType;
    @JsonProperty("bounceType")
    private String bounceType;
    @JsonProperty("urlDescription")
    private String urlDescription;
    @JsonProperty("clickUrl")
    private String clickUrl;
    @JsonProperty("optOutDetails")
    private String optOutDetails;
    @JsonProperty("messageGroupId")
    private String messageGroupId;
    @JsonProperty("programId")
    private String programId;
    @JsonProperty("timestamp")
    private String timestamp;
    @JsonProperty("originatedFrom")
    private String originatedFrom;
    @JsonProperty("eventId")
    private String eventId;
    @JsonProperty("externalSystemName")
    private String externalSystemName;
    @JsonProperty("externalSystemReferenceId")
    private String externalSystemReferenceId;
    @JsonProperty("trackingCode")
    private String trackingCode;
}

StackTrace:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@5f2f9a4d], failedMessage=GenericMessage [payload=FileMarker [filePath=/downloaddummy_acoustic.csv, mark=START], headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"dummy_acoustic.csv","link":false,"modified":1686691008000,"permissions":"rw-r-----","remoteDirectory":"/download","size":1250}, file_remoteDirectory=/download, id=ca7bd14f-6967-0040-4f56-e63c715ae1c5, file_marker=START, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@4e9ca373, file_remoteFile=dummy_acoustic.csv, timestamp=1686758001364}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:117)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:115)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ... 42 more
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method transform(org.springframework.integration.file.splitter.FileSplitter$FileMarker) cannot be found on type com.thrivent.enterprisemarketingchannelactivation.engage.converter.StreamToEmailInteractionConverter
    at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:225)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:135)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:380)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:93)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:119)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:611)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:604)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:590)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:561)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
    ... 44 more


Solution

  • It looks like you are missing this option on that Files.splitter():

    /**
     * Specify the header name for the first line to be carried as a header in the
     * messages emitted for the remaining lines.
     * @param firstLineHeaderName the header name to carry first line.
     * @return the FileSplitterSpec
     */
    public FileSplitterSpec firstLineAsHeader(String firstLineHeaderName) {
    

    The org.springframework.integration.file.splitter.FileSplitter$FileMarker error is not possible if you don't use markers = true for that FileSplitter.

    See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter