import sys
import os
from datetime import datetime

import boto3
from botocore.exceptions import ClientError

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import current_timestamp
def move_file(bucket, key, filename):
    """Archive a processed upload file, then remove the original object."""
    s3_resource = boto3.resource('s3')
    dest_prefix = 'dcgs_abv/upload_archive_files/'
    # Copy the object to the archive prefix, then delete the source object
    s3_resource.Object(bucket, dest_prefix + filename).copy_from(
        CopySource={'Bucket': bucket, 'Key': key}, ACL='public-read')
    s3_resource.Object(bucket, key).delete()
## @params: [JOB_NAME, TempDir]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
schema = "dcgs"
table_bcs_output = "bcs_output_test"
redshift_schema_table_bcs_output = schema + "." + table_bcs_output

bucket = 'bucket'
prefix = 'key/upload_files/'

s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response['Contents']:
    key = obj['Key']
    if '.txt' in key:
        file = os.path.basename(key)

        # Read the tab-separated upload files from S3 into a DynamicFrame
        abv_data = glueContext.create_dynamic_frame_from_options(
            "s3",
            {'paths': ["s3://{}/{}".format(bucket, prefix)],
             "recurse": True, 'groupFiles': 'inPartition'},
            format="csv", delimiter='\t')

        abv_df = abv_data.toDF().withColumn("snapshot_day", current_timestamp())

        # Pull the Redshift credentials from the Glue connection
        conparms_r = glueContext.extract_jdbc_conf("reporting", catalog_id=None)

        abv_df.write \
            .format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshift://rs-reporting.cy2mjiqdtk9k.us-east-1.redshift.amazonaws.com:8192/rptg") \
            .option("dbtable", redshift_schema_table_bcs_output) \
            .option("user", conparms_r['user']) \
            .option("password", conparms_r['password']) \
            .option("aws_iam_role", "arn:aws:iam::123456789:role/redshift_admin_role") \
            .option("tempdir", args["TempDir"]) \
            .option("delimiter", '\t') \
            .option("quote", "'") \
            .mode("append") \
            .save()

        # Archive the file once it has been loaded
        move_file(bucket, key, file)

job.commit()

I have a job in AWS Glue using Spark 2.4 and Python 3 (Glue version 1.0). I have tried every way I can find to set the delimiter for the Redshift load: .option("sep", '\t'), .option("separator", '\t'), and quite a few others.
The file in question is a tab-separated text file. I have opened it in OpenOffice using the tab separator and ' for the text delimiter.
Can someone tell me where I am going wrong?
The answer, which took quite a bit of digging to find because it isn't spelled out in the docs here: https://docs.databricks.com/data/data-sources/aws/amazon-redshift.html, is the following:
.option("extracopyoptions", "delimiter '\t'")
This properly sets the delimiter on the COPY command that gets executed.
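Applied to the write in the question, the fix looks roughly like this (same placeholder endpoint, role, and credentials as above; the delimiter/quote options from the original write are omitted, since extracopyoptions is what actually reaches the generated COPY statement):

# Sketch of the corrected write; connection details are the question's placeholders.
abv_df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://rs-reporting.cy2mjiqdtk9k.us-east-1.redshift.amazonaws.com:8192/rptg") \
    .option("dbtable", redshift_schema_table_bcs_output) \
    .option("user", conparms_r['user']) \
    .option("password", conparms_r['password']) \
    .option("aws_iam_role", "arn:aws:iam::123456789:role/redshift_admin_role") \
    .option("tempdir", args["TempDir"]) \
    .option("extracopyoptions", "delimiter '\t'") \
    .mode("append") \
    .save()

Per the linked docs, extracopyoptions is just a string of extra options appended to the COPY command that spark-redshift issues, so anything COPY accepts (DELIMITER, MAXERROR, TRUNCATECOLUMNS, and so on) can be passed the same way.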