Tags: java, apache-beam, apache-beam-io

Beam PAssert messes up the Row


I am exploring testing with Beam and encountered a weird problem.

My driver program works as expected, but its test is failing with an error like this:

Expected: iterable with items [<Row: 
project_id:count
count_in:2
count_out:0
type:null
window_max_ts:86399999
>] in any order
     but: not matched: <Row: 
project_id:p1
count_in:2
count_out:0
type:count
window_max_ts:86399999
>

And here is my PAssert code:

PAssert
            .that(output)
            .inWindow(window)
            .containsInAnyOrder(
                Row
                    .withSchema(OUTPUT_SCHEMA)
                    .withFieldValue("type", "count")
                    .withFieldValue("count_in", 2L)
                    .withFieldValue("count_out", 0L)
                    .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
                    .build()
            );

In the last step of my pipeline, I log the element in question:

[direct-runner-worker] DEBUG co.botanalytics.data.processing.beam.transforms.Log - Window: [maxTimestamp=1970-01-01T23:59:59.999Z], Pane: [PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}], Element: Row: 
project_id:p1
count_in:2
count_out:0
type:count
window_max_ts:86399999

This is the expected result.

When I debugged the test, the problem boiled down to CoderUtils from the Beam Java SDK.

After CoderUtils encodes and decodes it, the expected Row comes back different from the one I built. Its values land in the wrong fields (project_id holds count and type is null, as in the error above), so PAssert fails.
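A minimal sketch of that encode/decode step, run outside PAssert: it round-trips an expected Row through RowCoder with CoderUtils.encodeToByteArray and CoderUtils.decodeFromByteArray. The schema is an assumption (the SCHEMA shown below plus an int64 window_max_ts field, named after the logged output), the class and method names are hypothetical, and every field, including project_id, is set. With all fields populated, the decoded Row should equal the original.

```java
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.Row;

// Hypothetical helper class for illustration only.
final class RowRoundTrip {
  // Assumption: OUTPUT_SCHEMA is the question's SCHEMA plus an int64 "window_max_ts" field.
  static final Schema OUTPUT_SCHEMA =
      Schema.builder()
          .addStringField("project_id")
          .addNullableField("type", Schema.FieldType.STRING)
          .addInt64Field("count_in")
          .addInt64Field("count_out")
          .addInt64Field("window_max_ts")
          .build();

  // Builds the expected Row with every field set, including project_id.
  static Row expectedRow() {
    return Row.withSchema(OUTPUT_SCHEMA)
        .withFieldValue("project_id", "p1")
        .withFieldValue("type", "count")
        .withFieldValue("count_in", 2L)
        .withFieldValue("count_out", 0L)
        .withFieldValue("window_max_ts", 86399999L)
        .build();
  }

  // Encodes the Row to bytes with RowCoder, then decodes it back.
  static Row roundTrip(Row row) {
    RowCoder coder = RowCoder.of(row.getSchema());
    try {
      byte[] bytes = CoderUtils.encodeToByteArray(coder, row);
      return CoderUtils.decodeFromByteArray(coder, bytes);
    } catch (CoderException e) {
      // Wrap the checked exception so callers need no throws clause.
      throw new IllegalStateException("Row round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Row expected = expectedRow();
    Row decoded = roundTrip(expected);
    System.out.println(decoded.equals(expected));
    System.out.println(decoded.getString("project_id"));
  }
}
```

If the decoded Row differs from the original only when a field is left unset, the expected Row is the likely culprit rather than the pipeline.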

I am wondering if there are any solutions to this problem. Any suggestions are more than welcome.

Thanks in advance!

OUTPUT_SCHEMA definition:

private static final transient Schema SCHEMA = Schema
            .builder()
            .addStringField("project_id")
            .addNullableField("type", Schema.FieldType.STRING)
            .addInt64Field("count_in")
            .addInt64Field("count_out")
            .build();

Solution

  • The pipeline itself can work as expected while the test fails. I believe that happens because of an error in the PAssert definition, not in the pipeline.

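    • Add the project ID to the expected row with .withFieldValue("project_id", "p1"). project_id is declared with addStringField, so it is not nullable, and leaving it unset gives an expected Row that does not satisfy the schema. That is the likely cause of the values ending up in the wrong fields.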

    • The message Expected: iterable with items [<Row: ... >] in any order but: not matched: means the one expected Row did not equal the one Row in the output; it is not a complaint about passing a single Row. containsInAnyOrder accepts varargs (or an Iterable), so a single expected Row is fine when the window holds exactly one element. If it holds several, pass every expected Row.

    Your final code will be something like this:

    // Every field of OUTPUT_SCHEMA is set, including the previously missing project_id
    PAssert
        .that(output)
        .inWindow(window)
        .containsInAnyOrder(
            Row
                .withSchema(OUTPUT_SCHEMA)
                .withFieldValue("project_id", "p1")
                .withFieldValue("type", "count")
                .withFieldValue("count_in", 2L)
                .withFieldValue("count_out", 0L)
                .withFieldValue(AddWindowTimestamp.TIMESTAMP_FIELD, window.maxTimestamp().getMillis())
                .build()
        );
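To catch a forgotten field before it reaches PAssert, a small check can list the non-nullable fields of an expected Row whose value is null. This is a sketch: ExpectedRowCheck and unsetRequiredFields are hypothetical names, not Beam APIs, and it assumes an unset field reads back as null from Row.getValue before the Row is encoded.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hypothetical test helper, not part of Beam.
final class ExpectedRowCheck {
  // Returns the names of non-nullable schema fields that hold null in this Row.
  static List<String> unsetRequiredFields(Row row) {
    Schema schema = row.getSchema();
    List<String> missing = new ArrayList<>();
    for (int i = 0; i < schema.getFieldCount(); i++) {
      Schema.Field field = schema.getField(i);
      // getNullable() returns a Boolean; treat anything but TRUE as non-nullable.
      boolean nullable = Boolean.TRUE.equals(field.getType().getNullable());
      Object value = row.getValue(i);
      if (!nullable && value == null) {
        missing.add(field.getName());
      }
    }
    return missing;
  }
}
```

In a test, failing early when this list is non-empty points straight at the field that was left out of the expected Row, instead of at a confusing mismatch after encoding.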