I want to perform a CoGroupByKey operation between two PCollections using a pair of keys: id1 and id2. I defined a custom class that holds both keys. I read that I also need to define a Coder to specify the byte conversion, so I did.
However, I keep getting this error:
java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
Am I missing something? As I understand it, nondeterministic means that the keyCoder cannot always guarantee that id1 will come before id2. Doesn't my Coder already encode the keys in a fixed order?
Thanks for your time!
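import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
public static PCollection<output> joinById1Id2(PCollection<Foo1> foo1, PCollection<Foo2> foo2) {
    // Key each input by the composite (id1, id2) key, using the custom key coder.
    PCollection<KV<KeysId1Id2, Foo1>> foo1WithKeys = foo1.apply(WithKeys.of(new MapFoo1ByKeysId1Id2()))
        .setCoder(KvCoder.of(KeysId1Id2Coder.of(), SerializableCoder.of(Foo1.class)));
    PCollection<KV<KeysId1Id2, Foo2>> foo2WithKeys = foo2.apply(WithKeys.of(new MapFoo2ByKeysId1Id2()))
        .setCoder(KvCoder.of(KeysId1Id2Coder.of(), SerializableCoder.of(Foo2.class)));
    TupleTag<Foo1> foo1TupleTag = new TupleTag<>();
    TupleTag<Foo2> foo2TupleTag = new TupleTag<>();
    // Join both keyed inputs and post-process each co-grouped result.
    return KeyedPCollectionTuple
        .of(foo1TupleTag, foo1WithKeys)
        .and(foo2TupleTag, foo2WithKeys)
        .apply(CoGroupByKey.create())
        .apply(ParDo.of(new JoinFoo1AndFoo2(foo1TupleTag, foo2TupleTag)));
}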
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@SuperBuilder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class KeysId1Id2{
String id1;
String id2;
}
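import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class KeysId1Id2Coder extends CustomCoder<KeysId1Id2> {
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    public static KeysId1Id2Coder of() {
        return new KeysId1Id2Coder();
    }

    @Override
    public void encode(KeysId1Id2 value, OutputStream outStream) throws IOException {
        // Writes id1 first, then id2; each string is length-prefixed.
        STRING_CODER.encode(value.getId1(), outStream);
        STRING_CODER.encode(value.getId2(), outStream);
    }

    @Override
    public KeysId1Id2 decode(InputStream inStream) throws IOException {
        // Reads the fields back in the same order they were written.
        String id1 = STRING_CODER.decode(inStream);
        String id2 = STRING_CODER.decode(inStream);
        return new KeysId1Id2(id1, id2);
    }
}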
import org.apache.beam.sdk.transforms.SimpleFunction;

// Extracts the composite key from a Foo1 element.
public class MapFoo1ByKeysId1Id2 extends SimpleFunction<Foo1, KeysId1Id2> {
    @Override
    public KeysId1Id2 apply(Foo1 line) {
        return new KeysId1Id2(line.getId1(), line.getId2());
    }
}
// Extracts the composite key from a Foo2 element.
public class MapFoo2ByKeysId1Id2 extends SimpleFunction<Foo2, KeysId1Id2> {
    @Override
    public KeysId1Id2 apply(Foo2 line) {
        return new KeysId1Id2(line.getId1(), line.getId2());
    }
}
Good question.
A coder is deterministic if any two values that are equal according to Java's equals method always encode to the same bytes.
Why does this matter? The engines (runners) that execute Beam pipelines group by key using the encoded bytes. They are often implemented in other languages and never call your equals method or anything like it.
So if you have a nondeterministic coder, two equal values (like two Set objects with the same elements) may encode to different bytes, and then they will not be grouped together.
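To make the "group by bytes" point concrete, here is a small plain-Java sketch, not Beam code. The class ByteGroupingDemo and its methods are hypothetical: countGroups imitates a runner by bucketing keys on their encoded bytes without ever calling equals() on the keys. Two LinkedHashSets with the same elements are equal, but encoding them in iteration order yields two groups, while sorting before encoding yields one.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ByteGroupingDemo {

    // Encodes elements in the set's own iteration order: equal sets may differ.
    static byte[] encodeInIterationOrder(Set<String> set) {
        return encode(new ArrayList<>(set));
    }

    // Sorts first, so equal sets of strings always yield identical bytes.
    static byte[] encodeSorted(Set<String> set) {
        List<String> sorted = new ArrayList<>(set);
        Collections.sort(sorted);
        return encode(sorted);
    }

    // Element count, then each element as length-prefixed modified UTF-8.
    private static byte[] encode(List<String> elements) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(elements.size());
            for (String element : elements) {
                out.writeUTF(element);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mimics a runner: buckets keys by encoded bytes, never calling equals() on the keys.
    static int countGroups(List<Set<String>> keys, Function<Set<String>, byte[]> encoder) {
        Map<ByteBuffer, List<Set<String>>> groups = new HashMap<>();
        for (Set<String> key : keys) {
            groups.computeIfAbsent(ByteBuffer.wrap(encoder.apply(key)), k -> new ArrayList<>()).add(key);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        // Equal according to Set.equals, but inserted in different orders.
        Set<String> a = new LinkedHashSet<>(Arrays.asList("x", "y"));
        Set<String> b = new LinkedHashSet<>(Arrays.asList("y", "x"));
        List<Set<String>> keys = Arrays.asList(a, b);
        System.out.println(a.equals(b));                                                   // true
        System.out.println(countGroups(keys, ByteGroupingDemo::encodeInIterationOrder)); // 2
        System.out.println(countGroups(keys, ByteGroupingDemo::encodeSorted));           // 1
    }
}
```

The sorted variant only works because String has a natural ordering and each string is itself encoded deterministically, which mirrors the condition on element coders.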
Coders are responsible for reporting whether they are deterministic. For example, a SetCoder that simply encodes elements in iteration order is not deterministic, whereas one that sorts the elements before encoding them is deterministic as long as the elements themselves are encoded deterministically. By default, CustomCoder reports that it is nondeterministic: its verifyDeterministic method throws a NonDeterministicException. So the problem is not the order of id1 and id2; your coder already writes them in a fixed order. You just need to override verifyDeterministic.
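A minimal sketch of the question's coder with that override added, assuming the Beam Java SDK is on the classpath. To keep the file self-contained, KeysId1Id2 here is a plain-Java stand-in for the Lombok class in the question. Delegating to STRING_CODER.verifyDeterministic() is one reasonable choice (an empty override would also satisfy the check); it is justified only because the fields are always written in the same order with a deterministic component coder.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Plain-Java stand-in for the Lombok KeysId1Id2 class from the question.
class KeysId1Id2 {
    private final String id1;
    private final String id2;

    KeysId1Id2(String id1, String id2) {
        this.id1 = id1;
        this.id2 = id2;
    }

    String getId1() { return id1; }
    String getId2() { return id2; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeysId1Id2)) return false;
        KeysId1Id2 other = (KeysId1Id2) o;
        return Objects.equals(id1, other.id1) && Objects.equals(id2, other.id2);
    }

    @Override
    public int hashCode() { return Objects.hash(id1, id2); }
}

public class KeysId1Id2Coder extends CustomCoder<KeysId1Id2> {
    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();

    public static KeysId1Id2Coder of() {
        return new KeysId1Id2Coder();
    }

    @Override
    public void encode(KeysId1Id2 value, OutputStream outStream) throws IOException {
        // Fixed field order: id1, then id2 (each length-prefixed).
        STRING_CODER.encode(value.getId1(), outStream);
        STRING_CODER.encode(value.getId2(), outStream);
    }

    @Override
    public KeysId1Id2 decode(InputStream inStream) throws IOException {
        String id1 = STRING_CODER.decode(inStream);
        String id2 = STRING_CODER.decode(inStream);
        return new KeysId1Id2(id1, id2);
    }

    // CustomCoder's default throws NonDeterministicException. Equal keys always
    // produce identical bytes here, so defer to the component coder's check.
    @Override
    public void verifyDeterministic() throws NonDeterministicException {
        STRING_CODER.verifyDeterministic();
    }
}
```

Note that StringUtf8Coder rejects null strings, so if id1 or id2 can be null you would need something like NullableCoder.of(StringUtf8Coder.of()) for the fields.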
That said, I recommend using Beam's Row type wherever possible unless you really need a custom encoding. If you make your key type a Row whose schema has id1 and id2 fields, Beam automatically determines whether its coder is deterministic from the row's schema.
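A sketch of the Row-based alternative, assuming the Beam Java SDK's schema API (Schema.builder(), Row.withSchema(...), RowCoder.of(...)). The class RowKeys and its members are hypothetical names for illustration. The key schema lists id1 and id2 as string fields, and RowCoder derives its determinism from those field types instead of requiring a manual override.

```java
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// Hypothetical helper holding the composite join key's schema.
public class RowKeys {
    // Schema with the two key fields; field order is part of the schema.
    static final Schema KEY_SCHEMA =
            Schema.builder().addStringField("id1").addStringField("id2").build();

    // Builds a key Row; values are assigned to fields in schema order.
    static Row keyOf(String id1, String id2) {
        return Row.withSchema(KEY_SCHEMA).addValues(id1, id2).build();
    }

    // Coder for key Rows; whether it is deterministic follows from the field types.
    static RowCoder keyCoder() {
        return RowCoder.of(KEY_SCHEMA);
    }
}
```

In the join itself, each input could then be keyed with something like WithKeys.of(...) returning RowKeys.keyOf(...), plus withKeyType(TypeDescriptors.rows()), and given KvCoder.of(RowKeys.keyCoder(), ...) as its coder. Treat that wiring as an untested outline rather than a drop-in replacement.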
Hope this helps!