I have a data set which is mostly a 2D table, however one column (Field
)
(called Attributes) contains a List
of Structs
in each cell. Each
Struct
has three Field
s: Attribute Tag, Attribute Type and Attribute
Value.
The definition of the Attributes Field
is:
/**
* Attribute Tag - Two character tag.
*/
public static final Field ATTRIBUTE_TAG_FIELD =
new Field("AttributeTag", FieldType.notNullable(new ArrowType.FixedSizeBinary(2)), null);
/**
* Attribute Type - One character type.
*/
public static final Field ATTRIBUTE_TYPE_FIELD =
new Field(
"AttributeType",
new FieldType(false,
new ArrowType.FixedSizeBinary(1), null),
null
);
/**
* String representation of the Attribute value.
*/
public static final Field ATTRIBUTE_VALUE_FIELD = new Field("AttributeValue", FieldType.notNullable(new ArrowType.Utf8()), null);
/**
* The field is a nullable List of Structs each with an attribute tag,
type and value.
*/
public static final Field ATTRIBUTES_FIELD =
new Field("Attributes", FieldType.nullable(new ArrowType.List()), List.of(
new Field("Attribute", FieldType.nullable(new ArrowType.Struct()), List.of(
ATTRIBUTE_TAG_FIELD, ATTRIBUTE_TYPE_FIELD, ATTRIBUTE_VALUE_FIELD))));
I have this code that attempts to populate the Attributes from some source data. Although this produces no errors when run, it doesn't result in any values in the attributes vector.
final ListVector attributes = (ListVector)
ATTRIBUTES_FIELD.createVector(allocator);
// this is the source of the attributes that I will populate into the
attributes vector
final List<SAMRecord.SAMTagAndValue> recordAttributes =
samRecord.getAttributes();
if (recordAttributes != null && recordAttributes.size() > 0 ) {
final UnionListWriter listWriter = attributes.getWriter();
listWriter.allocate();
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
listWriter.setPosition(attributeIndex);
listWriter.startList();
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue recordAttributes.get(attributeIndex);
// I think the problem is here. In a debugger this seems to create a new writer not related to my Vector??
final BaseWriter.StructWriter structWriter = listWriter.struct("Attribute");
structWriter.start();
final byte[] tagBytes =
samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
// todo find out the type from the value
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes =
samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(typeBytes.length);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
structWriter.end();
});
listWriter.setValueCount(recordAttributes.size());
listWriter.end();
}
Why doesn't this result in any values in the attributes
ListVector
? What is the correct way to do this?
The solution I eventually found involves placing the UnionListWriter
and the startList()
, endList()
and setRowCount()
calls in the appropriate place.
The general pattern is:
try (
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// create all your vectors and other closable ...
final ListVector attributes = (ListVector) SamAlignmentSchema.ATTRIBUTES_FIELD.createVector(allocator);
// create the writer for the list vector
final UnionListWriter listWriter = attributes.getWriter();
// create the vector schema root
VectorSchemaRoot samRecordBatch = new VectorSchemaRoot(
aligmentSchema.getFields(),
List.of(
// add vectors including the list vecto
attributes
)
)
){
//allocate the VectorSchemaRoot
samRecordBatch.allocateNew();
// for reach sam record (read)
samReader.iterator().stream().forEach(samRecord -> {
int index = samRecordBatch.getRowCount();
System.out.println("index = " + index);
// set the values of the primitive vectors using set or setSafe ...
// write a list of attributes into the attrbutes field
final List<SAMRecord.SAMTagAndValue> recordAttributes = samRecord.getAttributes();
// start a new list at the current index
listWriter.startList();
listWriter.setPosition(index);
if (recordAttributes != null) {
// for each attribute
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue = recordAttributes.get(attributeIndex);
final BaseWriter.StructWriter structWriter = listWriter.struct();
// start a struct to go into the list
structWriter.start();
final byte[] tagBytes = samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(typeBytes.length);
tempBuf.setBytes(0, typeBytes);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
tempBuf.setBytes(0, valueBytes);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
//finished with this struct
structWriter.end();
});
}
index++;
// finished writing the list of structs for this position
listWriter.endList();
listWriter.setValueCount(index);
samRecordBatch.setRowCount(index);
});