I am trying to pass a CSV file from Flume to Kafka. With the following config file I can pass the entire file from Flume to Kafka as-is:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /User/Desktop/logFile.csv
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafkaTopic
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
But I want it converted to JSON format before it is passed to Kafka for further processing. Can someone please advise me on how to convert the file from CSV to JSON?
Thanks!!
I think you need to write your own interceptor that rewrites each event body from CSV to JSON before it reaches the Kafka sink.
Example: https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/
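To give an idea of what the conversion step inside such an interceptor might look like, here is a minimal sketch in plain Java with no Flume dependency. The class name `CsvToJson` and the column names are hypothetical, since the header of logFile.csv isn't shown in the question. It also assumes simple CSV with no quoted fields containing commas; for anything more complex a real CSV parser would be the safer choice.

```java
import java.util.StringJoiner;

public class CsvToJson {
    // Hypothetical column names; replace with the real header of logFile.csv.
    static final String[] COLUMNS = {"timestamp", "level", "message"};

    /** Escape characters that may not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Turn one CSV line into a flat JSON object with string values. */
    static String toJson(String csvLine) {
        // Naive split: assumes no quoted fields with embedded commas.
        String[] values = csvLine.split(",", -1);
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (int i = 0; i < COLUMNS.length; i++) {
            // Missing trailing columns become empty strings; extra values are ignored.
            String value = i < values.length ? values[i].trim() : "";
            json.add("\"" + COLUMNS[i] + "\":\"" + escape(value) + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        // Made-up sample line, only to show the output shape.
        System.out.println(toJson("2014-01-13 10:00:00,INFO,service started"));
    }
}
```

In a real interceptor you would call something like `toJson` from `intercept(Event)`, reading the line from `event.getBody()` and writing the result back with `event.setBody(...)`. The interceptor is then registered on the source through `a1.sources.r1.interceptors` and an `a1.sources.r1.interceptors.<name>.type` entry that points at your builder class.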