Search code examples
javaflume-ng

How to stop a Flume's HTTP source


I'm working with Flume 1.4.0, and I'm trying to stop Flume's components in a certain way:

  • First, stop the sources.
  • Then wait until all the events within the channels are consumed by the sinks.
  • Once all the events have been consumed, stop the channels and the sinks.

The above tasks are performed by a shutdown hook, as the one created in org.apache.flume.node.Application (in fact, I'm developing a custom Application).

The way I'm getting references to the sources, channels and sinks is:

MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();

The point is I'm getting this NullPointerException:

2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
    at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
    at     org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
    at     es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)

HTTPSource.java:165 is about stoping the Jetty server implementing the Http server part of the source, which is the one that seems to be null:

162  @Override
163  public void stop() {
164    try {
165      srv.stop();
166      srv.join();
167      srv = null;
168    } catch (Exception ex) {
169      LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170    }
171    sourceCounter.stop();
172    LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173  }

Why is it null? The source works fine, and is able to receive Http requests.

I'm guessing this is not the proper way of closing Flume's components... If not, which is it?

Thanks!


Solution

  • Fixed. Thanks to Erik's pointer, I debuged the code until I realized that each time a configurationProvider.getConfiguration() sentence is invoked a new MaterializedConfiguration is created. Such a materialed configuration is a full set of running sources, channels ands sinks. Therefore, I had several copies for the same sources... oops! Nevertheless, somehow Flume is smart enough to detect those multiple materializations of the configuration and I've seen that it shuts down all the duplicated components... but that includes the volatile variable for the Jetty server and so on.

    Thus, instead of doing this:

    MaterializedConfiguration conf = configurationProvider.getConfiguration();
    ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
    ImmutableMap<String, Channel> channelsRef = conf.getChannels();
    ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
    

    Now I get the references I need at handleConfigurationEvent(MaterializedConfiguration conf) (it is overriden):

    @Override
    @Subscribe
    public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
        sourcesRef = conf.getSourceRunners();
        channelsRef = conf.getChannels();
        sinksRef = conf.getSinkRunners();
        super.handleConfigurationEvent(conf);
    } // handleConfigurationEvent
    

    Thanks again to Erik!