Tags: cassandra, user-defined-functions, cql, user-defined-types

Cassandra: How to reference a field in user defined type in a user defined function (Java)


How does one reference fields within a user defined type when using Java user defined functions? I have found examples for map, set and tuple, but not for user defined types with multiple fields.

I have the following type defined:

create type avg_type_1 (
  accum tuple<text,int,double>,   // source, count, sum
  avg_map map<text,double>        // source, average
);

The following code:

CREATE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
    CALLED ON NULL INPUT
    RETURNS avg_type_1
    LANGUAGE java
    AS $$

        // when no source yet, save the first source, set the count to 1, and set the value
        if (state.accum.getString(0) == null) {
            state.accum.setString(0, source);
            state.accum.setInt(1, 1);
            state.accum.setDouble(2, value);
        }
        ...

returns the error:

InvalidRequest: Error from server: code=2200 [Invalid query] message="Java source compilation failed:
Line 4: accum cannot be resolved or is not a field

Solution

  • In Java the UDT variable is represented by the class com.datastax.driver.core.UDTValue. This class has get and set methods that identify a field either by index (0, 1, ..., in the order the fields are defined in the UDT) or by field name.

    See API Doc.

    Here are some examples, using the type defined in the question:

    TupleValue accumState = state.getTupleValue( "accum");
    String prevSource = accumState.getString( 0);
    Map<String,Double> avgMap = state.getMap( "avg_map", String.class, Double.class);
    

    The first line gets the accum field from the function's state. Instead of the name, the index 0 could be used, since accum is the first field of the type.

    The second line gets the first element of the tuple. Only the index version can be used, as the elements of a tuple are not named.

    The third line gets the avg_map field.

    accumState.setDouble( 2, value);
    state.setTupleValue( "accum", accumState);
    

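    The above example sets the third element in the tuple, and then puts the tuple back into the function's state variable. Note that you have to put the tuple back into the state variable: the getter appears to return a copy of the field, so changes made to that copy are not reflected in state. The following does not work.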

    // does not work
    state.getTupleValue( "accum").setDouble( 2, value);
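
    The write-back requirement can be illustrated outside Cassandra with a small plain-Java sketch. It assumes the getter returns a copy of the stored field, which is consistent with the behavior described here. CopyOnReadState and WriteBackDemo are hypothetical stand-ins invented for this illustration, not driver classes.

```java
import java.util.Arrays;

// Hypothetical stand-in for a state object whose getter returns a copy.
// This is NOT the driver's UDTValue; it only mimics the assumed behavior.
final class CopyOnReadState {
    private double[] accum = {0.0, 0.0};

    // Returns a copy, so callers cannot change the stored value through it.
    double[] getAccum() {
        return Arrays.copyOf(accum, accum.length);
    }

    // Stores a copy of the caller's array as the new field value.
    void setAccum(double[] updated) {
        accum = Arrays.copyOf(updated, updated.length);
    }
}

final class WriteBackDemo {
    // Mutates only the temporary copy; the stored state stays unchanged.
    static double withoutWriteBack(CopyOnReadState state, double value) {
        state.getAccum()[1] = value;
        return state.getAccum()[1];
    }

    // Reads the copy, modifies it, then writes it back; the state is updated.
    static double withWriteBack(CopyOnReadState state, double value) {
        double[] accum = state.getAccum();
        accum[1] = value;
        state.setAccum(accum);
        return state.getAccum()[1];
    }
}
```

    In this sketch the chained form corresponds to withoutWriteBack, and the get, set, put-back sequence corresponds to withWriteBack.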
    

    Below is the full example UDF.

    // sums up until the source changes, then adds the avg to the map
    // IMPORTANT: table must be ordered by source
    CREATE OR REPLACE FUNCTION average_by_source_1( state avg_type_1, source text, value double)
        CALLED ON NULL INPUT
        RETURNS avg_type_1
        LANGUAGE java
        AS $$
    
            TupleValue accumState = state.getTupleValue( "accum");
            String prevSource = accumState.getString( 0);
    
            // when no source yet, save the first source, set the count to 1, and set the value
            if (prevSource == null) {
                accumState.setString( 0, source);
                accumState.setInt( 1, 1);
                accumState.setDouble( 2, value);
                state.setTupleValue( "accum", accumState);
            }
    
            // when same source, increment the count and add the value
            else if (prevSource.equals( source)) {
                accumState.setInt( 1, accumState.getInt( 1) + 1);
                accumState.setDouble( 2, accumState.getDouble( 2) + value);
                state.setTupleValue( "accum", accumState);
            }
    
            // when different source, calc average and copy to map, then re-init accumulation
            else if (accumState.getInt( 1) > 0) {
                double avgVal = accumState.getDouble( 2) / accumState.getInt( 1);
                Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
                mapState.put( prevSource, avgVal);
                state.setMap( "avg_map", mapState, String.class, Double.class);
                accumState.setString( 0, source);
                accumState.setInt( 1, 1);
                accumState.setDouble( 2, value);
                state.setTupleValue( "accum", accumState);
            }
    
            // should not happen - prev case uses "if" to avoid division by zero
            else {
                Map<String,Double> mapState = state.getMap( "avg_map", String.class, Double.class);
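                // CQL collections cannot hold null, so NaN marks the error entry
                mapState.put( "ERROR: div by zero", Double.NaN);
                // as with the tuple, the map must be put back into the state
                state.setMap( "avg_map", mapState, String.class, Double.class);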
                accumState.setString( 0, source);
                accumState.setInt( 1, 1);
                accumState.setDouble( 2, value);
                state.setTupleValue( "accum", accumState);
            }
    
            // IMPORTANT: final function must calculate the average for the last source and
            //            add it to the map.
    
            return state;
    
        $$
    ;
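
    To check the accumulation logic without a cluster, the same steps can be sketched in plain Java. This is only a simulation under stated assumptions: AvgState and AverageBySource are hypothetical names standing in for avg_type_1 and the state function, and finish shows what a final function would have to do for the last source. It is not the actual final function or aggregate definition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for avg_type_1 (hypothetical, not a driver class).
final class AvgState {
    String source;   // accum element 0
    int count;       // accum element 1
    double sum;      // accum element 2
    final Map<String, Double> avgMap = new LinkedHashMap<>();
}

final class AverageBySource {
    // Mirrors the state function: accumulate while the source stays the
    // same, move the average into the map when the source changes.
    static AvgState step(AvgState state, String source, double value) {
        if (state.source != null && state.source.equals(source)) {
            state.count += 1;
            state.sum += value;
            return state;
        }
        flush(state);            // does nothing for the very first row
        state.source = source;
        state.count = 1;
        state.sum = value;
        return state;
    }

    // What a final function needs to do: flush the last source, which
    // step() never writes to the map on its own.
    static Map<String, Double> finish(AvgState state) {
        flush(state);
        return state.avgMap;
    }

    // Writes the average of the current source to the map, guarding
    // against an empty accumulator (no division by zero).
    private static void flush(AvgState state) {
        if (state.source != null && state.count > 0) {
            state.avgMap.put(state.source, state.sum / state.count);
        }
    }
}
```

    As in the UDF, the rows must arrive ordered by source; otherwise a later run of the same source overwrites the earlier average in the map.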