
Flume 1.6.0 spooling directory source with timestamp on header


I'm trying to create a new Flume agent that reads files with a spooldir source and writes them to HDFS. This is my config file:

agent.sources = file
agent.channels = channel
agent.sinks = hdfsSink

# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files

# SINKS CONFIGURATION
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = /HADOOP/PATH/%Y/%m/%d/%H/

agent.sinks.hdfsSink.hdfs.filePrefix = common
agent.sinks.hdfsSink.hdfs.fileSuffix = .json
agent.sinks.hdfsSink.hdfs.rollInterval = 300
agent.sinks.hdfsSink.hdfs.rollSize = 5242880
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.maxOpenFiles = 2
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.callTimeout = 100000
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.channel = channel

# CHANNELS CONFIGURATION
agent.channels.channel.type = memory
agent.channels.channel.capacity = 10000
agent.channels.channel.transactionCapacity = 1000

I'm getting the error "Expected timestamp in the Flume event headers, but it was null". The files I'm reading contain JSON with a field named timestamp.
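The error comes from the HDFS sink: the time escapes in hdfs.path (%Y/%m/%d/%H) are filled in from a `timestamp` event header holding epoch milliseconds, and the spooldir source does not set that header. The Python sketch below only illustrates that lookup; it is not Flume's code. `resolve_path` is a hypothetical helper, only the four escapes used above are handled, and UTC is assumed (Flume itself uses hdfs.timeZone, which defaults to local time).

```python
import re
from datetime import datetime, timezone


def resolve_path(template: str, headers: dict) -> str:
    """Hypothetical illustration: expand %Y/%m/%d/%H from the 'timestamp' header.

    The header is assumed to hold epoch milliseconds as a string, and UTC is
    assumed for the conversion (an assumption made for this sketch only).
    """
    def expand(match: re.Match) -> str:
        ts = headers.get("timestamp")
        if ts is None:
            # Same situation as in the question: nothing to bucket the event by.
            raise ValueError("Expected timestamp in the Flume event headers, but it was null")
        when = datetime.fromtimestamp(int(ts) / 1000, tz=timezone.utc)
        return when.strftime("%" + match.group(1))

    return re.sub(r"%([YmdH])", expand, template)


print(resolve_path("/HADOOP/PATH/%Y/%m/%d/%H/", {"timestamp": "1501936496000"}))
# prints /HADOOP/PATH/2017/08/05/12/
```

Calling `resolve_path` with an empty header dict raises the same message as the agent, which is why the event needs a timestamp header before it reaches the sink.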

Is there a way to add this timestamp in the header?


Solution

  • As explained in this post: http://shzhangji.com/blog/2017/08/05/how-to-extract-event-time-in-apache-flume/

    the change needed is to add an interceptor with a serializer to the source:

    # SOURCES CONFIGURATION
    agent.sources.file.type = spooldir
    agent.sources.file.channels = channel
    agent.sources.file.spoolDir = /path/to/json_files
    agent.sources.file.interceptors = i1
    agent.sources.file.interceptors.i1.type = regex_extractor
    agent.sources.file.interceptors.i1.regex = <regex_for_timestamp>
    agent.sources.file.interceptors.i1.serializers = s1
    agent.sources.file.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    agent.sources.file.interceptors.i1.serializers.s1.name = timestamp
    agent.sources.file.interceptors.i1.serializers.s1.pattern = <pattern_that_matches_your_regex>
    

    Thanks for pointing out that, besides the link, I needed to include a proper snippet. :)
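To make the interceptor's job concrete, here is a rough Python model of what regex_extractor plus RegexExtractorInterceptorMillisSerializer do to one event: the first capture group of the regex is parsed with the configured date pattern and stored as epoch milliseconds under the header named by serializers.s1.name. This is a sketch, not Flume's implementation. The regex and format below are hypothetical stand-ins for the <regex_for_timestamp> and <pattern_that_matches_your_regex> placeholders, assuming one JSON object per line (the spooldir source's default line deserializer turns each line into an event) with values like "2017-08-05 12:34:56". Flume's pattern is a Joda-Time pattern such as yyyy-MM-dd HH:mm:ss; the sketch uses the strptime equivalent and assumes UTC, whereas the real serializer's time zone handling may differ.

```python
import re
from datetime import datetime, timezone

# Hypothetical values standing in for the placeholders in the config above.
TIMESTAMP_REGEX = re.compile(r'"timestamp"\s*:\s*"([^"]+)"')
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"  # strptime analogue of yyyy-MM-dd HH:mm:ss


def extract_timestamp_header(body: str, headers: dict) -> dict:
    """Model of the interceptor: regex group 1 -> epoch millis -> 'timestamp' header."""
    match = TIMESTAMP_REGEX.search(body)
    if match is None:
        return headers  # in this sketch, non-matching events pass through unchanged
    # UTC is an assumption of this sketch, not necessarily Flume's behaviour.
    parsed = datetime.strptime(match.group(1), TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)
    headers["timestamp"] = str(int(parsed.timestamp() * 1000))
    return headers


line = '{"timestamp": "2017-08-05 12:34:56", "msg": "hello"}'
print(extract_timestamp_header(line, {}))
# prints {'timestamp': '1501936496000'}
```

With the header set this way, the sink buckets each event by the time recorded in the JSON rather than by when the file was ingested; the sink and channel configuration can stay as in the question.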