spring-cloud-stream spring-rabbit spring-cloud-function

Is there any way to build an sftpSupplier splitter (batch producer) with Spring Cloud Stream and RabbitMQ binder?


I'm trying to implement a Spring Cloud Stream application to:

  • read a gzipped file from an SFTP server
  • unzip it and get two fixed-length text files (a "layout" file and a "data" file with multiple records)
  • interpret the data file using the layout file and transform all that into JSON documents
  • send each document to RabbitMQ as a separate message
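Step 3 depends on a layout file that the question doesn't show. As a minimal, JDK-only sketch of steps 3 and 4, assuming a hypothetical layout made of (field name, width) pairs and a hand-rolled JSON writer (a real application would more likely use Jackson), each fixed-length data line can be sliced into fields and rendered as one JSON document per record:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FixedLengthSketch {

    /** Hypothetical layout entry: a field name and its width in characters. */
    record Field(String name, int width) {}

    /** Slices one fixed-length record according to the layout, in field order. */
    static Map<String, String> parseRecord(String line, List<Field> layout) {
        Map<String, String> values = new LinkedHashMap<>();
        int pos = 0;
        for (Field f : layout) {
            int end = Math.min(pos + f.width(), line.length());
            // Trim the padding that fixed-length formats typically use; short lines yield ""
            values.put(f.name(), pos < end ? line.substring(pos, end).trim() : "");
            pos += f.width();
        }
        return values;
    }

    /** Escapes backslashes and double quotes (minimal escaping for this sketch only). */
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Renders the parsed fields as a flat JSON object. */
    static String toJson(Map<String, String> values) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : values.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    /** One JSON document per data line; each would become one RabbitMQ message. */
    static List<String> toDocuments(List<String> dataLines, List<Field> layout) {
        List<String> docs = new ArrayList<>();
        for (String line : dataLines) {
            docs.add(toJson(parseRecord(line, layout)));
        }
        return docs;
    }

    public static void main(String[] args) {
        List<Field> layout = List.of(new Field("id", 3), new Field("name", 6), new Field("qty", 4));
        List<String> data = List.of("001Alice   12", "002Bob      7");
        toDocuments(data, layout).forEach(System.out::println);
    }
}
```

Keeping the layout as data rather than hard-coding offsets mirrors the idea of reading it from the separate layout file.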

I've tried a few different approaches, but I must be misunderstanding something.

I first modeled this case with a simple application (generated with Spring Initializr, plus the relevant changes below), which works perfectly:

pom.xml

  <properties>
    <java.version>21</java.version>
    <spring-cloud.version>2023.0.4</spring-cloud.version>
    <spring-function.version>5.0.1</spring-function.version>
  </properties>
  <!-- ... -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

Java code

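import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageBuilder;

@Configuration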
public class CloudFunctionConfiguration {

  private final StreamBridge streamBridge;

  public CloudFunctionConfiguration(final StreamBridge streamBridge) {
    this.streamBridge = streamBridge;
  }

  @Bean
  Supplier<String> source() {
    return () -> "a,1,b,2,3,c";
  }

  @Bean
  Consumer<String> splitter() {
    return string -> {
      Arrays.asList(string.split(",")).forEach(s -> {
        final var message = MessageBuilder
            .withPayload("{\"attribute\": \"".concat(s).concat("\"}"))
            .setHeader("some-header", "some-content")
            .build();
        streamBridge.send("output", message);
      });
    };
  }
}

application.properties

spring.application.name=sftp-demo
logging.level.root=DEBUG
logging.file.name=app.log
spring.cloud.function.definition=source|splitter
spring.cloud.stream.output-bindings=output
spring.cloud.stream.bindings.output.producer.required-groups=app
spring.cloud.stream.bindings.output.destination=ex.docs
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.source|splitter-out-0.consumer.declareExchange=false
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

This works as expected; all messages are published each and every time the polled supplier is invoked.

But things stop working when I replace that simple String supplier with the SFTP supplier function.

I generated a sample gzipped file with three records and then made these changes:

pom.xml (added)

    <dependency>
      <groupId>org.springframework.cloud.fn</groupId>
      <artifactId>spring-sftp-supplier</artifactId>
      <version>${spring-function.version}</version><!-- 5.0.1 -->
    </dependency>

Java code (new version)

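import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration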
public class CloudFunctionConfiguration {

  protected static final Logger LOGGER = LoggerFactory.getLogger(CloudFunctionConfiguration.class);

  private final GzipMessageParser parser;
  private final StreamBridge streamBridge;

  public CloudFunctionConfiguration(
      final GzipMessageParser parser,
      final StreamBridge streamBridge) {
    this.parser = parser;
    this.streamBridge = streamBridge;
  }

  @Bean
  Consumer<Message<byte[]>> messageParser() {
    return gzipFile -> {
      parser.parse(gzipFile).forEach(message -> {
        LOGGER.info("SENDING MESSAGE {}", message);
        streamBridge.send("output", message);
      });
    };
  }

}
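The GzipMessageParser class isn't shown. A minimal sketch of just its decompression step, assuming the `.GZ` file is a single gzip stream of UTF-8 text (the class and method names below are hypothetical), needs only `java.util.zip.GZIPInputStream`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipLinesSketch {

    /** Decompresses a gzip payload (e.g. the byte[] from the supplier) into text lines. */
    static List<String> gunzipLines(byte[] gzipPayload) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipPayload))) {
            // Charset is an assumption; fixed-length files are often ISO-8859-1 instead
            String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            // lines() splits on \n, \r or \r\n and does not yield a trailing empty line
            return text.lines().toList();
        } catch (IOException e) {
            throw new UncheckedIOException("Payload is not valid gzip data", e);
        }
    }

    /** Helper used only to build a sample payload for the demo below. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and the gzip trailer written) before this point
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = gzip("record-1\nrecord-2\nrecord-3\n");
        gunzipLines(payload).forEach(System.out::println);
    }
}
```

If the archive actually bundles the layout and data files together (for example as a tar inside the gzip), the JDK alone won't separate them; a tar reader such as Apache Commons Compress would be needed on top of this.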

application.properties (changes) [edit: fixed a copy/paste mistake]

# changed
spring.cloud.function.definition=sftpSupplier|messageParser
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.producer.declareExchange=false
spring.cloud.stream.rabbit.bindings.sftpSupplier|messageParser-out-0.consumer.declareExchange=false
# added
spring.cloud.config.enabled=false
file.consumer.mode=contents
sftp.supplier.delay-when-empty=10s
sftp.supplier.filename-regex=^some-prefix.*?GZ$
sftp.supplier.remote-dir=upload
sftp.supplier.stream=true
sftp.supplier.factory.host=localhost
sftp.supplier.factory.port=2222
sftp.supplier.factory.username=sftpuser
sftp.supplier.factory.password=sftppwd
sftp.supplier.factory.private-key=file:///opt/keys/ssh_host_rsa_key
sftp.supplier.factory.allow-unknown-keys=true
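To double-check which remote files `sftp.supplier.filename-regex` picks up, the expression can be tested on its own. A small sketch, assuming the filter applies the pattern to the whole remote file name (as `Matcher.matches()` does); note that the pattern is case-sensitive, so a lowercase `.gz` file would be skipped:

```java
import java.util.List;
import java.util.regex.Pattern;

class FilenameRegexCheck {

    // Same expression as sftp.supplier.filename-regex in the properties above
    static final Pattern FILENAME = Pattern.compile("^some-prefix.*?GZ$");

    static boolean accepted(String remoteFileName) {
        // matches() requires the whole name to match; the ^ and $ anchors are redundant but harmless
        return FILENAME.matcher(remoteFileName).matches();
    }

    public static void main(String[] args) {
        List<String> names = List.of("some-prefix_doc-files.GZ", "some-prefix_doc-files.gz", "other_doc-files.GZ");
        for (String name : names) {
            System.out.println(name + " -> " + accepted(name));
        }
    }
}
```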

It's actually a little more complicated than that (I've configured a database and a metadata store), but I'll omit those extra configurations, since they seem to work fine.

When I run this app, all messages are sent to RabbitMQ, but I get some errors. This is what's in the log.

  1. There is a message conversion (the whole compressed file is converted essentially into itself; I guess Spring Cloud Stream does this when the internal function pipeline passes the file from the sftpSupplier to the messageParser function):
(...) DEBUG 781757 --- [sftp-demo] [boundedElastic-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Converted Message: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}] to: GenericMessage [payload=byte[528], headers={file_remoteHostPort=localhost:2222, file_remoteFileInfo={"directory":false,"filename":"some-prefix_doc-files.GZ","link":false,"modified":1733178662000,"permissions":"rw-r--r--","remoteDirectory":"upload","size":528}, file_remoteDirectory=upload, id=8b2b9553-cb47-fdd3-111f-6c4d39af4593, contentType=application/octet-stream, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@6b5b5c4b, file_remoteFile=some-prefix_doc-files.GZ, timestamp=1733261637557}]
  2. I see that LOGGER.info("SENDING MESSAGE {}", message); entry, then a preSend hook log, a Publishing message (...) log and a postSend hook log.

  3. All looks nice, but then I see this:

(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Invoking function sftpSupplier|messageParser
(...) DEBUG 781757 --- [sftp-demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Poll resulted in Message: GenericMessage [payload=MonoMap, headers={id=e531d10a-8edc-88fd-fe07-3bba0b950a48, timestamp=1733261637927}]
(...)
(...) DEBUG 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler   : bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}], headers={id=cb9ce0d7-bac6-0ba2-6e0e-7ac40ad989a1, timestamp=1733263993106}] for original GenericMessage [payload=MonoMap, headers={id=364cff1e-6120-f005-ee63-a34865009323, timestamp=1733263993106}]
(...) ERROR 823084 --- [sftp-demo] [scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@4841e6], failedMessage=GenericMessage [payload=MonoMap, headers={id=c5903f2f-f4a7-1ca5-18e0-b76cb2f98fa6, timestamp=1733263993106}]
(...)
Caused by: java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: reactor.core.publisher.MonoMap

That is, Spring Cloud Stream is polling the composed function definition and automatically binding its output to the RabbitMQ binder. Since the poll results in a message with a Mono payload (I don't know why), the binder tries to convert the message and send it, and then it fails.

Curiously, my first working example does this polling as well, but this is the result:

DEBUG 759414 --- [demo] [scheduling-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'

And that's why it works, I think.

I also tried the functional batch producer approach (by creating a Function<Message<String>, List<Message<String>>>), but I couldn't get the application to send the messages individually; RabbitMQ just receives one message containing the serialized List.

It looks like I shouldn't be doing this thing this way, but by reading the docs I got the impression I could (and should?) compose functions (Spring Cloud Functions with my own) to easily build integration applications.

So how could I build such an application? Should I just do it with Spring Integration, or is there something I can do in code or configuration?


Solution

  • So, here is a kind of answer to your struggle: https://github.com/spring-cloud/spring-functions-catalog/pull/108.

    Honestly, even I ran into some obstacles figuring out how to make everything work together.

    With that sample, you can replace the simple fileSupplier with your sftpSupplier and adapt the rest to your requirements.

    I didn't try it with StreamBridge, but that approach is possible too, although it somewhat defeats the purpose of function composition.
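As a plain `java.util.function` analogy (not Spring Cloud Function's actual implementation), piping a `Supplier` into a `Function` yields another `Supplier` whose result can still be published by whoever calls it, whereas piping it into a `Consumer` yields something with no result at all, so any publishing has to happen inside the consumer as a side effect, as with the StreamBridge calls in the question. The helper names `pipe` and `pipeInto` are hypothetical:

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class CompositionSketch {

    /** Hypothetical helper: "supplier|function" in plain Java is still a Supplier. */
    static <T, R> Supplier<R> pipe(Supplier<T> supplier, Function<T, R> function) {
        return () -> function.apply(supplier.get());
    }

    /** Hypothetical helper: "supplier|consumer" in plain Java has no result at all. */
    static <T> Runnable pipeInto(Supplier<T> supplier, Consumer<T> consumer) {
        return () -> consumer.accept(supplier.get());
    }

    public static void main(String[] args) {
        Supplier<String> source = () -> "a,1,b,2,3,c";
        Function<String, List<String>> splitter = s -> List.of(s.split(","));
        Consumer<String> splitAndSend = s ->
                // Stand-in for streamBridge.send(...): the work happens as a side effect
                List.of(s.split(",")).forEach(part -> System.out.println("sent " + part));

        // The composed supplier returns the list, so a caller could publish each element
        Supplier<List<String>> withResult = pipe(source, splitter);
        System.out.println(withResult.get());

        // The composed runnable returns nothing; publishing must happen inside the consumer
        Runnable withoutResult = pipeInto(source, splitAndSend);
        withoutResult.run();
    }
}
```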