amazon-web-services | apache-spark | pyspark | amazon-redshift

Unable to write to redshift via PySpark


I am trying to write to Redshift via PySpark. My Spark version is 3.2.0, using Scala version 2.12.15.

I am trying to write as guided here. I have also tried writing via aws_iam_role as explained in the link, but it resulted in the same error. All my dependencies match Scala version 2.12, which is what my Spark installation uses.

Environment: Spark 3.2, Scala 2.12.15, PySpark 3.2.3, Java 11, Ubuntu 22.04 LTS, Python 3.8

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc')\
    .config("spark.jars.packages","com.eclipsesource.minimal-json:minimal-json:0.9.5,com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,com.amazonaws:aws-java-sdk-s3:1.12.437,org.apache.spark:spark-avro_2.12:3.3.2,io.github.spark-redshift-community:spark-redshift_2.12:5.1.0,org.apache.hadoop:hadoop-aws:3.2.2,com.google.guava:failureaccess:1.0")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "etc") \
    .config("spark.hadoop.fs.s3a.secret.key", "etc") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .getOrCreate()

df=spark.read.option("header",True) \
.csv("demo.csv")

df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
  .option("dbtable", "demo") \
  .option("forward_spark_s3_credentials","True") \
  .option("tempdir", "s3a://mubucket/folder") \
  .mode("append") \
  .save()
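
For reference, the aws_iam_role variant I also tried (which produced the same error) only differed in the writer options. A sketch, with a placeholder role ARN:

df.write \
  .format("io.github.spark_redshift_community.spark.redshift") \
  .option("url", "jdbc:redshift:iam://host:5439/dev?user=user&password=pass") \
  .option("dbtable", "demo") \
  .option("aws_iam_role", "arn:aws:iam::123456789012:role/my-redshift-role") \
  .option("tempdir", "s3a://mubucket/folder") \
  .mode("append") \
  .save()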

It throws this error:

23/03/30 18:51:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/03/30 18:51:50 WARN Utils$: The S3 bucket demo does not have an object lifecycle configuration to ensure cleanup of temporary files. Consider configuring `tempdir` to point to a bucket with an object lifecycle policy that automatically deletes files after an expiration period. For more information, see https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html
23/03/30 18:51:51 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:53 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
23/03/30 18:51:54 ERROR Utils: Aborting task
java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.createDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'

Credentials were removed before posting. With the same credentials I am able to create databases and tables, and the same credentials can also create files on S3 and have full access.

I was trying to write to Redshift via Spark. I followed the guide up to that point but was unable to write. I tried multiple times with the different methodologies provided in the manual, but all resulted in the same error. This is the manual.


Solution

  • It seems this isn't working; the NoSuchMethodError suggests a binary mismatch between the connector/spark-avro jars and the Spark 3.2 runtime. For now, I have created a custom solution that writes to S3 via Spark as Parquet and runs a COPY command on the database. I have also opened an issue on GitHub about this. You can view it here.

    from pyspark.sql import SparkSession
    import psycopg2
    import boto3


    def query_redshift(current_query, fetch, url):
        # Run a query against Redshift via psycopg2; return rows when fetch == 1.
        conn = psycopg2.connect(url)
        conn.autocommit = True
        cursor = conn.cursor()
        try:
            cursor.execute(current_query)
            if fetch == 1:
                return cursor.fetchall()
        finally:
            cursor.close()
            conn.close()
        print("S3 to Redshift Transfer Successful")


    def write_to_redshift(df, folder, arn, tablename, jdbc_url, bucket, aws_access_key_id, aws_secret_access_key):
        # Stage the dataframe on S3 as Parquet, COPY it into Redshift, then clean up the staging files.
        staging = "s3://" + bucket + "/" + folder
        s3a = staging.replace("s3://", "s3a://")
        df.write.parquet(s3a)

        query = f"""
    COPY {tablename}
    FROM '{staging}'
    CREDENTIALS 'aws_access_key_id={aws_access_key_id};aws_secret_access_key={aws_secret_access_key}'
    FORMAT AS PARQUET;
    """

        try:
            print(query)
            query_redshift(query, 0, jdbc_url)
        except Exception as e:
            print(str(e))
        finally:
            # Delete the staged Parquet files whether or not the COPY succeeded.
            s3 = boto3.resource('s3',
                                aws_access_key_id=aws_access_key_id,
                                aws_secret_access_key=aws_secret_access_key)
            deleted = s3.Bucket(bucket).objects.filter(Prefix=folder + "/").delete()
            print(deleted)


    def main():
        aws_access_key_id = 'etc'
        aws_secret_access_key = 'etc'
        spark = SparkSession.builder.appName('abc')\
            .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.12,com.google.guava:guava:31.1-jre,org.apache.hadoop:hadoop-aws:3.2.2")\
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
            .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
            .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
            .getOrCreate()

        df = spark.read.option("header", True) \
            .csv("demo.csv")  # replace with whatever dataframe you have

        df.show()
        tablename = 'public.demo'
        iam_role = ""  # unused here; fill in if you prefer COPY with an IAM role (see note below)
        bucket_name = 'bucket'

        # S3 Credentials Option 1: a psycopg2 DSN string (not an actual JDBC URL)
        jdbc = "host = 'host' port ='5439' dbname = 'dev' user = 'user' password = 'pass' connect_timeout = 30000"
        folder = "cache8"

        write_to_redshift(df, folder, iam_role, tablename, jdbc, bucket_name, aws_access_key_id, aws_secret_access_key)


    main()
    

    This writes your dataframe to S3 as Parquet, runs a COPY command on your database from that staged data, and then deletes the staging files from the bucket.
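
    If you would rather not embed access keys in the COPY statement, Redshift also accepts an IAM role attached to the cluster. A minimal sketch of how the query could be built with the otherwise-unused iam_role parameter (the ARN below is a placeholder):

    # Hypothetical alternative: authorize COPY through an IAM role attached to
    # the cluster instead of embedding access keys (the ARN is a placeholder).
    iam_role_arn = "arn:aws:iam::123456789012:role/my-redshift-copy-role"
    query = f"""
    COPY public.demo
    FROM 's3://bucket/cache8'
    IAM_ROLE '{iam_role_arn}'
    FORMAT AS PARQUET;
    """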