Search code examples
spring-bootspring-integrationspring-integration-dsl

IntegrationFlow Exception Handling


I am trying to create a Flow for message as follow:

TCPinboundAdapter ----> Message Brocker(ActiveMQ)

Flow:

This flow is created in the following way

  1. The message is received via TCP connection to TCP Adapter which may be client or server.
  2. The message received to TCP adapter is send to JMS Adapter(ActiveMQ Broker).

The code is as follow:

@EventListener
public void handleTcpConnectionClientEvent(TcpConnectionFailedEvent event){

     TcpNioClientConnectionFactory tcp = (TcpNioClientConnectionFactory)event.getSource();
     System.out.println(tcp); 
     System.out.println("connection exception client :::"+event.getSource());

     this.status = event.toString();

 }
 @EventListener
 public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){
     System.out.println("connection exception server :::");

     this.status = event.toString();

 }

 // this method is invoked when the connection with the sever got disconnected
 @EventListener
 public void handleTcpConnectionServerEvent(TcpConnectionExceptionEvent event){
     System.out.println("connection exception serversssss :::"+event.getConnectionFactoryName());
     this.status = event.toString();

 }

 //when the connection got established (not for first time)
 @EventListener
 public void handleTcpConnectionCloseEvent(TcpConnectionOpenEvent event){
     System.out.println("connection opened :::"+event.getConnectionFactoryName());
    // status = event.toString();

 }

// create a server connection and flow to JMS  
private void createServerConnection(HostConnection hostConnection)  throws Throwable{
    this.status = "success";

    // IntegrationFlow flow;


IntegrationFlowRegistration theFlow;
     IntegrationFlow flow = 
IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
             .serializer(customSerializer)
             .deserializer(customSerializer)
             .id(hostConnection.getConnectionNumber()).soTimeout(10000)))
             .enrichHeaders(f->f.header("abc","abc")))
             .channel(directChannel())
             .handle(Jms.outboundAdapter(ConnectionFactory())
             .destination("jmsInbound"))
             .get();

           theFlow = this.flowContext.registration(flow).id("test.flow").register();


           if(this.status.equals("success"))
           createInboundFlow(hostConnection);

          // startConnection(hostConnection.getConnectionNumber());

}

Issue:

This flow is created successfully and get registered to Application Context when there is no Exception. But in case When there is an exception i.e (BindException)

  1. When creating server to a particular port and the Port is already used then it raise BindException then also the flow got registered So, we want that the flow should not be registered when there is exception in any of the flow component below.

    IntegrationFlowRegistration theFlow;
          IntegrationFlow flow = 
               IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
              .serializer(customSerializer)
              .deserializer(customSerializer)
              .id("server").soTimeout(10000)))
              .enrichHeaders(f->f.header("abc","abc")))
              .channel(directChannel())
              .handle(Jms.outboundAdapter(ConnectionFactory())
              .destination("jmsInbound"))
              .get();
    
          theFlow =this.flowContext.registration(flow).id("test.flow").register();
    

There are various Listener implemented to check exception in TCP connection try{}catch() block don't raise any exception.

Please provide a suitable approach to handle Exceptions for adapters currently I am using Listeners for various event to know there is something wrong with the tcp adapters.

After applying this approach provided by Mr. Artem Bilan

@EventListener public void handleTcpConnectionServerExceptionEvent(TcpConnectionServerExceptionEvent event){ System.out.println("connection exception server :::"+event); this.status = event.getCause().getMessage(); AbstractConnectionFactory server = (AbstractConnectionFactory)event.getSource(); System.out.println(server.getComponentName()); this.flowContext.remove(server.getComponentName()+"out.flow"); }

I am able to remove the flow using FlowId but I am not able to catch the Exception The Exception below is printing on the console and can't be handled Even I have changed method to

private void createServerConnection(HostConnection hostConnection) throws Throwable{}

and handled these Exception with try{}catch(Throwable t){} in calling function

Exception in thread "pool-4-thread-1" java.lang.NullPointerException

Exception is described in more elaborated form in the logs provided below:

    2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] 
    .s.i.i.t.c.TcpNetServerConnectionFactory : started Co123, port=1234
2018-05-17 21:01:40.850  INFO 18332 --- [nio-8080-exec-4] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : started org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851 ERROR 18332 --- [pool-5-thread-1] .s.i.i.t.c.TcpNetServerConnectionFactory : Error on ServerSocket; port = 1234

java.net.BindException: Address already in use: JVM_Bind
    at java.net.DualStackPlainSocketImpl.bind0(Native Method) ~[na:1.8.0_111]
    at java.net.DualStackPlainSocketImpl.socketBind(Unknown Source) ~[na:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.PlainSocketImpl.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.bind(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at java.net.ServerSocket.<init>(Unknown Source) ~[na:1.8.0_111]
    at javax.net.DefaultServerSocketFactory.createServerSocket(Unknown Source) ~[na:1.8.0_111]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.createServerSocket(TcpNetServerConnectionFactory.java:211) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:106) ~[spring-integration-ip-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [na:1.8.0_111]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_111]

connection exception server :::TcpConnectionServerExceptionEvent [source=Co123, port=1234, cause=java.net.BindException: Address already in use: JVM_Bind]
Co123
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.ip.tcp.TcpReceivingChannelAdapter  : stopped org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter#3
2018-05-17 21:01:40.851  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {transformer} as a subscriber to the 'Co123out.flow.channel#0' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#0' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#11
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {jms:outbound-channel-adapter} as a subscriber to the 'Co123out.flow.channel#1' channel
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.Co123out.flow.channel#1' has 0 subscriber(s).
2018-05-17 21:01:40.852  INFO 18332 --- [pool-5-thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#12
Exception in thread "pool-4-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "pool-5-thread-1" java.lang.NullPointerException
    at org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory.run(TcpNetServerConnectionFactory.java:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)`

Solution

  • You register an IntegrationFlow via:

    this.flowContext.registration(flow).id("test.flow").register();
    

    The same his.flowContext bean and that id for the flow can be used to destroy the flow from any other place, e.g. an event listener, when you catch the mentioned BindException:

        /**
     * Destroy an {@link IntegrationFlow} bean (as well as all its dependant beans)
     * for provided {@code flowId} and clean up all the local cache for it.
     * @param flowId the bean name to destroy from
     */
    void remove(String flowId);