I'm exploring Apache Beam dataflow templates provided by GoogleCloudPlatform on Github.
In particular, I'm converting the PubSubToBigQuery template from Java into Kotlin.
By doing so, I get an Overload ambiguity resolution error in the MapElements.input(...).via(...)
transform on line 274
. The error message is:
Error:(62, 22) Kotlin: Cannot choose among the following candidates without completing type inference:
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
public final fun <NewInputT : Any!> via(fn: ((input: BigQueryInsertError!) -> FailsafeElement<String!, String!>!)!): MapElements<BigQueryInsertError!, FailsafeElement<String!, String!>!>! defined in org.apache.beam.sdk.transforms.MapElements
The relevant Java code snippet is:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
PCollection<FailsafeElement<String, String>> failedInserts =
writeResult
.getFailedInsertsWithErr()
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
.setCoder(FAILSAFE_ELEMENT_CODER);
The Kotlin conversion looks like:
/*
* Step 3 Contd.
* Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
*/
val failedInserts: PCollection<FailsafeElement<String, String>> =
writeResult.failedInsertsWithErr
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.encodedTypeDescriptor)
.via { e: BigQueryInsertError -> wrapBigQueryInsertError(e) })
.setCoder(FAILSAFE_ELEMENT_CODER)
I do not know how to resolve this. Any help would be nice.
The reason is that the overload rules are slightly different between Java and Kotlin, which means that in Kotlin there are two matching overloads;
public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn)
public <NewInputT> MapElements<NewInputT, OutputT> via(SerializableFunction<NewInputT, OutputT> fn)
The simplest fix is to just explicitly specify the lambda as a SerializableFunction
to get the correct overload;
.via<BigQueryInsertError> (SerializableFunction { e: BigQueryInsertError -> wrapBigQueryInsertError(e) }))