I wrote a custom interceptor for Apache Flume 1.7. The interceptor must set a special header on every event from the Kafka source whose topic matches a regex defined in the config, but it doesn't work. My Java knowledge is limited, so please help me fix the issue. Part of my config, /etc/flume-ng/conf/flume.conf:
######################## kafka source ########################
agent.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka_source.interceptors = i1
agent.sources.kafka_source.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor
agent.sources.kafka_source.interceptors.i1.regex = stat_.+
agent.sources.kafka_source.interceptors.i1.value = hourly
agent.sources.kafka_source.interceptors.i1.default = daily
I get this error in flume.log:
31 Jul 2017 18:41:11,819 ERROR [conf-file-poller-0] (org.apache.flume.channel.ChannelProcessor.configureInterceptors:118) - Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
31 Jul 2017 18:41:11,823 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSources:361) -
Source kafka_source has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
Caused by: java.lang.NoSuchMethodException: org.apache.flume.interceptor.TopicRotationHeaderInterceptor.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
Source code of the interceptor:
package org.apache.flume.interceptor;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Interceptor class that appends topic rotation period header to all events.
 *
 * Properties:<p>
 *
 * regex: regex to match topics
 * (default is ".+")<p>
 *
 * value: Value to use in header insertion when the topic matches.
 * (default is "daily")<p>
 *
 * default: Value to use in header insertion when the topic does not match.
 * (default is "daily")<p>
 *
 * Sample config:<p>
 *
 * <code>
 * agent.sources.r1.channels = c1<p>
 * agent.sources.r1.type = SEQ<p>
 * agent.sources.r1.interceptors = i1<p>
 * agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TopicRotationHeaderInterceptor<p>
 * agent.sources.r1.interceptors.i1.regex = stat_.+<p>
 * agent.sources.r1.interceptors.i1.value = hourly<p>
 * </code>
 *
 */
public class TopicRotationHeaderInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory.getLogger(TopicRotationHeaderInterceptor.class);

  private String value;
  private String defaultValue;
  private Pattern matchRegex;

  /**
   * Only {@link TopicRotationHeaderInterceptor.Builder} can build me
   */
  private TopicRotationHeaderInterceptor(Pattern matchRegex, String value, String defaultValue) {
    this.matchRegex = matchRegex;
    this.value = value;
    this.defaultValue = defaultValue;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Modifies events in-place.
   */
  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    final String topic = headers.get(Constants.TOPIC_HEADER);
    String resultValue = defaultValue;
    // Guard against a missing topic header: Pattern.matcher(null) throws NullPointerException.
    if (matchRegex != null && topic != null && matchRegex.matcher(topic).matches()) {
      resultValue = value;
    }
    headers.put(Constants.HEADER, resultValue);
    return event;
  }

  /**
   * Delegates to {@link #intercept(Event)} in a loop.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instance of the TopicRotationHeaderInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private String value;
    private String defaultValue;
    private String regexStr;
    private Pattern matchRegex;

    @Override
    public void configure(Context context) {
      regexStr = context.getString(Constants.REGEX, Constants.REGEX_DEFAULT);
      matchRegex = Pattern.compile(regexStr);
      value = context.getString(Constants.VALUE, Constants.VALUE_DEFAULT);
      defaultValue = context.getString(Constants.DEFAULT_VALUE, Constants.DEFAULT_VALUE_DEFAULT);
    }

    @Override
    public Interceptor build() {
      return new TopicRotationHeaderInterceptor(matchRegex, value, defaultValue);
    }
  }

  public static class Constants {
    public static final String REGEX = "regex";
    public static final String REGEX_DEFAULT = ".+";
    public static final String VALUE = "value";
    public static final String VALUE_DEFAULT = "daily";
    public static final String DEFAULT_VALUE = "default";
    public static final String DEFAULT_VALUE_DEFAULT = "daily";
    public static final String HEADER = "rotation";
    public static final String TOPIC_HEADER = "topic";
  }
}
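For reference, the header decision made in intercept() can be tried outside Flume. The following is only a minimal sketch: the class name RotationHeaderSketch, the helper rotationFor, and the sample topic names are hypothetical, and treating a missing topic as non-matching is an assumption of the sketch. It shows that Matcher.matches() requires the regex to cover the whole topic name.

```java
import java.util.regex.Pattern;

public class RotationHeaderSketch {

  // Hypothetical helper mirroring the decision in intercept():
  // Matcher.matches() succeeds only if the regex covers the entire topic name.
  static String rotationFor(Pattern regex, String topic, String value, String defaultValue) {
    if (topic != null && regex.matcher(topic).matches()) {
      return value;
    }
    // Assumption of this sketch: a missing or non-matching topic gets the default.
    return defaultValue;
  }

  public static void main(String[] args) {
    // Regex and values taken from the flume.conf fragment in the question.
    Pattern regex = Pattern.compile("stat_.+");

    // Hypothetical topic names, used only for illustration.
    String[] topics = {"stat_clicks", "stat_", "web_stat_clicks"};
    for (String topic : topics) {
      System.out.println(topic + " -> " + rotationFor(regex, topic, "hourly", "daily"));
    }
  }
}
```

With the regex from the config, only a topic that starts with stat_ and has at least one more character gets hourly; everything else falls back to daily.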
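Your flume.conf has a mistake. In the interceptor's type property, change
org.apache.flume.interceptor.TopicRotationHeaderInterceptor
to:
org.apache.flume.interceptor.TopicRotationHeaderInterceptor$Builder
Flume creates the class named in type through its no-argument constructor and expects it to be an Interceptor.Builder. It then calls configure() and build() on that builder to obtain the interceptor. Your interceptor class has only a private three-argument constructor, which is why the log shows NoSuchMethodException for TopicRotationHeaderInterceptor.<init>().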
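The difference between the two class names can be reproduced without Flume. This is only a sketch under stated assumptions: BuilderReflectionSketch, FakeInterceptor, and canInstantiate are hypothetical stand-ins, not Flume classes. Class.newInstance() is used because it appears in the stack trace from the question; it is deprecated in newer Java versions.

```java
public class BuilderReflectionSketch {

  // Hypothetical stand-in for the interceptor: its only constructor is
  // private and takes an argument, so there is no no-argument constructor.
  static class FakeInterceptor {
    private final String value;

    private FakeInterceptor(String value) {
      this.value = value;
    }

    String value() {
      return value;
    }

    // Hypothetical stand-in for the nested Builder: it declares no constructor,
    // so the compiler supplies an implicit public no-argument one.
    public static class Builder {
      public FakeInterceptor build() {
        return new FakeInterceptor("hourly");
      }
    }
  }

  // Returns true if the named class can be created through a no-argument constructor.
  @SuppressWarnings("deprecation")
  static boolean canInstantiate(String className) {
    try {
      Class.forName(className).newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      // Covers ClassNotFoundException, InstantiationException and IllegalAccessException.
      return false;
    }
  }

  public static void main(String[] args) {
    String outer = FakeInterceptor.class.getName();
    String builder = FakeInterceptor.Builder.class.getName(); // binary name ends with "$Builder"

    System.out.println(outer + " instantiable: " + canInstantiate(outer));
    System.out.println(builder + " instantiable: " + canInstantiate(builder));
  }
}
```

The `$` in the second name is how Java writes the binary name of a nested class, which is why the config value needs `$Builder` rather than `.Builder`.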
regards