Multiple transaction synchronization factories in the same flow


I have the following flow: an SFTP inbound streaming adapter reads a JSON file. Then:

  • a transaction synchronization is registered directly on that adapter to rename the input file on transaction commit (because I'm done with it)
  • the file that was read is converted into a DTO with the json-to-object transformer and passed to a transformer, which transforms it into something else
  • this "something else" should be put on another channel, but only after transaction commit

This is the essence of the flow:

<int-sftp:inbound-streaming-channel-adapter
  session-factory="mySftpSessionFactory"
  channel="channel1"
  filename-pattern="*.JSON"
  remote-directory-expression="'/mypath'"
  max-fetch-size="10">
  <int:poller max-messages-per-poll="1" fixed-delay="1"
    time-unit="MINUTES" error-channel="unexpectedErrorChannel">
    <int:transactional
      synchronization-factory="synchFactory1"
      transaction-manager="transactionManager" />
  </int:poller>
</int-sftp:inbound-streaming-channel-adapter>

<int:transaction-synchronization-factory id="synchFactory1">
  <int:after-commit channel="onCommitRemoteFileRenameChannel" />
</int:transaction-synchronization-factory>

<int:channel id="channel1" />

<int:chain input-channel="channel1" output-channel="nullChannel">
  <int:stream-transformer charset="UTF-8" />
  <int:json-to-object-transformer type="com.example.MyDto" />
  <int:transformer ref="myTransformer">
    <int:transactional synchronization-factory="synchFactory2"
      transaction-manager="transactionManager" />
  </int:transformer>
</int:chain>

<int:transaction-synchronization-factory id="synchFactory2">
  <int:after-commit channel="onCommitSecondFlowChannel" />
</int:transaction-synchronization-factory>

<int:channel id="onCommitRemoteFileRenameChannel" />
<int-sftp:outbound-gateway
  session-factory="mySftpSessionFactory"
  request-channel="onCommitRemoteFileRenameChannel" 
  command="mv"
  expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE]))"
  rename-expression="headers[T(org.springframework.integration.file.FileHeaders).REMOTE_DIRECTORY].concat('/'.concat(headers[T(org.springframework.integration.file.FileHeaders).REMOTE_FILE].concat('.done')))"
  requires-reply="false"
  reply-channel="nullChannel" />

The second <int:transactional> declared inside the transformer is not meant to actually open a new transaction (the default propagation should be REQUIRED, so it should join the main transaction opened by the poller), but rather to just add a second synchronization that fires the message returned by the transformer to the channel named onCommitSecondFlowChannel for further processing.

However, this does not seem to work: the remote file is renamed correctly, but the second synchronization is never applied, and no message produced by the transformer reaches onCommitSecondFlowChannel. While debugging, I can see that both <int:transaction-synchronization-factory> elements are parsed, but for the second one the method ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(IntegrationResourceHolder) is never executed.

Is there a way to achieve this without declaring a service activator and a gateway after the transformer to place the message on that second flow channel?


Solution

  • I think you are overcomplicating your flow with that extra tx-sync. It really was not designed to do something in the middle of the flow; it is rather a global hook from the point where the transaction starts. Since you don't start a transaction in that transformer, the synchronization cannot be applied.

    We may revise this in the future, since I see your point: it is quite logical that such a tx-sync should join an existing TX if it cannot start a new one at that point.

    Anyway, I wouldn't do that, since it makes the flow unintuitive.

    I see you have <int:chain input-channel="channel1" output-channel="nullChannel">, that is, nullChannel as the output after that <transformer>.

    So how about removing that <transactional> altogether and using onCommitSecondFlowChannel as the output channel of this <chain>? It looks like this is the end of your flow anyway, so there is no need to defer anything to the end of the transaction.
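
    As a rough sketch of that suggestion (not tested against your setup), the chain from the question could look like this. It reuses only names already present in your configuration and assumes that onCommitSecondFlowChannel is declared elsewhere in the application context:

    ```xml
    <!-- The chain now sends the transformer result straight to the second flow
         instead of discarding it on nullChannel. -->
    <int:chain input-channel="channel1" output-channel="onCommitSecondFlowChannel">
      <int:stream-transformer charset="UTF-8" />
      <int:json-to-object-transformer type="com.example.MyDto" />
      <!-- No nested <int:transactional>: the poller's transaction is the only one. -->
      <int:transformer ref="myTransformer" />
    </int:chain>
    ```

    If channel1 and onCommitSecondFlowChannel are plain direct channels, the downstream flow should run on the poller's thread inside the transaction opened by the poller, so the after-commit rename from synchFactory1 would fire only once that processing has completed without an error. That part is an assumption about your channel types, so please verify it in your context.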