I am trying to write orc files using orc-core to be later read by hive.
The files being written have the correct number of rows, but there is no content in the columns. I can see this both when reading the file with a SELECT query in Hive and when inspecting it with hive --orcfiledump -d.
I tried the example provided in the guide, which writes two columns of type long, and the file it produces is read correctly by Hive. I suspect this has to do with the fact that I am writing string columns instead, but I still cannot make it work.
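For reference, the guide's example looks roughly like this (I'm reconstructing it from memory, so the class name, file name and details are placeholders rather than the exact code from the docs):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcLongExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Two long columns, as in the guide's example
        TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:bigint>");
        Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
                OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        for (int r = 0; r < 10000; r++) {
            int row = batch.size++;
            x.vector[row] = r;
            y.vector[row] = r * 3;
            // Flush the batch when it is full
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        // Write any leftover rows
        if (batch.size != 0) {
            writer.addRowBatch(batch);
            batch.reset();
        }
        writer.close();
    }
}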
And this is how I am currently writing the file:
// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    orcBuffers.add(i, bcv);
}
...
// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...
for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);
    int row = batch.size++;
    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).vector[row] = fields[j].getBytes();
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();
Any suggestion or helpful reference is really appreciated.
Thank you all.
Looks like an in-depth review of the API doc was what I needed. What I was missing: calling initBuffer() on each BytesColumnVector in the initialization phase, and assigning the column values with setVal(). The latter can also be accomplished using setRef(), which is documented to be the faster of the two, but I don't know if it fits my specific case; I will try it. This is the updated code:
// File schema
String outputFormat = "struct<";
for (int i = 0; i < outputSchema.length; i++) {
    outputFormat += outputSchema[i] + ":string,";
}
outputFormat += "lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

// Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields + 2);
for (int i = 0; i < numFields + 2; i++) {
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    bcv.initBuffer();
    orcBuffers.add(i, bcv);
}
...
// Initializes writer
Writer writer = null;
try {
    writer = OrcFile.createWriter(new Path(hdfsUri + outputPath),
            OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch (IOException e) {
    log.error("Cannot open hdfs file. Reason: " + e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

// Writes content
String[] records = ...
for (int i = 0; i < records.length; i++) {
    fields = records[i].split(fieldSeparator);
    int row = batch.size++;
    // Filling the orc buffers
    for (int j = 0; j < numFields; j++) {
        orcBuffers.get(j).setVal(row, fields[j].getBytes());
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try {
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch (IOException e) {
            log.error("Cannot write to hdfs. Reason: " + e.getMessage());
            return;
        }
    }
}
if (batch.size != 0) {
    try {
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch (IOException e) {
        log.error("Cannot write to hdfs. Reason: " + e.getMessage());
        return;
    }
}
writer.close();
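For completeness, this is roughly how the inner loop would look with setRef() instead of setVal(); just a sketch, I haven't verified it yet. Since setRef() stores a reference to the byte array instead of copying it, each value needs its own array that must not be modified or reused before addRowBatch() is called:

// Filling the orc buffers with setRef(): the vector keeps a reference to the
// byte array instead of copying it, so the array must stay untouched until
// addRowBatch() has been called
for (int j = 0; j < numFields; j++) {
    byte[] value = fields[j].getBytes();
    orcBuffers.get(j).setRef(row, value, 0, value.length);
    hashDigest.append(fields[j]);
}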