java, hadoop, hive, orc

Java - empty ORC file


I am trying to write ORC files with orc-core that will later be read by Hive.

The files being written have the correct number of rows, but the columns contain no data. I can see this both when reading the file with a SELECT query in Hive and when inspecting it with hive --orcfiledump -d.
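
For what it's worth, re-reading the file from Java shows the same picture. This is a minimal sketch of the check I do, assuming the same conf, hdfsUri and outputPath used in the writer code below (classes are org.apache.orc.OrcFile, org.apache.orc.Reader and org.apache.hadoop.fs.Path):

    Reader reader = OrcFile.createReader(new Path(hdfsUri+outputPath), OrcFile.readerOptions(conf));
    log.info("rows: "+reader.getNumberOfRows());   //  row count is correct
    log.info("schema: "+reader.getSchema());       //  schema is correct, but the string columns come back empty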

I tried the example provided in the guide, which writes two long columns, and the file it produces is read correctly by Hive. I suspect the problem is that I am writing string columns instead, but I still cannot make it work.
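
For comparison, this is roughly the guide's two-column example that does work for me (a minimal sketch from memory; the output path and row count are placeholders, and IOException handling is omitted):

    TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:bigint>");
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
                                         OrcFile.writerOptions(conf).setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    LongColumnVector y = (LongColumnVector) batch.cols[1];
    for(int r=0;r<10000;r++){
        int row = batch.size++;
        x.vector[row] = r;
        y.vector[row] = r * 3;
        if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);
            batch.reset();
        }
    }
    if (batch.size != 0) {
        writer.addRowBatch(batch);
        batch.reset();
    }
    writer.close();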

This is how I am currently writing the file:

    //  File schema
    String outputFormat = "struct<";
    for(int i=0;i<outputSchema.length;i++){
        outputFormat+=outputSchema[i]+":string,";
    }
    outputFormat+="lastRecordHash:string,currentHash:string>";
    TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

    //  Initializes buffers
    VectorizedRowBatch batch = orcSchema.createRowBatch();
    ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields+2);
    for(int i=0;i<numFields+2;i++){
        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
        orcBuffers.add(i, bcv);
    }

    ...

    //  Initializes writer
    Writer writer=null;
    try{
        writer = OrcFile.createWriter(new Path(hdfsUri+outputPath), OrcFile.writerOptions(conf).setSchema(orcSchema));
        partitionCounter++;
    }
    catch(IOException e){
        log.error("Cannot open hdfs file. Reason: "+e.getMessage());
        session.transfer(flowfile, hdfsFailure);
        return;
    }

    //  Writes content
    String[] records = ...

    for(int i=0;i<records.length;i++){
        fields = records[i].split(fieldSeparator);

        int row=batch.size++;

        //  Filling the orc buffers
        for(int j=0;j<numFields;j++){
            orcBuffers.get(j).vector[row] = fields[j].getBytes();
            hashDigest.append(fields[j]);
        }
        if (batch.size == batch.getMaxSize()) {
            try{
                writer.addRowBatch(batch);
                batch.reset();
            }
            catch(IOException e){
                log.error("Cannot write to hdfs. Reason: "+e.getMessage());
                return;
            }
        }         
    }
    if (batch.size != 0) {
        try{
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch(IOException e){
            log.error("Cannot write to hdfs. Reason: "+e.getMessage());
            return;
        }
    }
    writer.close();

Any suggestions or helpful references are really appreciated.

Thank you all.


Solution

  • Looks like an in-depth review of the API docs was what I needed. What I was missing:

    • Call initBuffer() on each BytesColumnVector in the initialization phase
    • Assign the column values by calling setVal(). This can also be done with setRef(), which is documented to be the faster of the two, but I don't know whether it fits my specific case; I will try it (see the sketch after the updated code below).

    This is the updated code:

    //  File schema
    String outputFormat = "struct<";
    for(int i=0;i<outputSchema.length;i++){
        outputFormat+=outputSchema[i]+":string,";
    }
    outputFormat+="lastRecordHash:string,currentHash:string>";
    TypeDescription orcSchema = TypeDescription.fromString(outputFormat);
    
    //  Initializes buffers
    VectorizedRowBatch batch = orcSchema.createRowBatch();
    ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields+2);
    for(int i=0;i<numFields+2;i++){
        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
        bcv.initBuffer();
        orcBuffers.add(i, bcv);
    }
    
    ...
    
    //  Initializes writer
    Writer writer=null;
    try{
        writer = OrcFile.createWriter(new Path(hdfsUri+outputPath), OrcFile.writerOptions(conf).setSchema(orcSchema));
        partitionCounter++;
    }
    catch(IOException e){
        log.error("Cannot open hdfs file. Reason: "+e.getMessage());
        session.transfer(flowfile, hdfsFailure);
        return;
    }
    
    //  Writes content
    String[] records = ...
    
    for(int i=0;i<records.length;i++){
        fields = records[i].split(fieldSeparator);
    
        int row=batch.size++;
    
        //  Filling the orc buffers
        for(int j=0;j<numFields;j++){
            orcBuffers.get(j).setVal(row, fields[j].getBytes());
            hashDigest.append(fields[j]);
        }
        if (batch.size == batch.getMaxSize()) {
            try{
                writer.addRowBatch(batch);
                batch.reset();
            }
            catch(IOException e){
                log.error("Cannot write to hdfs. Reason: "+e.getMessage());
                return;
            }
        }         
    }
    if (batch.size != 0) {
        try{
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch(IOException e){
            log.error("Cannot write to hdfs. Reason: "+e.getMessage());
            return;
        }
    }
    writer.close();
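
    For reference, the setRef() variant of the inner loop would look something like this (a sketch only, not something I have verified for my case): setRef() stores a reference to the caller's byte array instead of copying it, so the array must not be modified or reused before the batch has been passed to writer.addRowBatch().

    //  Filling the orc buffers without copying: the vector keeps a reference
    //  to 'bytes', so it must stay untouched until addRowBatch() is called
    for(int j=0;j<numFields;j++){
        byte[] bytes = fields[j].getBytes();
        orcBuffers.get(j).setRef(row, bytes, 0, bytes.length);
        hashDigest.append(fields[j]);
    }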