Search code examples
rssspring-integrationdslatom-feedenterprise-integration

how to dynamically register Feed Inbound Adapter in Spring Integration?


I'm trying to implement an RSS/Atom feed aggregator in spring-integration and I am primarily using the Java DSL to write my IntegrationFlow. A requirement of this aggregator is that feeds can be added / removed during runtime. That is to say, the feeds are not known at design time.

I found it simple to use the basic Feed.inboundAdapter() with a test url and extract the links out of the feed with a transformer and then pass it on to an outbound-file-adapter to save the links to a file. However, I have gotten very stuck when trying to read the (thousands) of feed urls from an inbound-file-adapter run the file through a FileSplitter and then pass each resulting Message<String> containing the feed url to then register a new Feed.inboundAdapter(). Is this not possible with the Java DSL?

Ideally I would love it if I could do the following:

@Bean
public IntegrationFlow getFeedsFromFile() throws MalformedURLException {
    return IntegrationFlows.from(inboundFileChannel(), e -> e.poller(Pollers.fixedDelay(10000)))
            .handle(new FileSplitter())
            //register new Feed.inboundAdapter(payload.toString()) foreach Message<String> containing feed url coming from FileSplitter
            .transform(extractLinkFromFeedEntry())
            .handle(appendLinkToFile())
            .get();
} 

Though after reading through the spring integration java DSL code multiple times (and learning a tonne of stuff along the way) I just can't see that it's possible to do it this way. So... A) is it? B) should it be? C) Suggestions?

It almost feels like I should be able to take the output of .handle(new FileSplitter()) and pass that into .handleWithAdapter(Feed.inboundAdapter(/*stuff here*/)) but the DSL only references outbound-adapters there. Inbound adapters are really just a subclass of AbstractMessageSource and it seems the only place you can specify one of those is as an argument to the IntegrationFlows.from(/*stuff here*/) method.

I would have thought it would be possible to take the input from a file, split it line by line, use that output to register inbound feed adapters, poll those feeds, extract the new links from feeds as they appear and append them to a file. It appears as though it's not.

Is there some clever subclassing I can do to make this work??

Failing that... and I suspect this is going to be the answer, I found the spring integration Dynamic Ftp Channel Resolver Example and this answer on how to adapt it dynamically register stuff for the inbound case...

So is this the way to go? Any help/guidance appreciated. After pouring over the DSL code and reading documentation for days, I think I'll have a go at implementing the dynamic ftp example and adapting it to work with FeedEntryMessageSource... in which case my question is... that dynamic ftp example works with XML configuration, but is it possible to do it with either Java config or the Java DSL?

Update

I've implemented the solution as follows:

@SpringBootApplication 
class MonsterFeedApplication {

public static void main(String[] args) throws IOException {
    ConfigurableApplicationContext parent = SpringApplication.run(MonsterFeedApplication.class, args);

    parent.setId("parent");
    String[] feedUrls = {
            "https://1nichi.wordpress.com/feed/",
            "http://jcmuofficialblog.com/feed/"};

    List<ConfigurableApplicationContext> children = new ArrayList<>();
    int n = 0;
    for(String feedUrl : feedUrls) {
        AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
        child.setId("child" + ++n);
        children.add(child);
        child.setParent(parent);
        child.register(DynamicFeedAdapter.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("feed.url", feedUrl);
        PropertiesPropertySource pps = new PropertiesPropertySource("feed", props);
        env.getPropertySources().addLast(pps);
        child.setEnvironment(env);
        child.refresh();
    }

    System.out.println("Press any key to exit...");
    System.in.read();
    for (ConfigurableApplicationContext child : children) {
        child.close();
    }
    parent.close();
}

@Bean
public IntegrationFlow aggregateFeeds() {       
    return IntegrationFlows.from("feedChannel")
            .transform(extractLinkFromFeed())
            .handle(System.out::println)
            .get();
}

@Bean
public MessageChannel feedChannel() {
    return new DirectChannel();
}

@Bean
public AbstractPayloadTransformer<SyndEntry, String> extractLinkFromFeed() {
    return new AbstractPayloadTransformer<SyndEntry, String>() {
        @Override
        protected String transformPayload(SyndEntry payload) throws Exception {
            return payload.getLink();
        }
    };

}

}

DynamicFeedAdapter.java

@Configuration
@EnableIntegration
public class DynamicFeedAdapter {

    @Value("${feed.url}")
    public String feedUrl;

    @Bean
    public static PropertySourcesPlaceholderConfigurer pspc() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public IntegrationFlow feedAdapter() throws MalformedURLException {

        URL url = new URL(feedUrl);

        return IntegrationFlows
                .from(s -> s.feed(url, "feedTest"), 
                        e -> e.poller(p -> p.fixedDelay(10000)))
                .channel("feedChannel")
                .get();
    }

}

And this works IF and only IF I have one of the urls defined in application.properties as feed.url=[insert url here]. Otherwise it fails telling me 'unable to resolve property {feed.url}'. I suspect what is happening there is that the @Beans defined in DynamicFeedAdapter.java all get singletons eagerly initialized, so aside from the beans being manually created in our for loop in the main method (which work fine because they have feed.url property injected) we have a stray singleton that has been eagerly initialized and if there is no feed.url defined in application.properties then it can't resolve the property and everything goes bang. Now from what I know of Spring, I know it should be possible to @Lazy initialize the beans in DynamicFeedAdapter.java so we don't wind up with this one unwanted stray singleton problem-child. The problem is now...if I just mark the feedAdapter() @Lazy then the beans never get initialized. How do I initialize them myself?

Update - problem solved

Without having tested it, I think the problem is that boot is finding the DynamicFeedAdapter during its component scan. A simple solution is to move it to a sibling package. If MonsterFeedApplication is in com.acme.foo, then put the adapter config class in com.acme.bar. That way, boot won't consider it "part" of the application

This was indeed the problem. After implementing Gary's suggestion, everything works perfect.


Solution

  • See the answer to this question and its follow up for a similar question about inbound mail adapters.

    In essence, each feed adapter is created in a child context that is parameterized.

    In that case the child contexts are created in a main() method but there's no reason it couldn't be done in a service invoked by .handle().