Tags: apache-spark, apache-beam, illegalargumentexception

Apache Beam java.lang.IllegalArgumentException: Invalid lambda deserialization


I am using Apache Beam to write data to BigQuery. The pipeline works fine locally inside IntelliJ with the DirectRunner, and I am able to write to the BigQuery table. However, as soon as I deploy the code to the Spark cluster, I get the exception "java.lang.IllegalArgumentException: Invalid lambda deserialization".
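
For context, the runner selection looks roughly like the sketch below; my real options handling is more involved, but the pipeline itself is identical in both runs (only the runner class changes):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSetup {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        // options.setRunner(DirectRunner.class); // works locally in IntelliJ
        options.setRunner(SparkRunner.class);     // fails on the cluster with the trace below
        Pipeline pipeline = Pipeline.create(options);
        // ... transforms and the BigQuery write are applied here ...
        pipeline.run().waitUntilFinish();
    }
}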

User class threw exception: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
at org.apache.beam.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:712)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
at org.apache.beam.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:432)
at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9.evaluate(StreamingTransformTranslator.java:409)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:449)
at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:438)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:88)
at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call(SparkRunnerStreamingContextFactory.java:46)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:627)
at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:626)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:848)
at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:626)
at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:180)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:96)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.somecompany.beam.BeamApplication.run(BeamApplication.java:43)
at com.somecompany.SparkApp.main(L1LoaderSparkApp.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:75)
... 36 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
... 52 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.beam.sdk.io.gcp.bigquery.ErrorContainer.$deserializeLambda$(ErrorContainer.java:33)
... 62 more

My writer class looks like this:

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STREAMING_INSERTS;

import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.springframework.beans.factory.annotation.Qualifier;

// BigQueryProperties and the helper methods (getMyTermsSchemaFile, getTableSpec,
// getTableRowForMyTermsTable) live elsewhere in the project.
public class MyTermsBigQueryWriter implements Serializable {

    @Qualifier("bigQueryProperties")
    private BigQueryProperties bigQueryProperties;

    public BigQueryIO.Write<MyTerms> myTermsWriter() {
        final BigQueryIO.Write<MyTerms> myTermsWrite = BigQueryIO.<MyTerms>write()
                .withMethod(STREAMING_INSERTS)
                .withExtendedErrorInfo()
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                .withJsonSchema(getMyTermsSchemaFile())
                .withFormatFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms kt) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(kt);
                    }
                })
                .to(getTableSpec())
                .withFormatRecordOnFailureFunction(new SerializableFunction<MyTerms, TableRow>() {
                    @Override
                    public TableRow apply(MyTerms myTerms) {
                        return MyTermsBigQueryWriter.this.getTableRowForMyTermsTable(myTerms);
                    }
                })
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        return myTermsWrite;
    }
}

public class MyTerms implements Serializable {
    public String xx;
    public String xy;
    // equals() and hashCode() are implemented
}

If I don't use my writer, the pipeline runs fine on the cluster.
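
By "use my writer" I mean applying it as the final step of the pipeline, roughly like this (the helper method and variable names here are just illustrative):

import org.apache.beam.sdk.values.PCollection;

class WireUp {
    // "terms" is an illustrative PCollection<MyTerms> name, not the actual variable.
    void writeTerms(PCollection<MyTerms> terms, MyTermsBigQueryWriter writer) {
        terms.apply("WriteMyTermsToBigQuery", writer.myTermsWriter());
    }
}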

I have tried replacing the lambdas with method references and with anonymous inner classes, but with no luck.

Any ideas?


Solution

  • It turns out that the org.apache.avro.generic.GenericData.Record class was causing the serialization issue. We replaced it with a simple POJO (a sketch of the shape of that change follows) and everything started working fine.
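
As a sketch of the shape of that change (the DoFn and field names are illustrative, not the original code): map each Avro GenericRecord into the serializable MyTerms POJO before the BigQuery write, so no Avro types are captured by the serialized transforms.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Convert the Avro record to a plain POJO as early as possible, so nothing
// Avro-specific travels with the serialized DoFns to the Spark workers.
class ToMyTerms extends DoFn<GenericRecord, MyTerms> {
    @ProcessElement
    public void processElement(@Element GenericRecord record, OutputReceiver<MyTerms> out) {
        MyTerms terms = new MyTerms();
        terms.xx = String.valueOf(record.get("xx")); // "xx"/"xy" mirror the POJO above
        terms.xy = String.valueOf(record.get("xy"));
        out.output(terms);
    }
}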