I'm working with Flume 1.4.0, and I'm trying to stop Flume's components in a certain way:
The above tasks are performed by a shutdown hook, like the one created in org.apache.flume.node.Application (in fact, I'm developing a custom Application).
The way I'm getting references to the sources, channels and sinks is:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
The point is I'm getting this NullPointerException:
2015-02-17 16:03:28,094 (agent-shutdown-hook) [ERROR - org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:169)] Error while stopping HTTPSource. Exception follows.
java.lang.NullPointerException
at org.apache.flume.source.http.HTTPSource.stop(HTTPSource.java:165)
at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.stopSources(CygnusApplication.java:296)
at es.tid.fiware.fiwareconnectors.cygnus.nodes.CygnusApplication$AgentShutdownHook.run(CygnusApplication.java:231)
HTTPSource.java:165 is about stopping the Jetty server that implements the HTTP server part of the source, and that server is the one that seems to be null:
162   @Override
163   public void stop() {
164     try {
165       srv.stop();
166       srv.join();
167       srv = null;
168     } catch (Exception ex) {
169       LOG.error("Error while stopping HTTPSource. Exception follows.", ex);
170     }
171     sourceCounter.stop();
172     LOG.info("Http source {} stopped. Metrics: {}", getName(), sourceCounter);
173   }
Why is it null? The source works fine, and is able to receive Http requests.
I'm guessing this is not the proper way of closing Flume's components... If not, which is it?
Thanks!
Fixed. Thanks to Erik's pointer, I debugged the code until I realized that each time configurationProvider.getConfiguration() is invoked, a new MaterializedConfiguration is created. Such a materialized configuration is a full set of running sources, channels and sinks. Therefore, I had several copies of the same sources... oops! Nevertheless, somehow Flume is smart enough to detect those multiple materializations of the configuration and I've seen that it shuts down all the duplicated components... but that includes the volatile variable for the Jetty server and so on.
Thus, instead of doing this:
MaterializedConfiguration conf = configurationProvider.getConfiguration();
ImmutableMap<String, SourceRunner> sourcesRef = conf.getSourceRunners();
ImmutableMap<String, Channel> channelsRef = conf.getChannels();
ImmutableMap<String, SinkRunner> sinksRef = conf.getSinkRunners();
Now I get the references I need in handleConfigurationEvent(MaterializedConfiguration conf), which I override:
@Override
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    sourcesRef = conf.getSourceRunners();
    channelsRef = conf.getChannels();
    sinksRef = conf.getSinkRunners();
    super.handleConfigurationEvent(conf);
} // handleConfigurationEvent
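For anyone who wants to see the difference between the two approaches without a Flume setup, here is a minimal stand-alone model of it. This is only a sketch and not Flume code: FakeHttpSource, FakeConfiguration, FakeProvider and FakeApplication are hypothetical stand-ins I made up, and the only behaviour they imitate is what is described above (a provider that builds new components on every call, and a source whose server field is only set once it has been started).

```java
// Simplified stand-in model, NOT Flume code: every class name here is hypothetical.
class MaterializationSketch {

    /** Stand-in for a source whose server object only exists after start(). */
    static class FakeHttpSource {
        private Object srv; // plays the role of the Jetty server; null until start()

        void start() {
            srv = new Object();
        }

        void stop() {
            // Like the stop() shown in the question, srv is dereferenced without a null check.
            srv.hashCode();
            srv = null;
        }
    }

    /** Stand-in for a materialized configuration holding a single source. */
    static class FakeConfiguration {
        final FakeHttpSource source = new FakeHttpSource();
    }

    /** Stand-in provider: each call builds a brand-new set of components. */
    static class FakeProvider {
        FakeConfiguration getConfiguration() {
            return new FakeConfiguration();
        }
    }

    /** Stand-in application that remembers the configuration it actually started. */
    static class FakeApplication {
        private FakeConfiguration running;

        void handleConfigurationEvent(FakeConfiguration conf) {
            running = conf;      // capture the reference, as in the overridden handler
            conf.source.start(); // only this instance ever gets started
        }

        FakeConfiguration runningConfiguration() {
            return running;
        }
    }

    /** Returns true if stopping the given source throws a NullPointerException. */
    static boolean stopThrowsNpe(FakeHttpSource source) {
        try {
            source.stop();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        FakeProvider provider = new FakeProvider();
        FakeApplication app = new FakeApplication();
        app.handleConfigurationEvent(provider.getConfiguration());

        // A second getConfiguration() call yields a different, never-started source.
        FakeHttpSource freshCopy = provider.getConfiguration().source;
        System.out.println("fresh copy throws NPE: " + stopThrowsNpe(freshCopy));

        // The captured reference points at the source that was really started.
        FakeHttpSource captured = app.runningConfiguration().source;
        System.out.println("captured source throws NPE: " + stopThrowsNpe(captured));
    }
}
```

In this model, stopping the freshly materialized copy fails because it was never started, while stopping the captured instance works. That is the same reason the shutdown hook should use the references received in handleConfigurationEvent rather than asking the provider again.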
Thanks again to Erik!