Tags: java, spring, spring-boot, spring-integration, spring-java-config

Mapping spring configuration to integration flows


I have a Spring Boot / Integration app whose integration flows begin with RabbitMQ queues. The flows themselves and the app in general are working well, but there's a lot of redundant configuration for the initial inbound AMQP configuration.

At the moment I have ten DataTypes, each with slightly varying properties, some of which need to be defined at runtime. I initialize an inbound flow for each, set a couple of headers, and quickly dump them into a common channel for processing.

The Java configuration for just one of them, the NetworkDiagnostic DataType, looks like:

@Bean
public IntegrationFlow inboundNetworkDiagnosticFlow(@Qualifier("connectionFactory") ConnectionFactory connectionFactory,
                                                    @Qualifier(BeanNames.INBOUND_EVENTS_CHANNEL) MessageChannel outbound,
                                                    @Qualifier(BeanNames.JSON_NODE_MESSAGE_CONVERTER) MessageConverter messageConverter,
                                                    @Qualifier("networkDiagnosticQueue") Queue queue,
                                                    @Value("${networkDiagnostic.numConsumers}") int numConsumers,
                                                    @Value("${networkDiagnostic.prefetchCount}") int prefetchCount) {
    return makeEventIntegrationFlow(connectionFactory, outbound, messageConverter, queue, numConsumers, prefetchCount,
            DataTypes.EVENT_NETWORK_DIAGNOSTIC);
}

@Bean
public Binding networkDiagnosticBinding(@Qualifier("networkDiagnosticQueue") Queue queue) {
    return makeFanoutBinding(queue, NETWORK_DIAGNOSTIC_EXCHANGE_NAME);
}


@Bean
public Queue networkDiagnosticQueue() {
    return makeQueue(NETWORK_DIAGNOSTIC_QUEUE_STRING);
}

@Bean
public FanoutExchange networkDiagnosticExchange() {
    return new FanoutExchange(NETWORK_DIAGNOSTIC_EXCHANGE_NAME);
}

There is parallel configuration for the other nine. I would like to factor this out further, so that a) the repetition is removed, and b) more inputs can be configured simply from a configuration file on a server.

My general thought is that I would have a YAML configuration file:

data_types:
   - name: network-diagnostic
     schema: event
     window_type: hourly
     exchange_name: blahblahblah
     queue_name: blahblahblah
     ...
   - name: log-diagnostic
   ...

which, via @ConfigurationProperties, I'd map to a class more or less like:

/**
 * Organizes information and configuration for a DataType
 */
public class DataType {
    private String name;
    private Schema schema;
    private WindowType windowType;
    private long bucketLength;

    private String exchange;
    private String routingKey;
    ...

I'd then need some method, say registerAllBeans, that takes all of the DataTypes, creates all of the necessary beans (and their interrelationships), and calls SingletonBeanRegistry::registerSingleton on each.
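To make the idea concrete, here is a rough sketch of what such a registerAllBeans loop might look like. It is only an illustration: the Registry interface is a stand-in that mirrors the shape of SingletonBeanRegistry::registerSingleton so the example runs without Spring, the registered objects are placeholder strings rather than real Queue, FanoutExchange, Binding, or IntegrationFlow instances, and the bean-naming scheme is assumed from the hand-written beans above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the registerAllBeans idea. A plain Map stands in for
// Spring's SingletonBeanRegistry so the example runs without Spring.
final class RegisterAllBeansSketch {

    // Stand-in with the same shape as SingletonBeanRegistry#registerSingleton.
    interface Registry {
        void registerSingleton(String beanName, Object singletonObject);
    }

    // Converts "network-diagnostic" to "networkDiagnostic" for bean names.
    static String toCamelCase(String name) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : name.toCharArray()) {
            if (c == '-') {
                upperNext = true;
            } else {
                out.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return out.toString();
    }

    // Upper-cases the first character, e.g. "networkDiagnostic" -> "NetworkDiagnostic".
    static String capitalize(String s) {
        return Character.toUpperCase(s.charAt(0)) + s.substring(1);
    }

    // Registers one placeholder per required bean for each data type.
    // Real code would build the AMQP and integration objects here instead
    // of descriptive strings.
    static void registerAllBeans(List<String> dataTypeNames, Registry registry) {
        for (String name : dataTypeNames) {
            String prefix = toCamelCase(name);
            registry.registerSingleton(prefix + "Queue", "queue for " + name);
            registry.registerSingleton(prefix + "Exchange", "exchange for " + name);
            registry.registerSingleton(prefix + "Binding", "binding for " + name);
            registry.registerSingleton("inbound" + capitalize(prefix) + "Flow", "flow for " + name);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        registerAllBeans(Arrays.asList("network-diagnostic", "log-diagnostic"), beans::put);
        // Print every registered bean name with its placeholder value.
        beans.forEach((beanName, bean) -> System.out.println(beanName + " -> " + bean));
    }
}
```

One caveat with this approach: per its Javadoc, registerSingleton expects a fully initialized instance and does not run initialization or destruction callbacks on it, which is part of why the timing matters.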

That said, I'm unsure when that method should run, and how to get it to run at that point. It needs to run after the beans created from the configuration properties are accessible, but before lifecycle management starts (so that my integration flows will be managed), and preferably before RabbitAdmin::afterPropertiesSet, so that I also get the implicit declaration of my RabbitMQ objects.

How can I achieve this?

UPDATE: I followed @ArtemBilan's advice from below and was able to code a mock example, which I'm including here.

A main class of:

@EnableAutoConfiguration
@Configuration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplicationBuilder parentBuilder = new SpringApplicationBuilder(DemoApplication.class);
        parentBuilder.child(ChildConfiguration.class).properties("name=bob", "message='hi how are you?'").run();
        parentBuilder.child(ChildConfiguration.class).properties("name=jane", "message='hi how are you?'").run();
    }

    @Bean
    public IntegrationFlow integrationFlow() {
        Object object = new Object();
        return IntegrationFlows.from("inputChannel")
                .handle(m -> System.out.println(object + " " + m.getPayload() + " " + System.currentTimeMillis()))
                .get();
    }
}

A child config of:

@EnableAutoConfiguration
@EnableConfigurationProperties(Sample.class)
@Configuration
public class ChildConfiguration {

    @Bean
    public IntegrationFlow anotherOutgoingFlow(Sample sample) {
        return IntegrationFlows
                .from(() -> new GenericMessage<>("hello " + sample.getName() + "; " + sample.getMessage()),
                        m -> m.poller(Pollers.fixedDelay(500)))
                .channel("inputChannel")
                .get();
    }
}

And a model class:

@ConfigurationProperties
public class Sample {
    private String name;
    private String message;

    public Sample() { }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

Running this prints, for example:

2016-03-25 17:12:04.109  INFO 24637 --- [           main] com.example.DemoApplication              : Started DemoApplication in 0.169 seconds (JVM running for 3.878)
java.lang.Object@25c4da11 hello bob; 'hi how are you?' 1458940324438
java.lang.Object@25c4da11 hello jane; 'hi how are you?' 1458940324607
java.lang.Object@25c4da11 hello bob; 'hi how are you?' 1458940324938
java.lang.Object@25c4da11 hello jane; 'hi how are you?' 1458940325108
java.lang.Object@25c4da11 hello bob; 'hi how are you?' 1458940325439

Solution

  • Consider using a parent/child application context architecture, so that you can reuse a single template configuration and vary it per child based on the environment you provide.

    See the Spring Boot Reference Manual for more information.
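As a hedged illustration of how this could scale to many data types, the sketch below turns each data-type description into the "key=value" strings that a child context could receive through SpringApplicationBuilder's properties(...) method. The DataTypeSpec class, the "dataType." property prefix, and the example exchange and queue names are all assumptions made for this sketch; the Spring call itself appears only in a comment so that the example runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: builds the per-child property strings for a
// parent/child setup. All names here are illustrative only.
final class ChildPropertiesSketch {

    // Minimal stand-in for one entry of a data-type list.
    static final class DataTypeSpec {
        final String name;
        final String exchangeName;
        final String queueName;

        DataTypeSpec(String name, String exchangeName, String queueName) {
            this.name = name;
            this.exchangeName = exchangeName;
            this.queueName = queueName;
        }
    }

    // Builds the property strings for a single child context.
    // The "dataType." prefix is an assumed naming scheme.
    static String[] toChildProperties(DataTypeSpec spec) {
        List<String> props = new ArrayList<>();
        props.add("dataType.name=" + spec.name);
        props.add("dataType.exchangeName=" + spec.exchangeName);
        props.add("dataType.queueName=" + spec.queueName);
        return props.toArray(new String[0]);
    }

    public static void main(String[] args) {
        List<DataTypeSpec> specs = Arrays.asList(
                new DataTypeSpec("network-diagnostic", "nd.exchange", "nd.queue"),
                new DataTypeSpec("log-diagnostic", "ld.exchange", "ld.queue"));

        for (DataTypeSpec spec : specs) {
            String[] props = toChildProperties(spec);
            // With Spring Boot on the classpath, each array would feed one child:
            //   parentBuilder.child(ChildConfiguration.class).properties(props).run();
            System.out.println(String.join(", ", props));
        }
    }
}
```

Each child context would then bind these values to its own @ConfigurationProperties bean and declare its queue, exchange, binding, and inbound flow once, in the shared child configuration.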