I'm setting up an ETL pipeline from AWS Aurora to BigQuery and am using Glue to do so. The goal is to be able to use arbitrary SQL as input, so I'm dumping to S3 as Parquet, crawling that with a Glue Crawler and using the Glue Catalog as a source for the ETL job. These parts work, and I can print out the data and schema of the source table inside the ETL job.
However, when writing to BigQuery, I get a NullPointerException:
2023-06-15 16:28:59,300 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
File "/tmp/simple.py", line 25, in <module>
target = glueContext.write_dynamic_frame.from_options(frame = source, connection_type = "marketplace.spark", connection_options =
{
"connectionName": "bigquery-bigquery-main",
"dataset": "datawarehouse",
"parentProject": "datawarehouse-123123",
"table": "simple",
"temporaryGcsBucket": "bucket-name"
}
, transformation_ctx = "target")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 802, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 331, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 354, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 39, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 32, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o102.pyWriteDynamicFrame.
: java.lang.RuntimeException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:69)
at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:111)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at com.amazonaws.services.glue.marketplace.connector.SparkCustomDataSink.writeDynamicFrame(CustomDataSink.scala:45)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:532)
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:87)
at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:66)
... 42 more
This exception pops up very rarely on Google and seems unspecific. I tried tracking it down but I don't know Java so this is a futile endeavour. The connector is only available as 0.24.2 so I can't try newer versions, and I couldn't find out how to activate / use the 0.22.0-2 connector.
The source table is already as simple as can be - for testing, I created a dump with a single column (id = bigint, not nullable) and 1000 records and crawled it as a data source. There are no partition keys or anything else special about the table. So any source data issues are pretty much out. I also tried without a table, with an empty table and with a table defined as id = integer in BigQuery - same results.
What's odd is that I had an error in my credentials.json earlier and still got the exact same error. (Now it's correct, verified with gcloud auth activate-service-account.) So it can't have anything to do with the target or the actual write, either.
The credentials.json is present in the container with the correct content, the log and temp paths in S3 get written to (content not useful as far as I can tell) and there are no audit errors, so an IAM issue is unlikely.
I found this guide: https://www.m3tech.blog/entry/load-to-bq-by-aws-glue and followed it along, which was incredibly helpful because it's the only thing I found that lays out the complete process. (Although there are some errors in it, e.g. specifying the table without the project ID doesn't work.) Everything else I found is extremely spread out, incomplete and contradictory. Official "documentation" is here: https://aws.amazon.com/marketplace/pp/prodview-sqnd4gn5fykx6#pdp-usage
I tried adding the credentials.json to the Spark context both in the Python code and in the --conf parameter (by setting the value to key1=val --conf key2=val... as suggested elsewhere on SO), same results for either.
I also don't know anything about PySpark, Spark or BigQuery - I'm just an AWS admin who was given this godforsaken task. Can anyone suggest methods of trying to debug this, or maybe ideas what the reason could be?
Solved. Finally got feedback from the Google Cloud team - the issue was that the GCS bucket and the BigQuery dataset were in different regions. Goes to show that the exception can really mean "anything is wrong", given that the same error occurred with a variety of different contexts, most of which probably didn't actually work.
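A pre-flight check along these lines might have surfaced the mismatch earlier. This is only a minimal sketch, assuming the google-cloud-storage and google-cloud-bigquery Python packages are installed and that the service account may read bucket and dataset metadata. The helper names (normalize_location, same_location, check_colocation) are made up for illustration. Strict equality is a deliberately conservative test; BigQuery's actual colocation rules for load jobs may allow some combinations that this check rejects.

```python
def normalize_location(location):
    """Lower-case and trim a location string so the two APIs' spellings compare equal."""
    return (location or "").strip().lower()


def same_location(bucket_location, dataset_location):
    """Return True only if both locations are known and identical after normalization."""
    bucket = normalize_location(bucket_location)
    dataset = normalize_location(dataset_location)
    return bool(bucket) and bucket == dataset


def check_colocation(key_path, bucket_name, dataset_ref):
    """Look up both locations with the service-account key and compare them.

    dataset_ref is expected in the form "project.dataset" (hypothetical helper).
    """
    # Imported lazily so the pure helpers above work without the Google libraries.
    from google.cloud import bigquery, storage

    storage_client = storage.Client.from_service_account_json(key_path)
    bq_client = bigquery.Client.from_service_account_json(key_path)

    bucket_location = storage_client.get_bucket(bucket_name).location
    dataset_location = bq_client.get_dataset(dataset_ref).location
    return bucket_location, dataset_location, same_location(bucket_location, dataset_location)
```

With the names from the question, calling check_colocation("credentials.json", "bucket-name", "datawarehouse-123123.datawarehouse") would return both locations plus a flag saying whether they match.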
And as an addendum to the guide linked above: there is no need to template the source code. Setting --conf to key=val --conf key2=val... does work. This means you can use the glue_script datasource for simple setups like this.
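To make the chaining trick concrete, here is a small sketch of how the single --conf value could be assembled. The function name build_conf_value and the keys in the example are placeholders for illustration, not settings taken from the guide or from the connector documentation.

```python
def build_conf_value(settings):
    """Join Spark settings into one string for the Glue --conf job parameter.

    Job parameters are a key/value map, so --conf can be given only once;
    further settings are chained inside its value as
    'key1=val1 --conf key2=val2 ...'.
    """
    if not settings:
        raise ValueError("at least one setting is required")
    pairs = [f"{key}={value}" for key, value in settings.items()]
    return " --conf ".join(pairs)


# Placeholder keys and values; substitute the real Spark settings you need.
default_arguments = {
    "--conf": build_conf_value({"key1": "val1", "key2": "val2"}),
}
```

The resulting dictionary has the shape of a Glue job's default arguments, so the chained string ends up as the value of the one --conf parameter.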