I've recently upgraded an existing pipeline from Dataflow 1.x to Dataflow 2.x, and I'm seeing an error that doesn't make sense to me. I'll put the relevant code below, then include the error I'm seeing.
// This is essentially the final step in our pipeline, where we write
// one of the side outputs from the pipeline to a BigQuery table
results.get(matchedTag)
    .apply("CountBackfill", Count.<String>perElement())
    .apply("ToReportRow", ParDo.of(new ToReportRow()))
    // at this point, there is now a PCollection<TableRow>
    .apply("WriteReport", BigQueryIO.writeTableRows()
        .to(reportingDataset + ".AttributeBackfill_" + dayStr)
        .withSchema(ReportSchema.get())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
/*
 * Create a TableRow from a key/value pair
 */
public static class ToReportRow extends DoFn<KV<String, Long>, TableRow> {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws InterruptedException {
        KV<String, Long> row = c.element();
        c.output(new TableRow()
            .set(ReportSchema.ID, row.getKey())
            .set(ReportSchema.COUNT, row.getValue()));
    }
}
And here is the error I am seeing:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1426)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:989)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:525)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:297)
    at com.prod.merge.DailyUniqueProfiles.buildPipeline(DUP.java:106)
    at com.prod.merge.MergePipeline.main(MergePipeline.java:91)
The line .apply("WriteReport", BigQueryIO.writeTableRows() is line 106 in DUP.java, so that is the line I suspect is wrong somehow. Any ideas on what the problem might be?
The solution to this problem ended up being in the Maven dependencies. The NoSuchMethodError points to a Guava version conflict: Beam 2.x calls a Preconditions.checkArgument overload that only exists in Guava 20.0 and later, so an older Guava elsewhere on the classpath breaks the call. After explicitly adding the following dependency and recompiling with mvn, the error went away.
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>