Converting a CSV file to JSON in Flume


I am trying to pass a CSV file from Flume to Kafka. With the following config file, I can send the entire file from Flume to Kafka as-is.

   # Name the components on this agent
   a1.sources = r1
   a1.sinks = k1
   a1.channels = c1

   # Describe the source
   a1.sources.r1.type = exec
   a1.sources.r1.command = cat /User/Desktop/logFile.csv


   # Describe the sink
   a1.sinks.k1.type  = org.apache.flume.sink.kafka.KafkaSink
   a1.sinks.k1.topic = kafkaTopic
   a1.sinks.k1.brokerList = localhost:9092
   a1.sinks.k1.batchSize = 20

   # Use a channel which buffers events in memory
   a1.channels.c1.type = memory
   a1.channels.c1.capacity = 10000
   a1.channels.c1.transactionCapacity = 10000

   # Bind the source and sink to the channel
   a1.sources.r1.channels = c1
   a1.sinks.k1.channel = c1

But I want the data converted to JSON before it is passed to Kafka for further processing. Can someone please advise how to convert the file from CSV to JSON?

Thanks!!


Solution

  • I think you need to write your own interceptor.

    1. Start by implementing the Interceptor interface.
    2. Read the CSV from the Flume event body.
    3. Parse it and compose the JSON.
    4. Put the result back into the event body.

    Example: https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/
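
    The four steps above could be sketched roughly as follows. This is only a minimal, stdlib-only illustration of the body-in/body-out conversion, not a complete interceptor: the class name `CsvToJson`, the helper methods, and the column names in `COLUMNS` are all hypothetical (the question does not show the CSV header). It assumes each event body holds a single CSV row, which is how the exec source emits the output of `cat` (one event per line), and it writes every value as a JSON string. In a real interceptor, `convertBody` would be called from `intercept(Event)`, using `event.getBody()` and `event.setBody(...)`.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    class CsvToJson {

        // Hypothetical column names; replace with the real header of the CSV file.
        static final String[] COLUMNS = {"timestamp", "level", "message"};

        // Splits one CSV row, honouring double-quoted fields and "" escapes.
        static List<String> parseLine(String line) {
            List<String> fields = new ArrayList<>();
            StringBuilder cur = new StringBuilder();
            boolean inQuotes = false;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (inQuotes) {
                    if (c != '"') {
                        cur.append(c);
                    } else if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"'); // "" inside quotes is a literal quote
                        i++;
                    } else {
                        inQuotes = false;
                    }
                } else if (c == '"') {
                    inQuotes = true;
                } else if (c == ',') {
                    fields.add(cur.toString());
                    cur.setLength(0);
                } else {
                    cur.append(c);
                }
            }
            fields.add(cur.toString());
            return fields;
        }

        // Escapes the characters that may not appear raw inside a JSON string.
        static String escape(String s) {
            StringBuilder sb = new StringBuilder();
            for (char c : s.toCharArray()) {
                switch (c) {
                    case '"':  sb.append("\\\""); break;
                    case '\\': sb.append("\\\\"); break;
                    case '\n': sb.append("\\n");  break;
                    case '\r': sb.append("\\r");  break;
                    case '\t': sb.append("\\t");  break;
                    default:
                        if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                        else sb.append(c);
                }
            }
            return sb.toString();
        }

        // Pairs column names with values; extra columns or values are ignored.
        static String toJson(String[] columns, String csvLine) {
            List<String> values = parseLine(csvLine);
            StringBuilder sb = new StringBuilder("{");
            int n = Math.min(columns.length, values.size());
            for (int i = 0; i < n; i++) {
                if (i > 0) sb.append(',');
                sb.append('"').append(escape(columns[i])).append("\":\"")
                  .append(escape(values.get(i))).append('"');
            }
            return sb.append('}').toString();
        }

        // Steps 2-4: read the body, parse and compose JSON, return the new body.
        static byte[] convertBody(byte[] body) {
            String line = new String(body, StandardCharsets.UTF_8);
            return toJson(COLUMNS, line).getBytes(StandardCharsets.UTF_8);
        }

        public static void main(String[] args) {
            byte[] body = "2014-01-13,INFO,\"started, ok\"".getBytes(StandardCharsets.UTF_8);
            System.out.println(new String(convertBody(body), StandardCharsets.UTF_8));
            // prints {"timestamp":"2014-01-13","level":"INFO","message":"started, ok"}
        }
    }
    ```

    To use something like this in the agent above, the class would need to implement `org.apache.flume.interceptor.Interceptor` together with a nested `Interceptor.Builder`, and be attached to the source with `a1.sources.r1.interceptors = i1` and `a1.sources.r1.interceptors.i1.type` set to the fully qualified name of that builder class. Note that if the file starts with a header row, this sketch would convert it like any other row, so it would have to be skipped or filtered separately.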