I am trying to run an ETL job on AWS Glue that extracts data from MongoDB into a Spark DataFrame and loads it into Snowflake.
This is a sample schema of the Spark DataFrame:
|-- login: struct (nullable = true)
| |-- login_attempts: integer (nullable = true)
| |-- last_attempt: timestamp (nullable = true)
|-- name: string (nullable = true)
|-- notifications: struct (nullable = true)
| |-- bot_review_queue: boolean (nullable = true)
| |-- bot_review_queue_web_push: boolean (nullable = true)
| |-- bot_review_queue_web_push_admin: boolean (nullable = true)
| |-- weekly_account_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| |-- weekly_summary: struct (nullable = true)
| | |-- enabled: boolean (nullable = true)
| | |-- day: integer (nullable = true)
| | |-- hour: integer (nullable = true)
| | |-- minute: integer (nullable = true)
|-- query: struct (nullable = true)
| |-- email_address: string (nullable = true)
I am trying to load the data into Snowflake as-is, with the struct columns stored as JSON payloads, but it throws the following error:
An error occurred while calling o81.collectToPython. com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast ARRAY into a StructType
I also tried casting the struct columns to strings before loading, but it throws more or less the same error:
An error occurred while calling o106.save. com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType
I would really appreciate some help with this.
Below is the code for casting and loading:
from pyspark.sql.types import StringType
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="mongodb",
    connection_options=read_mongo_options)
user_df = dynamic_frame.toDF()  # convert the Glue DynamicFrame to a Spark DataFrame
user_df_cast = user_df.select(user_df.login.cast(StringType()),'name',user_df.notifications.cast(StringType()))
datasinkusers = user_df_cast.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "users").mode("append").save()
If your users table in Snowflake has the following schema, casting is not required, because the StructType fields of a Spark SQL DataFrame map to the VARIANT type in Snowflake automatically:
CREATE TABLE users (
login VARIANT
,name STRING
,notifications VARIANT
,query VARIANT
)
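The rule behind this table is simple: each top-level struct column becomes a VARIANT, and the string column stays a STRING. As a hedged sketch, the hypothetical helper `build_ddl` below derives the statement above from the top-level column types. Its two-entry mapping covers only the types in this schema and is an assumption for illustration, not the Snowflake Spark Connector's full type table.

```python
# Hypothetical mapping from top-level Spark type names to Snowflake column types.
# Only the two types used in this schema are covered (an assumption, not the
# connector's complete mapping).
SPARK_TO_SNOWFLAKE = {"struct": "VARIANT", "string": "STRING"}


def build_ddl(table, columns):
    """Build a CREATE TABLE statement from (column_name, spark_type) pairs."""
    lines = []
    for i, (name, spark_type) in enumerate(columns):
        if spark_type not in SPARK_TO_SNOWFLAKE:
            raise ValueError(f"no mapping for Spark type {spark_type!r}")
        prefix = "" if i == 0 else ","  # leading-comma style, as in the DDL above
        lines.append(f"{prefix}{name} {SPARK_TO_SNOWFLAKE[spark_type]}")
    return f"CREATE TABLE {table} (\n" + "\n".join(lines) + "\n)"


# Top-level columns of the schema in the question.
schema = [("login", "struct"), ("name", "string"), ("notifications", "struct"), ("query", "struct")]
ddl = build_ddl("users", schema)
print(ddl)
```

Raising on unknown types keeps the sketch from silently guessing a mapping it does not know.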
With that table in place, just do the following. No transformations are required, because the Snowflake Spark Connector understands the data types and converts them to the appropriate JSON representations on its own:
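user_df = glueContext.create_dynamic_frame.from_options(
    connection_type="mongodb",
    connection_options=read_mongo_options
)

# Wrapping the chain in parentheses lets it span multiple lines in Python
(
    user_df
    .toDF()  # DynamicFrame -> Spark DataFrame; struct columns are written as VARIANT
    .write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("dbtable", "users")
    .mode("append")
    .save()
)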
If you absolutely need to store the StructType fields as plain JSON strings, you'll need to transform them explicitly using the to_json Spark SQL function:
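from pyspark.sql.functions import to_json

df = user_df.toDF()  # user_df is the DynamicFrame from above; convert it first
user_df_cast = df.select(
    to_json(df["login"]).alias("login"),  # alias keeps the Snowflake column names
    df["name"],
    to_json(df["notifications"]).alias("notifications"),
    to_json(df["query"]).alias("query"),
)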
This stores the JSON as plain VARCHAR values, which means you cannot use Snowflake's semi-structured data storage and querying capabilities directly; every query would first need a PARSE_JSON step, which is inefficient.
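To make the VARCHAR-versus-VARIANT distinction concrete, here is a rough standard-library analogue. The hypothetical `struct_to_json` helper approximates the compact JSON text that to_json produces for a struct (Spark's exact output, for example timestamp formatting, may differ), and `json.loads` stands in for PARSE_JSON. It is an illustration only, not Spark or Snowflake code.

```python
import json


def struct_to_json(value):
    """Rough stand-in for Spark's to_json on a struct (given here as a dict)."""
    return json.dumps(value, separators=(",", ":"))


# Hypothetical notifications value; the field values are invented.
notifications = {"bot_review_queue": True, "weekly_summary": {"enabled": True, "day": 1}}

payload = struct_to_json(notifications)
print(payload)  # {"bot_review_queue":true,"weekly_summary":{"enabled":true,"day":1}}

# Stored as text (VARCHAR), the nested field is not reachable until the string
# is parsed again; json.loads plays the role of PARSE_JSON here.
parsed = json.loads(payload)
print(parsed["weekly_summary"]["enabled"])  # True
```

The extra parse on every read is the cost the VARIANT column avoids, since its contents are already stored in structured form.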
Consider using the VARIANT approach shown above instead, which lets you query the fields directly:
SELECT
login:login_attempts
,login:last_attempt
,name
,notifications:weekly_summary.enabled
FROM users
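To show what a path expression such as notifications:weekly_summary.enabled does, here is a small pure-Python analogue (not Snowflake code). It walks a dot-separated path through a parsed, VARIANT-like value and returns None when a key is missing, much as Snowflake returns NULL for an absent path. The helper name `get_path` and the sample row are hypothetical.

```python
def get_path(variant, path):
    """Walk a dot-separated path through nested dicts; return None if any key is missing."""
    current = variant
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


# Hypothetical row as it might be stored in the users table (values invented).
row = {
    "login": {"login_attempts": 3},
    "name": "alice",
    "notifications": {"weekly_summary": {"enabled": True, "day": 1}},
}

print(get_path(row["login"], "login_attempts"))  # 3
print(get_path(row["notifications"], "weekly_summary.enabled"))  # True
print(get_path(row["login"], "last_attempt"))  # None (absent path)
```

Returning None rather than raising mirrors the forgiving behaviour of path access on semi-structured data, where documents may not all carry the same fields.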