Search code examples
springspring-integrationspring-java-config

Spring-Integration in Java Config


I have to convert a project using Spring integration in xmls to Java config, and tried below, but I am getting exceptions.

What I am trying to achieve is, create a message listener which listens messages on a queue destination, puts it on to a channel, which is read and processed by a xsl transformer that puts in the next channel, which is again read and processed by JaxbUnmarshaller to create to Java object and put it the third channel, which will be consumed by a Java Consumer. In case of error, the message is put on a error channel which is again the same Java Consumer that handles it.

Below is the Spring config

  <int:channel id="channel_1"/>
  <int:channel id="channel_2"/>
  <int:channel id="channel_3"/>
  <int:channel id="channel_error"/>

  <int-jms:message-driven-channel-adapter connection-factory="factory" destination-name="destName" 
       pub-sub-domain="false" channel="channel_1" error-channel="channel_error"/>

  <int-xml:xslt-transformer input-channel="channel_1" output-channel="channel_2" xsl-resource="classpath:sample.xsl"/>

  <int-xml:unmarshalling-transformer input-channel="channel_2" output-channel="channel_3" unmarshaller="unmarshaller"/>

  <oxm:jaxb2-marshaller id="unmarshaller">
    <oxm:class-to-be-bound name="sample.example.MyData"/>
  </oxm:jaxb2-marshaller>

  <int:outbound-channel-adapter channel="channel_3" ref="consumer" method="handleMessage"/>

  <bean id="consumer" class="sample.example.MyConsumer"></bean>

  <int:outbound-channel-adapter channel="channel_error" ref="consumer" method="handleError"/>

Below is the Java Config I have for this

  @Bean
  public MessageChannel channel_1() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_2() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_3() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_error() {
    return new DirectChannel();
  }
  @Bean
  @Autowired
  public DefaultMessageListenerContainer container(String destinationQueueName) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(factory());
    container.setDestinationName(destinationQueueName);
    container.setPubSubDomain(false);
    container.setMessageListener(messageListener());
    return container;
  }

  @Bean
  public ChannelPublishingJmsMessageListener messageListener(){
    ChannelPublishingJmsMessageListener adapter = new ChannelPublishingJmsMessageListener();
    adapter.setRequestChannel(channel_1());
    adapter.setErrorChannel(channel_error());
    return adapter;
  }

  @Bean
  @Transformer(inputChannel = "channel_1", outputChannel = "channel_2")
  public XsltPayloadTransformer transformer() {
    return new XsltPayloadTransformer(new ClassPathResource("sample.xsl"));
  }

  @Bean
  @Transformer(inputChannel = "channel_2", outputChannel = "channel_3")
  public UnmarshallingTransformer unmarshallingTransformer() {
    return new UnmarshallingTransformer(unmarshaller());
  }

  @Bean
  public Jaxb2Marshaller unmarshaller() {
    Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
    unmarshaller.setClassesToBeBound(MyData.class);
    return unmarshaller;
  }

  @Bean
  @ServiceActivator(inputChannel = "channel_3")
  public MessageHandler messageHandler() {
    return new MethodInvokingMessageHandler(consumer(), "handleMessage");
  }

  @Bean
  @ServiceActivator(inputChannel = "channel_error")
  public MessageHandler errorHandler() {
    return new MethodInvokingMessageHandler(consumer(), "handleError");
  }

I get the below exception, when I refer for this exception, it says that service activator is not configured properly. I am not sure what I am missing, can someone please help.

  org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/v1.channel_error'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:281)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:479)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:322)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:107)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 16 more

Logs on start up :

