amazon-web-services, aws-glue

How to query views in Athena in an AWS Glue ETL job


I have a few views in AWS Athena that are accessed in our catalog from another AWS account/team. AWS Glue does not natively support accessing views in an ETL job; I get an "error code 10" when trying to do so, even though I can query the data just fine in Athena. How can I get around this?


Solution

  • One method is to use Boto3 to run the query through Athena itself and write the results to S3.

    import sys
    import time #used below to pause between Athena status checks
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    import boto3 #this is not generated and needs to be placed here
    from awsglue.dynamicframe import DynamicFrame
    
    print("Glue Job Started") #not needed but will help to make sure the job started when troubleshooting
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext() ##setting SparkContext
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    glueContext._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader","true") ## this is needed for permissions to access someone else's data catalog
    spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader","true") ## this is needed for permissions to access someone else's data catalog
    
    #below is for establishing boto3 client connectivity
    athena_client = boto3.client('athena') #needed
    print("Athena Client Created") #not needed but good for troubleshooting
    s3 = boto3.client('s3') #needed
    
    # Below block deletes the current data under the target S3 prefix. The UNLOAD query below writes its output there, so clear it out first if you want to overwrite the previous run's data. Otherwise, it is optional.
    s3resource = boto3.resource('s3')
    bucket = s3resource.Bucket('[your bucket]') #replace with your bucket name
    for obj in bucket.objects.filter(Prefix='ETL/shared/icoms_hp/'): # delete everything under this prefix
        s3resource.Object(bucket.name, obj.key).delete()
    
    #this is the start of your main query
    query = """
    UNLOAD (
        -- write your SQL query here
    )
    TO '[internal S3 path]' -- the S3 path where you want the final data to be written
    WITH (
        format = 'PARQUET', -- the output format you want
        compression = 'SNAPPY'
    );
    """
    #below block starts the query; the OutputLocation is where Athena saves its temp/metadata output during execution
    response = athena_client.start_query_execution(
        QueryString=query,
        ResultConfiguration={
            'OutputLocation': '[your temp s3 output folder]'}
    )
    
    #below block polls the query status; printing the query id also lets you find the query in the Athena console for troubleshooting
    queryid = response['QueryExecutionId']
    print(queryid)
    status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
    while status.upper() in ['QUEUED', 'RUNNING']:
        time.sleep(5) #avoid hammering the Athena API while the query runs
        status = athena_client.get_query_execution(QueryExecutionId=queryid)['QueryExecution']['Status']['State']
        print(f"status - {status}")