
Transferring CSV files into HDFS, converting them to Avro, using Flume


I am new to Big Data and I have a task to transfer CSV files to HDFS using Flume, which should also convert those CSVs to Avro. I tried to do that with the following Flume configuration:

a1.channels = dataChannel
a1.sources = dataSource
a1.sinks = dataSink

a1.channels.dataChannel.type = memory
a1.channels.dataChannel.capacity = 1000000
a1.channels.dataChannel.transactionCapacity = 10000

a1.sources.dataSource.type = spooldir
a1.sources.dataSource.channels = dataChannel
a1.sources.dataSource.spoolDir = {spool_dir}
a1.sources.dataSource.fileHeader = true
a1.sources.dataSource.fileHeaderKey = file
a1.sources.dataSource.basenameHeader = true
a1.sources.dataSource.basenameHeaderKey = basename
a1.sources.dataSource.interceptors = attach-schema
a1.sources.dataSource.interceptors.attach-schema.type = static
a1.sources.dataSource.interceptors.attach-schema.key = flume.avro.schema.url
a1.sources.dataSource.interceptors.attach-schema.value = {path_to_schema_in_hdfs}

a1.sinks.dataSink.type = hdfs
a1.sinks.dataSink.channel = dataChannel
a1.sinks.dataSink.hdfs.path = {sink_path}
a1.sinks.dataSink.hdfs.writeFormat = Text
a1.sinks.dataSink.hdfs.inUsePrefix = .
a1.sinks.dataSink.hdfs.filePrefix = drone
a1.sinks.dataSink.hdfs.fileSuffix = .avro
a1.sinks.dataSink.hdfs.rollSize = 180000000
a1.sinks.dataSink.hdfs.rollCount = 100000
a1.sinks.dataSink.hdfs.rollInterval = 120
a1.sinks.dataSink.hdfs.idleTimeout = 3600
a1.sinks.dataSink.hdfs.fileType = DataStream
a1.sinks.dataSink.serializer = avro_event
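
For reference, the file behind {path_to_schema_in_hdfs} is a plain Avro schema (.avsc). A minimal sketch of one matching the columns of my CSV files (the record name and exact types here are illustrative assumptions):

{
  "type": "record",
  "name": "City",
  "fields": [
    {"name": "deviceID", "type": "long"},
    {"name": "groupID", "type": "long"},
    {"name": "timeCounter", "type": "long"},
    {"name": "cityCityName", "type": "string"},
    {"name": "cityStateCode", "type": "string"},
    {"name": "sessionCount", "type": "long"},
    {"name": "errorCount", "type": "long"}
  ]
}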

The output was an Avro file with Flume's default schema. I also tried to use AvroEventSerializer, but I got a lot of different errors. I solved all of them except this one:

ERROR hdfs.HDFSEventSink: process failed
java.lang.ExceptionInInitializerError
        at org.apache.hadoop.hdfs.DFSOutputStream.computePacketChunkSize(DFSOutputStream.java:1305)
        at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1243)
        at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1266)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1101)
        at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1059)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75)

Thank you for any help.


Solution

  • Sorry for the mistakes in the config. I fixed them and found a way to convert CSV to Avro. I modified AvroEventSerializer slightly, like this:

    @Override
    public void write(Event event) throws IOException {
        // Lazily set up the Avro writer (and the reusable record) from the
        // first event, whose headers carry the schema URL.
        if (dataFileWriter == null) {
            initialize(event);
        }
        // Split the CSV line and map each column onto the Avro record.
        String[] items = new String(event.getBody()).split(",");
        city.put("deviceID", Long.parseLong(items[0]));
        city.put("groupID", Long.parseLong(items[1]));
        city.put("timeCounter", Long.parseLong(items[2]));
        city.put("cityCityName", items[3]);
        city.put("cityStateCode", items[4]);
        city.put("sessionCount", Long.parseLong(items[5]));
        city.put("errorCount", Long.parseLong(items[6]));
        dataFileWriter.append(city);
    }
    

    and here is the city definition:

    private GenericRecord city = null;
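
    Since this is now a custom serializer rather than the built-in avro_event alias, the HDFS sink has to name its builder class in the config; the package and class name here are placeholders for wherever the modified class lives:

    a1.sinks.dataSink.serializer = com.example.flume.CsvAvroEventSerializer$Builder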
    

    Please reply if you know a better way to do this.
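
    A quick way to sanity-check the conversion outside Flume is a small standalone program. This is a minimal sketch: "city.avsc" stands for a local copy of the schema the interceptor points at, and the sample row and output file name are made up.

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class CsvToAvroCheck {
        public static void main(String[] args) throws IOException {
            // Parse the schema and create a reusable record, as the serializer does.
            Schema schema = new Schema.Parser().parse(new File("city.avsc"));
            GenericRecord city = new GenericData.Record(schema);

            // One sample CSV row, mapped exactly like write() above.
            String[] items = "1,2,3,Boston,MA,5,0".split(",");
            city.put("deviceID", Long.parseLong(items[0]));
            city.put("groupID", Long.parseLong(items[1]));
            city.put("timeCounter", Long.parseLong(items[2]));
            city.put("cityCityName", items[3]);
            city.put("cityStateCode", items[4]);
            city.put("sessionCount", Long.parseLong(items[5]));
            city.put("errorCount", Long.parseLong(items[6]));

            // Write a proper Avro container file that avro-tools can read back.
            try (DataFileWriter<GenericRecord> writer =
                     new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
                writer.create(schema, new File("city.avro"));
                writer.append(city);
            }
        }
    }

    If avro-tools tojson prints the row back as expected, the mapping is sound and any remaining trouble is on the Flume/HDFS side.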