
How to use a growing file as Apache Kafka producer and read only the newly appended data


I am trying to use a file as my Kafka producer. The source file grows continuously (say, 20 records/lines per second). Below is a post similar to my problem:

How to write a file to Kafka Producer

But in this case, the whole file is read and added to the Kafka topic every time a new line is inserted into the file. I want only the newly appended lines to be sent to the topic (i.e., if the file already holds 10 lines and 4 more lines are appended to it, only those 4 lines should be sent to the topic).

Is there a way to achieve this?

Other solutions tried:

  1. Apache Flume with the source type 'spooldir'. But it was of no use, since it reads data only from new files added to the directory, not data appended to an already-read file.

  2. We also tried the Flume source type 'exec' with the command 'tail -F /path/file-name' (see the sketch below). This doesn't seem to work either.
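
For reference, here is a minimal sketch of the exec-source setup from point 2, feeding Flume's Kafka sink (the agent, channel, and sink names are illustrative, and the sink properties shown are the Flume 1.7+ ones):

    a1.sources = tail-source
    a1.channels = mem-channel
    a1.sinks = kafka-sink

    # tail -F follows the file across rotations; note that the exec
    # source does not track its read position between restarts
    a1.sources.tail-source.type = exec
    a1.sources.tail-source.command = tail -F /path/file-name
    a1.sources.tail-source.channels = mem-channel

    a1.channels.mem-channel.type = memory

    a1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.kafka-sink.kafka.bootstrap.servers = 127.0.0.1:9092
    a1.sinks.kafka-sink.kafka.topic = topicname
    a1.sinks.kafka-sink.channel = mem-channel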

Suggestions for any other tool are also welcome, as my objective is to read the data from the file in real time (i.e., I need the data as soon as it is inserted into the file).


Solution

  • There are a couple of options that you could look at, depending on your exact needs.

    Kafka Connect

    As stated by Chin Huang above, the FileStreamSource connector from Kafka Connect should be able to do what you want without installing additional software. Check out the Connect Quickstart for guidance on how to get this up and running; it actually includes an example of reading a file into Kafka.
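
    A minimal standalone-mode sketch, modeled on the quickstart example (the file path and topic name are placeholders):

    # connect-file-source.properties
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=/path/to/your/file
    topic=topicname

    Run it with bin/connect-standalone.sh config/connect-standalone.properties connect-file-source.properties. The connector keeps track of its position in the file, so only newly appended lines are published to the topic.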

    Logstash

    Logstash is the classic option for something like this; with its Kafka output it will do just what you want, for one or multiple files. The following configuration should give you roughly what you want:

    input {
      file {
        path => "/path/to/your/file"
      }
    }

    output {
      kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topic_id => "topicname"
      }
    }
    

    Filebeat

    Filebeat is pretty similar to Logstash; it just offers less functionality if you want to perform additional processing on the data read from the file. Also, it is written in Go instead of Java, so its footprint on the machine it's running on should be smaller. The following should be a minimal config to get you started (from memory, you might need to add a parameter or two if they are mandatory):

    # Note: 'filebeat.prospectors' was renamed to 'filebeat.inputs' in Filebeat 6.3+
    filebeat.prospectors:
    - type: log
      paths:
        - /path/to/your/file

    output.kafka:
      hosts: ["127.0.0.1:9092"]
      topic: 'topicname'
    

    Flume

    If you want to revisit your Flume option, have a look at the TaildirSource. I have not used it, but it sounds like it should fit your use case quite nicely.
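
    A sketch of what that source might look like, assuming the same agent layout as the exec-source sketch in the question (the position-file path is a placeholder):

    # TAILDIR records per-file read positions in a JSON file, so only
    # newly appended lines are picked up, even across agent restarts
    a1.sources.tail-source.type = TAILDIR
    a1.sources.tail-source.positionFile = /var/flume/taildir_position.json
    a1.sources.tail-source.filegroups = f1
    a1.sources.tail-source.filegroups.f1 = /path/to/your/file
    a1.sources.tail-source.channels = mem-channel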