Tags: java, apache-kafka, spring-cloud-stream, spring-cloud-dataflow

A processor receives the same message (same payload, different id) multiple times


I'm starting a new project with Spring Cloud Data Flow, developing a set of jars to fit my needs.

One of them is a processor that untars files coming from a file source. This application uses a customized version of spring-integration-zip, with added features to handle tar and gzip file compression.

So my problem is the following: while my source sends a single message with the file reference, the processor receives that message multiple times, with the same payload but a different id each time.

Here are the logs of both components.

As you can see, the file source produces only one message:

2017-10-02 12:38:28.013  INFO 17615 --- [ask-scheduler-3] o.s.i.file.FileReadingMessageSource      : Created message: [GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={id=0b99b840-e3b3-f742-44ec-707aeea638c8, timestamp=1506940708013}]]

while the processor has three incoming messages:

2017-10-02 12:38:28.077  INFO 17591 --- [           -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar  : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2017-10-02 12:38:28.080  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=1a4d4b9c-86fe-d3a8-d800-8013e8ae7027, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940708079}]' unpacking started...
2017-10-02 12:38:28.080  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:29.106  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=cd611ca4-4cd9-0624-0871-dcf93a9a0051, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940709106}]' unpacking started...
2017-10-02 12:38:29.107  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:31.108  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]' unpacking started...
2017-10-02 12:38:31.108  INFO 17591 --- [           -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress
2017-10-02 12:38:31.116 ERROR 17591 --- [           -L-1] o.s.integration.handler.LoggingHandler   : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.io.FileNotFoundException: /tmp/patent/CNINO_im_201733_batch108.tgz (File o directory non esistente), failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}], failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]
        at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)

I can't find any solution to this problem. Has anybody had the same problem and found a way to fix it? Or is there any configuration I'm missing?

EDIT:

I'm using the local version of SCDF, version 1.2.2.RELEASE, so file IO operations work on the same filesystem, and I use version Ditmars.BUILD-SNAPSHOT for SCS.

Unfortunately, even if I disable the file delete operation, the application still processes the message multiple times. Here are some code snippets, and if you like, this is my project repo:

This is my processor class:

@EnableBinding(Processor.class)
@EnableConfigurationProperties(UnTarProperties.class)
public class UnTarProcessor {

  @Autowired
  private UnTarProperties properties;

  @Autowired
  private Processor processor;

  @Bean 
  public UncompressedResultSplitter splitter() {
    return new UncompressedResultSplitter();
  }

  @Bean 
  public UnTarGzTransformer transformer() {
    UnTarGzTransformer unTarGzTransformer = new UnTarGzTransformer(properties.isUseGzCompression());
    unTarGzTransformer.setExpectSingleResult(properties.isSingleResult());
    unTarGzTransformer.setWorkDirectory(new File(properties.getWorkDirectory()));
    unTarGzTransformer.setDeleteFiles(properties.isDeleteFile());

    return unTarGzTransformer;
  }

  @Bean  
  public IntegrationFlow process() {

    return IntegrationFlows.from(processor.input())
        .transform(transformer())
        .split(splitter())
        .channel(processor.output())
        .get();
  }
}

This is the core method used to decompress file:

  @Override
  protected Object doCompressTransform(final Message<?> message) throws Exception {
    logger.info(String.format("Message '%s' unpacking started...", message));

    try (InputStream checkMessage = checkMessage(message);
         InputStream inputStream = (gzCompression ? new BufferedInputStream(new GZIPInputStream(checkMessage)) : new BufferedInputStream(checkMessage))) {

      final Object payload = message.getPayload();
      final Object unzippedData;

      try (TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream)){        
        TarArchiveEntry entry = null;

        final SortedMap<String, Object> uncompressedData = new TreeMap<String, Object>();

        while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) {

          final String zipEntryName = entry.getName();
          final Date zipEntryTime = entry.getLastModifiedDate();
          final long zipEntryCompressedSize = entry.getSize();

          final String type = entry.isDirectory() ? "directory" : "file";

          final File tempDir = new File(workDirectory, message.getHeaders().getId().toString());
          tempDir.mkdirs(); // NOSONAR false positive

          final File destinationFile = new File(tempDir, zipEntryName);

          if (entry.isDirectory()) {
            destinationFile.mkdirs(); // NOSONAR false positive
          }
          else {
            unpackEntries(tarIn, entry, tempDir);
            uncompressedData.put(zipEntryName, destinationFile);
          }
        }

        if (uncompressedData.isEmpty()) {
          unzippedData = null;
        }
        else {
          if (this.expectSingleResult) {
            if (uncompressedData.size() == 1) {
              unzippedData = uncompressedData.values().iterator().next();
            }
            else {
              throw new MessagingException(message, String.format("The UnZip operation extracted %s " 
                        + "result objects but expectSingleResult was 'true'.", uncompressedData.size()));
            }
          }
          else {
            unzippedData = uncompressedData;
          }

        }

        logger.info("Payload unpacking completed...");
      }
      finally {
        if (payload instanceof File && this.deleteFiles) {
          final File filePayload = (File) payload;
          if (!filePayload.delete() && logger.isWarnEnabled()) {
            logger.warn("failed to delete File '" + filePayload + "'");
          }
        }
      }
      return unzippedData;
    }
    catch (Exception e) {
      throw new MessageHandlingException(message, "Failed to apply Zip transformation.", e);
    }
}

The exception is thrown by the checkMessage() method:

    protected InputStream checkMessage(Message<?> message) throws FileNotFoundException {
      logger.info("Check message's payload type to decompress");

      InputStream inputStream;
      Object payload = message.getPayload();

      if (payload instanceof File) {
        final File filePayload = (File) payload;

        if (filePayload.isDirectory()) {
          throw new UnsupportedOperationException(String.format("Cannot unzip a directory: '%s'",
                  filePayload.getAbsolutePath()));
        }

        inputStream = new FileInputStream(filePayload);
      }
      else if (payload instanceof InputStream) {
        inputStream = (InputStream) payload;
      }
      else if (payload instanceof byte[]) {
        inputStream = new ByteArrayInputStream((byte[]) payload);
      }
      else {
        throw new IllegalArgumentException(String.format("Unsupported payload type '%s'. " +
                "The only supported payload types are java.io.File, byte[] and java.io.InputStream",
                payload.getClass().getSimpleName()));
      }

      return inputStream;
    }

I really appreciate any help. Thanks a lot.


Solution

  • We would need more information: the versions of your SCDF and SCS apps, and, at the very least, the DSL showing how you deployed your apps.
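    For reference, a Spring Cloud Data Flow stream definition for this kind of pipeline would look something like the following in the Data Flow shell. The app and property names here are assumptions for illustration, not taken from the question:

    ```
    stream create untar --definition "file --directory=/tmp/patent | untar-processor | log" --deploy
    ```

    With the Kafka binder, this produces topics named after the stream and producer app (e.g. `untar.file`, matching the `kafka_receivedTopic` header in the logs above).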

    Just checked your logs: did you realize your consumer is failing to consume the message due to a FileNotFoundException? You are not receiving the same message multiple times; SCS is just trying to redeliver it before failing. Check your full logs and look at how you are failing to open the file in the specified location.
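    The redelivery behavior can be sketched in plain Java, with no Spring dependency. This is only a simulation of what the binder's retry loop does, not Spring Cloud Stream code; the `MAX_ATTEMPTS` constant of 3 mirrors Spring Cloud Stream's default consumer `maxAttempts`, which is why the same payload shows up exactly three times in the logs, each delivery carrying a freshly generated message id:

    ```java
    import java.util.UUID;
    import java.util.function.Consumer;

    // Plain-Java sketch of consumer-side retry: on failure the same payload
    // is redelivered, and each delivery is wrapped in a new message with a
    // new id header, which is why the ids in the question's logs all differ.
    class RedeliverySimulation {

        static final int MAX_ATTEMPTS = 3; // mirrors SCS's default consumer maxAttempts

        // Delivers the payload up to MAX_ATTEMPTS times; returns the attempt count.
        static int deliverWithRetry(String payload, Consumer<String> handler) {
            int attempts = 0;
            while (attempts < MAX_ATTEMPTS) {
                attempts++;
                String messageId = UUID.randomUUID().toString(); // new id per delivery
                try {
                    System.out.println("delivering id=" + messageId + " payload=" + payload);
                    handler.accept(payload);
                    return attempts; // success: no redelivery
                } catch (RuntimeException e) {
                    if (attempts == MAX_ATTEMPTS) {
                        System.out.println("giving up after " + attempts + " attempts: " + e.getMessage());
                    }
                }
            }
            return attempts;
        }

        public static void main(String[] args) {
            // The handler always fails, as it would once the payload file is gone.
            int attempts = deliverWithRetry("/tmp/patent/CNINO_im_201733_batch108.tgz",
                    p -> { throw new RuntimeException("java.io.FileNotFoundException: " + p); });
            System.out.println("attempts=" + attempts);
        }
    }
    ```

    If you want to see the failure immediately instead of three times, you can set the consumer's retry count to 1 on the input binding (e.g. `spring.cloud.stream.bindings.input.consumer.maxAttempts=1`), but the real fix is to resolve the FileNotFoundException itself.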