I am using Apache Beam and writing the data to BigQuery. My Pipeline works fine locally inside intellij using Direct runner and I am able to write to BigQuery table locally inside intellij. However, I am getting exception "java.lang.IllegalArgumentException: Invalid lambda deserialization" as soon as I deploy the code on Spark Cluster.
User class threw exception: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:712)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:432)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:409)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:449)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:438)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:46)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:848)
at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:626)
at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:180)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:96)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.somecompany.beam.BeamApplication.run(BeamApplication.java:43)
at com.somecompany.SparkApp.main(L1LoaderSparkApp.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 52 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer.$deserializeLambda$(ErrorContainer.java:33)
... 62 more
My writer class looks like this
public class MyTermsBigQueryWriter implements Serializable {
@Qualifier("bigQueryProperties")
private BigQueryProperties bigQueryProperties;
public BigQueryIO.Write<MyTerms> myTermsWriter() {
final BigQueryIO.Write<MyTerms> myTermsWrite = BigQueryIO.<MyTerms>write()
.withMethod(STREAMING_INSERTS)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.withJsonSchema(getMyTermsSchemaFile())
.withFormatFunction(new SerializableFunction<MyTerms, TableRow>() {
@Override
public TableRow apply(MyTerms kt) {
return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(kt);
}
})
.to(getTableSpec())
.withFormatRecordOnFailureFunction(new SerializableFunction<MyTerms, TableRow>() {
@Override
public TableRow apply(MyTerms myTerms) {
return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(myTerms);
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
return myTermsWrite;
}
}
public class MyTerms implements Serializable {
public String xx;
public String xy;
// Has Equal and HashCode methods
}
If I don't use my writer, it works fine on cluster.
I have tried replacing lambda with method reference, anonymous inner class but no luck.
Any idea?
It turns out that org.apache.avro.generic.GenericData.Record
class was causing serialization issues. We removed it to use simple pojo and everything started working fine.