I am new to Apache Beam. I am using Apache Beam with Dataflow on GCP as the runner, and I am getting the following error while executing the pipeline:
coder of type class org.apache.beam.sdk.coders.ListCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element [Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:06:02.000Z, companyId=242, startTime=2020-04-01T09:00:33.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:07:47.000Z, companyId=242, startTime=2020-04-01T09:06:03.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:48:25.000Z, companyId=242, startTime=2020-04-01T09:07:48.000Z]]
My PCollections look like PCollection<KV<String, List<Person>>> and PCollection<KV<String, Iterable<List<Person>>>>.
I have implemented Person as a Serializable POJO and overridden equals and hashCode. I suspect I also need to write a custom ListCoder for Person and register it in the pipeline, but I am not sure how to resolve this issue. Please help.
Here is a working example.
If you clone the repo and run ./gradlew run under the playground root directory, you can verify the effect. To run it on Dataflow instead, use ./gradlew run --args='--runner=DataflowRunner --project=$YOUR_PROJECT_ID --tempLocation=gs://xxx/staging --stagingLocation=gs://xxx/staging', replacing $YOUR_PROJECT_ID and the gs://xxx bucket with your own values (the shell does not expand variables inside single quotes).
If you build it from scratch, the Person class should look like this:
import java.io.Serializable;
import java.util.Objects;

// Value class used as the element type of List<Person>. equals() and hashCode()
// compare field values, so a copy decoded from bytes compares equal to the original.
class Person implements Serializable {
  private final String businessDay;
  private final String departmentId;
  private final String companyId;

  public Person(String businessDay, String departmentId, String companyId) {
    this.businessDay = businessDay;
    this.departmentId = departmentId;
    this.companyId = companyId;
  }

  public String companyId() {
    return companyId;
  }

  public String businessDay() {
    return businessDay;
  }

  public String departmentId() {
    return departmentId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    Person otherPerson = (Person) other;
    // Objects.equals is null-safe, so a null field does not throw a NullPointerException.
    return Objects.equals(businessDay, otherPerson.businessDay)
        && Objects.equals(departmentId, otherPerson.departmentId)
        && Objects.equals(companyId, otherPerson.companyId);
  }

  // Uses the same fields as equals(), so equal objects get equal hash codes.
  @Override
  public int hashCode() {
    return Objects.hash(businessDay, departmentId, companyId);
  }
}
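To see why equals() matters, the sketch below reproduces the condition the error message describes, using plain Java serialization rather than Beam: a value is encoded, a copy is decoded from those bytes, and the copy is compared with the original. As far as I can tell, Beam's check does something along these lines, and with a SerializableCoder for the elements the comparison ends up relying on the element class's own equals(). The classes WithFieldEquals and WithIdentityEquals are hypothetical stand-ins, not part of the project above.

```java
import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class RoundTripCheck {

  // Hypothetical value class whose equals() compares field values, like Person above.
  static final class WithFieldEquals implements Serializable {
    final String id;
    WithFieldEquals(String id) { this.id = id; }
    @Override public boolean equals(Object o) {
      return o instanceof WithFieldEquals && Objects.equals(id, ((WithFieldEquals) o).id);
    }
    @Override public int hashCode() { return Objects.hashCode(id); }
  }

  // Hypothetical class that inherits identity-based equals() from Object.
  static final class WithIdentityEquals implements Serializable {
    final String id;
    WithIdentityEquals(String id) { this.id = id; }
  }

  // Encodes a value with Java serialization.
  static byte[] encode(Object value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  // Decodes a fresh copy from the encoded bytes.
  static Object decode(byte[] encoded) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(encoded))) {
      return in.readObject();
    }
  }

  // True if a decoded copy compares equal to the original.
  static boolean copyEqualsOriginal(Object original) throws IOException, ClassNotFoundException {
    return original.equals(decode(encode(original)));
  }

  public static void main(String[] args) throws Exception {
    // List.equals compares element by element, so the result depends on the element's equals().
    List<WithFieldEquals> good = List.of(new WithFieldEquals("101"), new WithFieldEquals("102"));
    List<WithIdentityEquals> bad = List.of(new WithIdentityEquals("101"));
    List<WithIdentityEquals> sameContents = List.of(new WithIdentityEquals("101"));

    System.out.println("field-based equals survives round trip: " + copyEqualsOriginal(good));
    System.out.println("identity equals survives round trip:    " + copyEqualsOriginal(bad));
    // Identical bytes, yet the lists are not equal: roughly the situation in the error message.
    System.out.println("same encoding: " + Arrays.equals(encode(bad), encode(sameContents)));
    System.out.println("equal:         " + bad.equals(sameContents));
  }
}
```

Every field that equals() compares, such as the startTime and endTime fields in the question's Person, needs a value-based equals() of its own for this round trip to compare equal.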
I recommend using AutoValue instead of writing the POJO from scratch. Here are some examples, and you can view the whole project here. The advantage is that you don't have to implement equals and hashCode by hand every time you create a new object type.
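AutoValue needs an annotation processor, so the sketch below uses a different tool to illustrate the same benefit: on Java 16 or later, a record generates value-based equals(), hashCode() and toString() from its components. This is a JDK feature swapped in for illustration, not the AutoValue setup used in the examples above, and the name PersonRecord is hypothetical.

```java
import java.io.Serializable;
import java.util.List;

public class RecordExample {
  // Components become private final fields; equals(), hashCode() and toString()
  // are generated from them, so there is nothing to keep in sync by hand.
  record PersonRecord(String businessDay, String departmentId, String companyId)
      implements Serializable {}

  public static void main(String[] args) {
    PersonRecord a = new PersonRecord("01042020", "101", "242");
    PersonRecord b = new PersonRecord("01042020", "101", "242");
    System.out.println(a.equals(b));                    // true
    System.out.println(a.hashCode() == b.hashCode());   // true
    System.out.println(List.of(a).equals(List.of(b)));  // true
    System.out.println(a); // PersonRecord[businessDay=01042020, departmentId=101, companyId=242]
  }
}
```

Like AutoValue, this removes the risk of forgetting a field in equals() or hashCode() when the class changes.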
If the key of a KV is an iterable such as a List, wrap it in an object and give that object an explicit, deterministic encoding (example). GroupByKey compares keys by their encoded bytes, so the key coder must be deterministic, and Java serialization does not guarantee that.
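A minimal sketch of the idea using only the JDK: a hypothetical wrapper DepartmentKey holds a list of company IDs and is written field by field with DataOutputStream (element count first, then each string in order), so equal keys always produce identical bytes. In Beam this encode/decode logic would typically live in a CustomCoder subclass, which should also override verifyDeterministic() because CustomCoder by default reports itself as nondeterministic; that Beam wiring is not shown here.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeterministicKey {

  // Hypothetical key wrapper around a list; equals() and hashCode() are value-based.
  static final class DepartmentKey {
    final List<String> companyIds;
    DepartmentKey(List<String> companyIds) {
      this.companyIds = List.copyOf(companyIds); // defensive immutable copy, rejects nulls
    }
    @Override public boolean equals(Object o) {
      return o instanceof DepartmentKey && companyIds.equals(((DepartmentKey) o).companyIds);
    }
    @Override public int hashCode() { return companyIds.hashCode(); }
  }

  // Writes the element count, then each element in list order. The bytes depend only
  // on the list contents. Note: writeUTF rejects strings over 65535 encoded bytes.
  static void encode(DepartmentKey key, OutputStream os) throws IOException {
    DataOutputStream out = new DataOutputStream(os);
    out.writeInt(key.companyIds.size());
    for (String id : key.companyIds) {
      out.writeUTF(id);
    }
    out.flush(); // flush but do not close: the caller owns the stream
  }

  // Reads the fields back in exactly the order encode() wrote them.
  static DepartmentKey decode(InputStream is) throws IOException {
    DataInputStream in = new DataInputStream(is);
    int size = in.readInt();
    List<String> ids = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      ids.add(in.readUTF());
    }
    return new DepartmentKey(ids);
  }

  static byte[] toBytes(DepartmentKey key) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    encode(key, bytes);
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    DepartmentKey a = new DepartmentKey(List.of("242", "243"));
    DepartmentKey b = new DepartmentKey(new ArrayList<>(List.of("242", "243")));
    // Equal keys encode to identical bytes.
    System.out.println(Arrays.equals(toBytes(a), toBytes(b)));
    // Decoding the bytes gives back an equal key.
    System.out.println(a.equals(decode(new ByteArrayInputStream(toBytes(a)))));
  }
}
```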