2016-03-30 13:06:46,423 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletConfigInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,403 DEBUG com.amazonaws.jmx.MBeans.registerMBean[58] - Failed to register mbean com.amazonaws.management:type=AwsSdkMetrics
javax.management.InstanceAlreadyExistsException: com.amazonaws.management:type=AwsSdkMetrics
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at com.amazonaws.jmx.MBeans.registerMBean(MBeans.java:52)
    at com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
    at com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:355)
    at com.amazonaws.metrics.AwsSdkMetrics.<clinit>(AwsSdkMetrics.java:316)
    at com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:629)
    at com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:570)
    at com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:562)
    at com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:523)
    at com.amazonaws.services.sqs.AmazonSQSClient.getQueueUrl(AmazonSQSClient.java:525)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:260)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:230)
    at com.amazon.sqs.javamessaging.SQSSession.createQueue(SQSSession.java:576)
    at org.springframework.jms.support.destination.DynamicDestinationResolver.resolveQueue(DynamicDestinationResolver.java:84)
    at org.springframework.jms.support.destination.DynamicDestinationResolver.resolveDestinationName(DynamicDestinationResolver.java:58)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:98)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:204)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1167)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1143)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
    at java.lang.Thread.run(Thread.java:745)
2016-03-30 13:06:46,423 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletConfigInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,427 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletContextInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,427 DEBUG com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean[369] - Admin mbean registered under com.amazonaws.management:type=AwsSdkMetrics/4
2016-03-30 13:06:46,427 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletContextInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [jndiProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [jndiProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemEnvironment] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemEnvironment] PropertySource with lowest search precedence
2016-03-30 13:06:46,429 DEBUG org.springframework.web.context.support.StandardServletEnvironment.<init>[126] - Initialized StandardServletEnvironment with PropertySources [servletConfigInitParams,servletContextInitParams,jndiProperties,systemProperties,systemEnvironment]
2016-03-30 13:06:46,429 DEBUG org.springframework.web.context.support.StandardServletEnvironment.<init>[126] - Initialized StandardServletEnvironment with PropertySources [servletConfigInitParams,servletContextInitParams,jndiProperties,systemProperties,systemEnvironment]
2016-03-30 13:06:46,432 DEBUG com.choiceedge.library.common.service.v1.endpoint.AbstractEndpoint$RequestContextAwareFilter.init[177] - Initializing filter 'RequestContextAwareFilter'
2016-03-30 13:06:46,436 DEBUG com.choiceedge.library.common.service.v1.endpoint.AbstractEndpoint$RequestContextAwareFilter.init[202] - Filter 'RequestContextAwareFilter' configured successfully
2016-03-30 13:06:46,437 DEBUG com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials[105] - Loading credentials from EnvironmentVariableCredentialsProvider
2016-03-30 13:06:46,438 DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBean[248] - Returning cached instance of singleton bean 'cxf'
2016-03-30 13:06:46,438 DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBean[248] - Returning cached instance of singleton bean 'cxf'

This is how I register my AppConfig in my WebApplicationInitializer

  @Override
  public void onStartup(final ServletContext servletContext) throws ServletException {

    rootContext = new AnnotationConfigWebApplicationContext();
    rootContext.getEnvironment().setActiveProfiles(System.getenv(ENVIRONMENT));
    LOGGER.info(System.getenv(ENVIRONMENT));
    System.out.println("Starting application in " + System.getenv(ENVIRONMENT));
    rootContext.register(AppConfig.class);
    rootContext.setDisplayName(DISPLAY_NAME);

    // Manage the lifecycle of the root application context
    servletContext.addListener(new ContextLoaderListener(rootContext));
.....

And in my AppConfig I have

@Configuration
@ImportResource("...") - this is for another custom library
@ComponentScan(...)
@EnableIntegration
public class AppConfig {

Solution

  • SOLVED : The changes that resolved the issue, was to autowire the channels to the listener. And Make the PropertyPlaceHolderConfigurer bean as static

    @Bean
    @Autowired
    public ChannelPublishingJmsMessageListener messageListener(MessageChannel channel_1, MessageChannel channel_error){
        ChannelPublishingJmsMessageListener adapter = new ChannelPublishingJmsMessageListener();
         adapter.setRequestChannel(channel_1);
         adapter.setErrorChannel(channel_error);
         return adapter;
    }