Tags: python, pyspark, apache-spark-sql, aws-glue

Spark dynamic frame show method yields nothing


I am using AWS Glue auto-generated code to read a CSV file from S3 and write it to a table over a JDBC connection. It seems simple: the job runs successfully with no errors, but it writes nothing. When I checked the Glue DynamicFrame, it does contain all the rows (verified with .count()), but calling .show() on it yields nothing.

.printSchema() works fine. I tried logging the error while using .show(), but no error is raised and nothing is printed. When I convert the DynamicFrame to a DataFrame using .toDF(), the show method works. I thought there was some problem with the file, so I tried narrowing it down to certain columns, but even with just 2 columns in the file the same thing happens. The strings are clearly marked in double quotes, still no success.

We have things like the JDBC connection that need to be picked up from the Glue configuration, which I guess a regular Spark DataFrame can't do. Hence I need the DynamicFrame working.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

datasource0 = glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': ['s3://bucket/file.csv']},
    'csv',
    format_options={
        'withHeader': True,
        'skipFirst': True,
        'quoteChar': '"',
        'escaper': '\\'
    }
)

datasource0.printSchema()
datasource0.show(5)

Output

root
|-- ORDERID: string
|-- EVENTTIMEUTC: string

Here is what converting to a regular DataFrame yields.

datasource0.toDF().show()

Output

+-------+-----------------+
|ORDERID|     EVENTTIMEUTC|
+-------+-----------------+
|      2| "1/13/2018 7:50"|
|      3| "1/13/2018 7:50"|
|      4| "1/13/2018 7:50"|
|      5| "1/13/2018 7:50"|
|      6| "1/13/2018 8:52"|
|      7| "1/13/2018 8:52"|
|      8| "1/13/2018 8:53"|
|      9| "1/13/2018 8:53"|
|     10| "1/16/2018 1:33"|
|     11| "1/16/2018 2:28"|
|     12| "1/16/2018 2:37"|
|     13| "1/17/2018 1:17"|
|     14| "1/17/2018 2:23"|
|     15| "1/17/2018 4:33"|
|     16| "1/17/2018 6:28"|
|     17| "1/17/2018 6:28"|
|     18| "1/17/2018 6:36"|
|     19| "1/17/2018 6:38"|
|     20| "1/17/2018 7:26"|
|     21| "1/17/2018 7:28"|
+-------+-----------------+
only showing top 20 rows

Here is some of the data.

ORDERID, EVENTTIMEUTC
1, "1/13/2018 7:10"
2, "1/13/2018 7:50"
3, "1/13/2018 7:50"
4, "1/13/2018 7:50"
5, "1/13/2018 7:50"
6, "1/13/2018 8:52"
7, "1/13/2018 8:52"
8, "1/13/2018 8:53"
9, "1/13/2018 8:53"
10, "1/16/2018 1:33"
11, "1/16/2018 2:28"
12, "1/16/2018 2:37"
13, "1/17/2018 1:17"
14, "1/17/2018 2:23"
15, "1/17/2018 4:33"
16, "1/17/2018 6:28"
17, "1/17/2018 6:28"
18, "1/17/2018 6:36"
19, "1/17/2018 6:38"
20, "1/17/2018 7:26"
21, "1/17/2018 7:28"
22, "1/17/2018 7:29"
23, "1/17/2018 7:46"
24, "1/17/2018 7:51"
25, "1/18/2018 2:22"
26, "1/18/2018 5:48"
27, "1/18/2018 5:50"
28, "1/18/2018 5:50"
29, "1/18/2018 5:51"
30, "1/18/2018 5:53"
100, "1/18/2018 10:32"
101, "1/18/2018 10:33"
102, "1/18/2018 10:33"
103, "1/18/2018 10:42"
104, "1/18/2018 10:59"
105, "1/18/2018 11:16"
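
One detail visible in the sample is the space after each comma. As a rough illustration only, using Python's standard csv module rather than Glue's CSV reader (whose handling may differ), a quote that follows a space is not treated as an opening quote, which is consistent with the quotes being retained in the toDF() output above. The helper name parse_sample is made up for this sketch.

```python
import csv
import io

# Two rows copied from the sample above, including the space after each comma.
SAMPLE = 'ORDERID, EVENTTIMEUTC\n1, "1/13/2018 7:10"\n2, "1/13/2018 7:50"\n'


def parse_sample(text, skip_initial_space):
    """Parse CSV text with Python's csv module (not Glue's CSV reader)."""
    reader = csv.reader(io.StringIO(text), skipinitialspace=skip_initial_space)
    return list(reader)


# Default settings: the leading space makes the quote part of the value.
print(parse_sample(SAMPLE, False)[1])
# Skipping the space after the delimiter lets the quotes be recognised.
print(parse_sample(SAMPLE, True)[1])
```

If Glue's reader behaves similarly, removing the space after the comma in the file may be worth trying. This does not by itself explain why .show() prints nothing.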

Solution

  • The docs for DynamicFrame are not very explicit about this, but there can be cases where no underlying DataFrame has been processed until you call toDF(), so you are essentially calling .show() on something that may be empty:

    To address these limitations, AWS Glue introduces the DynamicFrame. A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. You can resolve these inconsistencies to make your datasets compatible with data stores that require a fixed schema.

    .toDF():

    Converts a DynamicFrame to an Apache Spark DataFrame by converting DynamicRecords into DataFrame fields. Returns the new DataFrame.

    Checking the code here, it seems there could be instances where the underlying Java data frame is blank when you try to print it out: https://github.com/awslabs/aws-glue-libs/blob/f973095b9f2aa784cbcc87681a00da3127125337/awsglue/dynamicframe.py#L78

    def show(self, num_rows=20):
        print(self._jdf.showString(num_rows))
    

    where __init__ relies on a passed-in parameter (jdf) that may not have been evaluated yet:

    def __init__(self, jdf, glue_ctx, name=""):
        self._jdf = jdf
        self.glue_ctx = glue_ctx
        self._ssql_ctx = glue_ctx._ssql_ctx
        self._sc = glue_ctx and glue_ctx._sc
        self._schema = None
        self._lazy_rdd = None
        self.name = name
    

    When calling toDF() the underlying dataframe is processed:

    ... SNIP ...
    scala_options.append(self.glue_ctx.convert_resolve_option(option.path, option.action, option.target))
    
        return DataFrame(self._jdf.toDF(self.glue_ctx._jvm.PythonUtils.toSeq(scala_options)), self.glue_ctx)
    

    The Spark docs for .toDF() mention that this method converts from an RDD (which suggests this is the step where the records are actually materialized into rows):

    This can be quite convenient in conversion from a RDD of tuples into a DataFrame with meaningful names.
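
Since the question needs a DynamicFrame for the Glue-managed JDBC connection, one possible workaround, sketched here under assumptions, is to inspect the data through toDF() and then wrap the DataFrame back into a DynamicFrame with DynamicFrame.fromDF before writing. The function names preview_dynamic_frame and to_dynamic_frame are made up for this sketch, and to_dynamic_frame only works inside a Glue environment where awsglue is importable.

```python
def preview_dynamic_frame(dynamic_frame, num_rows=5):
    """Print rows through the Spark DataFrame instead of DynamicFrame.show()."""
    df = dynamic_frame.toDF()  # the conversion the question found to work
    df.show(num_rows)          # DataFrame.show prints the first num_rows rows
    return df


def to_dynamic_frame(df, glue_context, name="converted"):
    """Wrap a Spark DataFrame back into a DynamicFrame for Glue writers."""
    # Imported lazily so this file can be loaded outside a Glue job.
    from awsglue.dynamicframe import DynamicFrame
    return DynamicFrame.fromDF(df, glue_context, name)


# Hypothetical usage inside the Glue job from the question:
#   df = preview_dynamic_frame(datasource0)
#   datasource1 = to_dynamic_frame(df, glueContext, "datasource1")
```

The resulting DynamicFrame can then be passed to the same write step the auto-generated script already uses.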