So I am using AWS Glue auto-generated code to read csv file from S3 and write it to a table over a JDBC connection. Seems simple, Job runs successfully with no error but it writes nothing. When I checked the Glue Spark Dynamic Frame it does contents all the rows (using .count()). But when do a .show() on it yields nothing.
.printSchema() works fine. Tried logging the error while using .show(), but no errors or nothing is printed. Converted the DynamicFrame to the data frame using .toDF and the show method it works. I thought there is some problem with the file, trying to narrow to certain columns. But even with just 2 columns in the file same thing. Clearly marked string in double quotes, still no success.
We have things like JDBC connection that needs to be picked from Glue configuration. Which I guess regular spark data frame can't do. Hence need dynamic frame working.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_options('s3', {'paths': ['s3://bucket/file.csv']}, 'csv', format_options={'withHeader': True,'skipFirst': True,'quoteChar':'"','escaper':'\\'})
datasource0.printSchema()
datasource0.show(5)
Output
root
|-- ORDERID: string
|-- EVENTTIMEUTC: string
Here is what the converting to regular data frame yields.
datasource0.toDF().show()
Output
+-------+-----------------+
|ORDERID| EVENTTIMEUTC|
+-------+-----------------+
| 2| "1/13/2018 7:50"|
| 3| "1/13/2018 7:50"|
| 4| "1/13/2018 7:50"|
| 5| "1/13/2018 7:50"|
| 6| "1/13/2018 8:52"|
| 7| "1/13/2018 8:52"|
| 8| "1/13/2018 8:53"|
| 9| "1/13/2018 8:53"|
| 10| "1/16/2018 1:33"|
| 11| "1/16/2018 2:28"|
| 12| "1/16/2018 2:37"|
| 13| "1/17/2018 1:17"|
| 14| "1/17/2018 2:23"|
| 15| "1/17/2018 4:33"|
| 16| "1/17/2018 6:28"|
| 17| "1/17/2018 6:28"|
| 18| "1/17/2018 6:36"|
| 19| "1/17/2018 6:38"|
| 20| "1/17/2018 7:26"|
| 21| "1/17/2018 7:28"|
+-------+-----------------+
only showing top 20 rows
Here is the some data.
ORDERID, EVENTTIMEUTC
1, "1/13/2018 7:10"
2, "1/13/2018 7:50"
3, "1/13/2018 7:50"
4, "1/13/2018 7:50"
5, "1/13/2018 7:50"
6, "1/13/2018 8:52"
7, "1/13/2018 8:52"
8, "1/13/2018 8:53"
9, "1/13/2018 8:53"
10, "1/16/2018 1:33"
11, "1/16/2018 2:28"
12, "1/16/2018 2:37"
13, "1/17/2018 1:17"
14, "1/17/2018 2:23"
15, "1/17/2018 4:33"
16, "1/17/2018 6:28"
17, "1/17/2018 6:28"
18, "1/17/2018 6:36"
19, "1/17/2018 6:38"
20, "1/17/2018 7:26"
21, "1/17/2018 7:28"
22, "1/17/2018 7:29"
23, "1/17/2018 7:46"
24, "1/17/2018 7:51"
25, "1/18/2018 2:22"
26, "1/18/2018 5:48"
27, "1/18/2018 5:50"
28, "1/18/2018 5:50"
29, "1/18/2018 5:51"
30, "1/18/2018 5:53"
100, "1/18/2018 10:32"
101, "1/18/2018 10:33"
102, "1/18/2018 10:33"
103, "1/18/2018 10:42"
104, "1/18/2018 10:59"
105, "1/18/2018 11:16"
Reading the docs for DynamicFrame
, they're not very explicit about this but there can be cases where there is no underyling DataFrame
processed until you call toDF()
, so you are essentially calling .show()
on something which can be blank:
To address these limitations, AWS Glue introduces the DynamicFrame. A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. You can resolve these inconsistencies to make your datasets compatible with data stores that require a fixed schema.
.toDF():
Converts a DynamicFrame to an Apache Spark DataFrame by converting DynamicRecords into DataFrame fields. Returns the new DataFrame.
Checking the code here, it seems there could be instances where the underlying Java data frame is blank when you try to print it out: https://github.com/awslabs/aws-glue-libs/blob/f973095b9f2aa784cbcc87681a00da3127125337/awsglue/dynamicframe.py#L78
def show(self, num_rows=20):
print(self._jdf.showString(num_rows))
where __init__
relies on a passed in param (jdf
) which could be not yet collected:
def __init__(self, jdf, glue_ctx, name=""):
self._jdf = jdf
self.glue_ctx = glue_ctx
self._ssql_ctx = glue_ctx._ssql_ctx
self._sc = glue_ctx and glue_ctx._sc
self._schema = None
self._lazy_rdd = None
self.name = name
When calling toDF()
the underlying dataframe is processed:
... SNIP ...
scala_options.append(self.glue_ctx.convert_resolve_option(option.path, option.action, option.target))
return DataFrame(self._jdf.toDF(self.glue_ctx._jvm.PythonUtils.toSeq(scala_options)), self.glue_ctx)
The Java docs for .toDF()
mention this method converts from an RDD (i.e. it is collecting the results from the workers):
This can be quite convenient in conversion from a RDD of tuples into a
DataFrame
with meaningful names.