Tags: python, amazon-web-services, etl, aws-glue

AWS Glue - o109.pyWriteDynamicFrame. ERROR: relation "xyz" already exists


I have a very simple AWS Glue visual ETL job that reads data from a file in an S3 bucket and copies it into an AWS RDS PostgreSQL database. The generated script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1719925 = glueContext.create_dynamic_frame.from_catalog(database="db_prices_s3", table_name="importzweispalten_csv", transformation_ctx="AWSGlueDataCatalog_node1719925")

# Script generated for node PostgreSQL
PostgreSQL_node1719926 = glueContext.write_dynamic_frame.from_catalog(frame=AWSGlueDataCatalog_node1719925, database="db_einkauf_postgre", table_name="einkauf_public_test2", transformation_ctx="PostgreSQL_node1719926")

job.commit()


When I run the job, I get the following error: Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o109.pyWriteDynamicFrame. ERROR: relation "test2" already exists

The CloudWatch logs do not give any more detail:

24/07/02 13:41:36 INFO ProcessLauncher: postprocessing finished
24/07/02 13:41:36 INFO LogPusher: stopping
24/07/02 13:41:36 INFO CloudWatchMetricsEmitter: Emit job error metrics
24/07/02 13:41:36 INFO ProcessLauncher: Error Category: UNCLASSIFIED_ERROR
24/07/02 13:41:36 INFO ProcessLauncher: enhancedFailureReason: Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o109.pyWriteDynamicFrame. ERROR: relation "test2" already exists
24/07/02 13:41:36 INFO ProcessLauncher: ExceptionErrorMessage failureReason: An error occurred while calling o109.pyWriteDynamicFrame. ERROR: relation "test2" already exists
24/07/02 13:41:36 INFO AmazonHttpClient: Configuring Proxy. Proxy Host: 169.254.76.0 Proxy Port: 8888
24/07/02 13:41:36 INFO ProcessLauncher: postprocessing
24/07/02 13:41:36 INFO ProcessLauncher: Enhance failure reason and emit cloudwatch error metrics.
24/07/02 13:41:36 WARN OOMExceptionHandler: Failed to extract executor id from error message.
24/07/02 13:41:36 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
File "/tmp/test7.py", line 19, in <module>
PostgreSQL_node1719926 = glueContext.write_dynamic_frame.from_catalog(frame=AWSGlueDataCatalog_node1719925, database="db_einkauf_postgre", table_name="einkauf_public_test2", transformation_ctx="PostgreSQL_node1719926")
File "/opt/amazon/lib/python3.10/site-packages/awsglue/dynamicframe.py", line 664, in from_catalog
return self._glue_context.write_dynamic_frame_from_catalog(frame, db, table_name, redshift_tmp_dir, transformation_ctx, additional_options, catalog_id)
File "/opt/amazon/lib/python3.10/site-packages/awsglue/context.py", line 395, in write_dynamic_frame_from_catalog
return DataSink(j_sink, self).write(frame)
File "/opt/amazon/lib/python3.10/site-packages/awsglue/data_sink.py", line 39, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.10/site-packages/awsglue/data_sink.py", line 32, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o109.pyWriteDynamicFrame.
org.postgresql.util.PSQLException: ERROR: relation "test2" already exists
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:329)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:315)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:291)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:265)
at org.apache.spark.sql.jdbc.glue.GlueJDBCSink$.createTable(GlueJDBCSink.scala:111)
at org.apache.spark.sql.jdbc.glue.GlueJDBCSink$.save(GlueJDBCSink.scala:39)
at com.amazonaws.services.glue.util.JDBCWrapper.writeDF(JDBCUtils.scala:989)
at com.amazonaws.services.glue.sinks.PostgresDataSink.writeDynamicFrame(PostgresDataSink.scala:42)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)

Before running the job, I created a crawler that reads the schema of the PostgreSQL database and creates the corresponding tables in AWS Glue. One of these tables is configured as the target in the job, which is where I want the data from the S3 input file to go.

How can I resolve the job's complaint that the relation already exists? I could not find an answer to this problem specifically for AWS Glue.

I tried different privileges for the configured database user, without success. Now I am wondering whether I am using AWS Glue incorrectly. In the end, I want to transform some data from the S3 input and then save it to the RDS PostgreSQL database.

Versions: Spark: 3.3, Python: 3, Glue: 4.0


Solution

  • The problem was that the tables were owned by another user. My fix was to create the tables with the user I am using in the AWS Glue configuration.

    If the tables have to be owned by a different user, I assume that explicitly granting the required privileges to the Glue user would also help.
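A possible explanation, not confirmed anywhere in the question: the stack trace ends in GlueJDBCSink.createTable, which suggests the sink decided the table was missing and tried to create it. If the Glue user cannot access a table owned by someone else, an existence probe against that table could fail and be read as "table does not exist", after which CREATE TABLE collides with the real relation. Under that assumption, the ownership check and the explicit grant mentioned above could be sketched as below. The role name glue_user is a placeholder, the helper functions are hypothetical, and the set of privileges the Glue sink actually needs (USAGE, SELECT, INSERT here) is a guess.

```python
# Hypothetical helpers that only build SQL text; run the statements with
# any PostgreSQL client while connected as the table owner or a superuser.
# Role, schema and table names are placeholders.


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def ownership_check_sql() -> str:
    """Parameterized query on the pg_tables catalog view.

    Pass (schema, table) as parameters; the result row names the owner.
    """
    return (
        "SELECT tableowner FROM pg_tables "
        "WHERE schemaname = %s AND tablename = %s"
    )


def grant_statements(schema: str, table: str, role: str) -> list[str]:
    """GRANT statements for keeping the current owner (assumed privilege set)."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    return [
        f"GRANT USAGE ON SCHEMA {quote_ident(schema)} TO {quote_ident(role)};",
        f"GRANT SELECT, INSERT ON TABLE {target} TO {quote_ident(role)};",
    ]


def change_owner_statement(schema: str, table: str, role: str) -> str:
    """Alternative: hand the table over to the Glue user."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    return f"ALTER TABLE {target} OWNER TO {quote_ident(role)};"


if __name__ == "__main__":
    print(ownership_check_sql())
    for stmt in grant_statements("public", "test2", "glue_user"):
        print(stmt)
    print(change_owner_statement("public", "test2", "glue_user"))
```

Checking the owner first shows whether the mismatch described in the solution applies at all. Changing the owner mirrors the fix that worked; the grants are the untested alternative.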