Search code examples
Tags: java, apache-arrow

How to populate a Java Apache Arrow Field List of Structs?


I have a dataset that is mostly a 2D table; however, one column (Field), called Attributes, contains a List of Structs in each cell. Each Struct has three Fields: Attribute Tag, Attribute Type and Attribute Value.

The definition of the Attributes Field is:

/**
 * Attribute Tag: the two-character attribute tag, stored as a non-nullable,
 * fixed-width binary value two bytes long. It has no child fields.
 */
public static final Field ATTRIBUTE_TAG_FIELD = new Field(
        "AttributeTag",
        FieldType.notNullable(new ArrowType.FixedSizeBinary(2)),
        null);


/**
 * Attribute Type: the one-character attribute type, stored as a non-nullable,
 * fixed-width binary value one byte long. It has no child fields.
 *
 * <p>Uses {@code FieldType.notNullable} like the sibling attribute fields; this
 * is equivalent to {@code new FieldType(false, type, null)}.
 */
public static final Field ATTRIBUTE_TYPE_FIELD =
        new Field("AttributeType", FieldType.notNullable(new ArrowType.FixedSizeBinary(1)), null);

/**
 * Attribute Value: the string representation of the attribute's value,
 * stored as a non-nullable UTF-8 string. It has no child fields.
 */
public static final Field ATTRIBUTE_VALUE_FIELD =
        new Field("AttributeValue", FieldType.notNullable(new ArrowType.Utf8()), null);

/**
 * Attributes: a nullable List whose elements are nullable Structs named
 * "Attribute". Each struct holds, in order, the tag, type and value fields
 * ({@link #ATTRIBUTE_TAG_FIELD}, {@link #ATTRIBUTE_TYPE_FIELD},
 * {@link #ATTRIBUTE_VALUE_FIELD}).
 */
public static final Field ATTRIBUTES_FIELD = new Field(
        "Attributes",
        FieldType.nullable(new ArrowType.List()),
        List.of(new Field(
                "Attribute",
                FieldType.nullable(new ArrowType.Struct()),
                List.of(ATTRIBUTE_TAG_FIELD, ATTRIBUTE_TYPE_FIELD, ATTRIBUTE_VALUE_FIELD))));

I have this code that attempts to populate the Attributes column from some source data. Although it runs without errors, no values end up in the attributes vector.

// Question code: as written, this leaves the attributes vector empty. The
// NOTE(review) comments mark the problems; the Solution section shows the
// working pattern.
final ListVector attributes = (ListVector) ATTRIBUTES_FIELD.createVector(allocator);

// this is the source of the attributes that I will populate into the
// attributes vector
final List<SAMRecord.SAMTagAndValue> recordAttributes = samRecord.getAttributes();

if (recordAttributes != null && recordAttributes.size() > 0 ) {
    final UnionListWriter listWriter = attributes.getWriter();
    listWriter.allocate();

    IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
        // NOTE(review): this moves the writer to a new list row for every
        // attribute. All attributes of one record belong in a single row:
        // setPosition(row) + startList() once, then one struct per attribute.
        listWriter.setPosition(attributeIndex);
        listWriter.startList();

        // put the values of the attribute in the arrow struct
        final SAMRecord.SAMTagAndValue samTagAndValue = recordAttributes.get(attributeIndex);

        // I think the problem is here. In a debugger this seems to create a new writer not related to my Vector??
        // NOTE(review): struct("Attribute") appears to ask for a struct-typed
        // child *named* "Attribute"; the writer for the list's own element
        // struct is the no-argument listWriter.struct(), as used in the Solution.
        final BaseWriter.StructWriter structWriter = listWriter.struct("Attribute");
        structWriter.start();

        final byte[] tagBytes =
            samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
        // todo find out the type from the value
        final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
        final byte[] valueBytes =
            samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);

        // NOTE(review): ATTRIBUTES_FIELD declares AttributeTag and AttributeType
        // as FixedSizeBinary, but they are written through VarChar writers;
        // confirm which type is intended.
        ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
        tempBuf.setBytes(0, tagBytes);
        structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
        tempBuf.close();


        // NOTE(review): unlike the tag above, typeBytes is never copied into
        // tempBuf (no tempBuf.setBytes(0, typeBytes)), so the written bytes are
        // not the intended ones.
        tempBuf = allocator.buffer(typeBytes.length);
        structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
        tempBuf.close();

        // NOTE(review): same problem: valueBytes is never copied into tempBuf.
        tempBuf = allocator.buffer(valueBytes.length);
        structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
        tempBuf.close();

        structWriter.end();
        // NOTE(review): endList() is never called, so the list opened by
        // startList() is never closed; in UnionListWriter it is endList() that
        // records the list's end offset.
    });

    listWriter.setValueCount(recordAttributes.size());
    // NOTE(review): end() is a different call from endList(); the per-row
    // endList() call is what the Solution adds.
    listWriter.end();
}

Why doesn't this result in any values in the attributes ListVector? What is the correct way to do this?


Solution

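  • The solution I eventually found is to place the UnionListWriter and the startList(), endList() and setRowCount() calls in the right places. Create the UnionListWriter once. Then, for each record (row):
    - position the writer on that row and call startList();
    - write one struct per attribute, using listWriter.struct() with no name;
    - call endList();
    - update the value and row counts.

    In the question's code, each attribute was positioned as its own list row and endList() was never called, so the lists were never closed and the vector showed no values.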

    The general pattern is:

    try (
       BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
       
       // create all your vectors and other closable ...
    
       final ListVector attributes = (ListVector) SamAlignmentSchema.ATTRIBUTES_FIELD.createVector(allocator);
       // create the writer for the list vector
       final UnionListWriter listWriter = attributes.getWriter();
    
       // create the vector schema root
       VectorSchemaRoot samRecordBatch = new VectorSchemaRoot(
           aligmentSchema.getFields(),
           List.of(
               // add vectors including the list vecto
               attributes
           )
       )
    ){
        //allocate the VectorSchemaRoot
        samRecordBatch.allocateNew();
        
        // For each SAM record (read): fill one row of every vector, including one
        // list of attribute structs in the attributes vector.
        samReader.iterator().stream().forEach(samRecord -> {
            // the next free row is the number of rows written so far
            final int index = samRecordBatch.getRowCount();

            // set the values of the primitive vectors using set or setSafe ...

            // write a list of attributes into the attributes field
            final List<SAMRecord.SAMTagAndValue> recordAttributes = samRecord.getAttributes();

            // Move the writer to this row first, then open the row's list there
            // (startList() opens the list at the writer's current position).
            listWriter.setPosition(index);
            listWriter.startList();

            if (recordAttributes != null) {
                for (final SAMRecord.SAMTagAndValue samTagAndValue : recordAttributes) {
                    final byte[] tagBytes = samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
                    final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
                    final byte[] valueBytes =
                            samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);

                    // Reuse one scratch buffer, sized for the longest of the three
                    // values, for all three writes. writeVarChar copies the bytes into
                    // the vector, and try-with-resources releases the buffer even if a
                    // write throws (the original leaked it in that case).
                    final int scratchSize =
                            Math.max(tagBytes.length, Math.max(typeBytes.length, valueBytes.length));
                    try (ArrowBuf scratch = allocator.buffer(scratchSize)) {
                        // listWriter.struct() (no name) is the writer for the list's
                        // element struct; start() begins a new struct in this row's list
                        final BaseWriter.StructWriter structWriter = listWriter.struct();
                        structWriter.start();

                        // NOTE(review): ATTRIBUTES_FIELD declares AttributeTag and
                        // AttributeType as FixedSizeBinary, but they are written here
                        // through VarChar writers; confirm which type is intended.
                        scratch.setBytes(0, tagBytes);
                        structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, scratch);

                        scratch.setBytes(0, typeBytes);
                        structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, scratch);

                        scratch.setBytes(0, valueBytes);
                        structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, scratch);

                        // finished with this struct
                        structWriter.end();
                    }
                }
            }

            // finished writing the list of structs for this row
            listWriter.endList();

            // this record occupies row `index`, so the batch now has index + 1 rows
            final int rowCount = index + 1;
            listWriter.setValueCount(rowCount);
            samRecordBatch.setRowCount(rowCount);
        });