
Fix non deterministic keyCoder in a GroupByKey error


I want to perform a CoGroupByKey operation between two PCollections on a composite key made of two fields, id1 and id2. I defined a custom class that holds both fields. I read that I also need to define a Coder to specify the byte conversion, so I did.

However, I keep getting this error:

java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic

Am I missing something? As I understand it, non-deterministic means that the keyCoder cannot always guarantee that id1 will come before id2. Doesn't my Coder already include the order of the keys?

Thanks for your time!

public static PCollection<output> joinById1Id2(PCollection<Foo1> foo1, PCollection<Foo2> foo2) {

        // Key each input by (id1, id2), using the custom key coder.
        PCollection<KV<KeysId1Id2, Foo1>> foo1WithKeys = foo1.apply(WithKeys.of(new MapFoo1ByKeysId1Id2()))
             .setCoder(KvCoder.of(KeysId1Id2Coder.of(), SerializableCoder.of(Foo1.class)));

        PCollection<KV<KeysId1Id2, Foo2>> foo2WithKeys = foo2.apply(WithKeys.of(new MapFoo2ByKeysId1Id2()))
             .setCoder(KvCoder.of(KeysId1Id2Coder.of(), SerializableCoder.of(Foo2.class)));

        TupleTag<Foo1> foo1TupleTag = new TupleTag<>();
        TupleTag<Foo2> foo2TupleTag = new TupleTag<>();

        // Group both inputs by key, then join the grouped values.
        return KeyedPCollectionTuple
                .of(foo1TupleTag, foo1WithKeys)
                .and(foo2TupleTag, foo2WithKeys)
                .apply(CoGroupByKey.create())
                .apply(ParDo.of(new JoinFoo1AndFoo2(foo1TupleTag, foo2TupleTag)));
    }

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class KeysId1Id2{
    String id1;
    String id2;
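}

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class KeysId1Id2Coder extends CustomCoder<KeysId1Id2> {
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    public static KeysId1Id2Coder of() {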
        return new KeysId1Id2Coder();
    }

    @Override
    public void encode(KeysId1Id2 value, OutputStream outStream) throws IOException {
        STRING_CODER.encode(value.getId1(), outStream);
        STRING_CODER.encode(value.getId2(), outStream);
    }

    @Override
    public KeysId1Id2 decode(InputStream inStream) throws IOException {
        String id1 = STRING_CODER.decode(inStream);
        String id2 = STRING_CODER.decode(inStream);
        return new KeysId1Id2(id1, id2);
    }
}

public class MapFoo1ByKeysId1Id2 extends SimpleFunction<Foo1, KeysId1Id2> {
    @Override
    public KeysId1Id2 apply(Foo1 line) {

        return new KeysId1Id2(line.getId1(), line.getId2());
    }
}

public class MapFoo2ByKeysId1Id2 extends SimpleFunction<Foo2, KeysId1Id2> {
    @Override
    public KeysId1Id2 apply(Foo2 line) {

        return new KeysId1Id2(line.getId1(), line.getId2());
    }
}

Solution

  • Good question.

    A deterministic coder is one where any two values that are equal according to equals() always encode to the same bytes.

    Why is this important? The engines that run Beam pipelines group by key according to the encoded bytes. They are often implemented in a different language, so they do not call your equals method or anything like it.

    So if you have a nondeterministic coder, two equal values (like two Set objects with the same elements) may encode to different bytes. Then they will not be grouped together.
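
    To make the Set case concrete, here is a minimal plain-Java sketch with no Beam involved. SetEncodingDemo and its length-prefixed byte format are made up for illustration and are not Beam's SetCoder format. It shows two sets that are equal per equals() but encode differently when written in iteration order, and identically once sorted:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical demo class; the byte format is illustrative only.
class SetEncodingDemo {
    // Writes each element as a one-byte length followed by its UTF-8 bytes,
    // in whatever order the iterable yields them (fine for short demo strings).
    static byte[] encode(Iterable<String> elements) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String element : elements) {
            byte[] utf8 = element.getBytes(StandardCharsets.UTF_8);
            out.write(utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    // Not deterministic: the bytes depend on the set's iteration order.
    static byte[] encodeInIterationOrder(Set<String> set) {
        return encode(set);
    }

    // Deterministic: equal sets are put into the same sorted order first.
    static byte[] encodeSorted(Set<String> set) {
        return encode(new TreeSet<>(set));
    }

    public static void main(String[] args) {
        // Same elements, different insertion (and therefore iteration) order.
        Set<String> a = new LinkedHashSet<>(Arrays.asList("x", "y"));
        Set<String> b = new LinkedHashSet<>(Arrays.asList("y", "x"));

        System.out.println(a.equals(b)); // true
        System.out.println(Arrays.equals(
            encodeInIterationOrder(a), encodeInIterationOrder(b))); // false
        System.out.println(Arrays.equals(encodeSorted(a), encodeSorted(b))); // true
    }
}
```

    A runner comparing the iteration-order bytes would treat a and b as two different keys.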

    Coders are responsible for reporting whether or not they are deterministic. For example, a SetCoder that writes elements in iteration order is not deterministic, while one that sorts the elements before encoding them is deterministic, as long as the individual elements are also encoded deterministically. By default, the CustomCoder class reports that it is nondeterministic: its verifyDeterministic() method throws NonDeterministicException. Your coder only writes two strings with StringUtf8Coder, which is deterministic, so you can override verifyDeterministic() in KeysId1Id2Coder so that it no longer throws.
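
    A minimal sketch of that override, assuming the Beam Java SDK is on the classpath. The plain-Java KeysId1Id2 below is a stand-in for your Lombok class so the example is self-contained, and it assumes id1 and id2 are never null, since StringUtf8Coder rejects null values:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Stand-in for the Lombok @Data key class (assumption, for self-containment).
class KeysId1Id2 {
    private final String id1;
    private final String id2;

    KeysId1Id2(String id1, String id2) {
        this.id1 = id1;
        this.id2 = id2;
    }

    String getId1() { return id1; }
    String getId2() { return id2; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeysId1Id2)) return false;
        KeysId1Id2 other = (KeysId1Id2) o;
        return Objects.equals(id1, other.id1) && Objects.equals(id2, other.id2);
    }

    @Override
    public int hashCode() { return Objects.hash(id1, id2); }
}

class KeysId1Id2Coder extends CustomCoder<KeysId1Id2> {
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    public static KeysId1Id2Coder of() {
        return new KeysId1Id2Coder();
    }

    @Override
    public void encode(KeysId1Id2 value, OutputStream outStream) throws IOException {
        // Always id1 first, then id2, each written by a deterministic coder.
        STRING_CODER.encode(value.getId1(), outStream);
        STRING_CODER.encode(value.getId2(), outStream);
    }

    @Override
    public KeysId1Id2 decode(InputStream inStream) throws IOException {
        String id1 = STRING_CODER.decode(inStream);
        String id2 = STRING_CODER.decode(inStream);
        return new KeysId1Id2(id1, id2);
    }

    @Override
    public void verifyDeterministic() throws Coder.NonDeterministicException {
        // Replaces CustomCoder's default, which always throws. This coder is
        // deterministic exactly when the component coder it delegates to is.
        STRING_CODER.verifyDeterministic();
    }
}
```

    Delegating to the component coder, rather than leaving the method body empty, keeps the claim honest if you later swap in a component coder that is not deterministic.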

    That said, I recommend using Beam's Row type as much as possible unless you really require a custom encoding. If you make your key type a Row whose schema has id1 and id2 fields, Beam can work out from the schema whether the encoding is deterministic.
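
    A rough sketch of the Row-based key, again assuming the Beam Java SDK is on the classpath. RowKeySketch, KEY_SCHEMA, keyOf and keyCoder are hypothetical names chosen for this example:

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hypothetical helper class illustrating a Row used as the join key.
class RowKeySketch {
    // Schema with the two non-nullable string fields that make up the key.
    static final Schema KEY_SCHEMA =
        Schema.builder().addStringField("id1").addStringField("id2").build();

    // Builds a key Row; values are given in schema field order.
    static Row keyOf(String id1, String id2) {
        return Row.withSchema(KEY_SCHEMA).addValues(id1, id2).build();
    }

    // Coder derived from the schema, so no hand-written encode/decode.
    static Coder<Row> keyCoder() {
        return RowCoder.of(KEY_SCHEMA);
    }
}
```

    In your pipeline you would then key by Row instead of KeysId1Id2 and pass RowKeySketch.keyCoder() as the key coder in KvCoder.of(...).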

    Hope this helps!