I have a dataframe with nested columns like below:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
)

df_schema = StructType([
    StructField("response", StringType(), True),
    StructField("id", StringType(), True),
    StructField("data", StructType([
        StructField("type", StringType(), True),
        StructField("record", StringType(), True),
        StructField("enteredCycle", StringType(), True),
        StructField("timestamp", StringType(), True),
        StructField("modifiedById", StringType(), True),
        StructField("years", IntegerType(), True),
        StructField("attributes", StructType([
            StructField("mass", ArrayType(DoubleType()), True),
            StructField("pace", ArrayType(IntegerType()), True),
            StructField("reflex", ArrayType(StringType()), True)
        ]), True)
    ]), True)
])
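For reference, an API payload matching this schema would look roughly like the following (all values here are made up purely for illustration):

# Hypothetical payload shaped like df_schema (illustrative values only)
sample_payload = {
    "response": "success",
    "id": "42",
    "data": {
        "type": "employee",
        "record": "rec-001",
        "enteredCycle": "2021",
        "timestamp": "2021-06-01T00:00:00Z",
        "modifiedById": "u-17",
        "years": 3,
        "attributes": {
            "mass": [70.5, 71.2],
            "pace": [4, 5],
            "reflex": ["fast", "slow"],
        },
    },
}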
I am getting this dataframe as the result of an API call, like below:
import json
from pyspark.sql.functions import udf, col, from_json, when

def api_call(parameter: str):
    response = session.get(f"https:url={parameter}", headers=header_data)
    return json.dumps(json.loads(response.text))

udf_call = udf(lambda z: api_call(z), StringType())
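Incidentally, wrapping the function in a lambda is not required; registering it directly is equivalent:

# Equivalent registration without the lambda wrapper
udf_call = udf(api_call, StringType())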
I am adding this UDF call to one of my dataframes as an extra column, like below:
df = inputDf.withColumn("api_response", udf_call(col("employee_id")))
# Creating an empty df.
# I have multiple API calls, so I am appending all of them into
# one single dataframe and then writing them all at once
# rather than writing one record at a time (I have 500,000 records).
empty_rdd = spark.sparkContext.emptyRDD()
empty_df = spark.createDataFrame(empty_rdd, df_schema)
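For illustration, the append pattern described in the comments above might look like the following sketch (input_dfs is a hypothetical list of per-call input dataframes, not a name from my actual code):

# Sketch of the batching described above: one dataframe per API call,
# unioned into a single dataframe and written once at the end.
result_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), df_schema)

for input_df in input_dfs:  # placeholder for the per-call input dataframes
    parsed = (
        input_df
        .withColumn("api_response", udf_call(col("employee_id")))
        .select(from_json(col("api_response"), df_schema).alias("main"))
        .select("main.*")  # expand back to the top-level columns of df_schema
    )
    result_df = result_df.unionAll(parsed)

result_df.write.mode("overwrite").format("parquet").saveAsTable("dbname.tablename")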
Applying schema:
json_df = (
    df
    .withColumn("main", from_json(col('api_response'), df_schema))
    .select('main.response', 'main.id', 'main.data.type', 'main.data.record', ..., 'main.data.attributes.reflex')
)
empty_df = empty_df.unionAll(json_df)
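As a side note, a more compact way to express the same parse-and-flatten step (a sketch, not my original code) is to expand the parsed struct with .*:

# Sketch: parse once, then expand the struct columns with ".*"
json_df = (
    df
    .select(from_json(col('api_response'), df_schema).alias('main'))
    .select('main.response', 'main.id', 'main.data.*')
)

Note that unionAll requires both sides to have the same schema, so whatever layout json_df ends up with (flattened or nested) has to match the schema used to create empty_df.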
The problem appears when I try to ingest the dataframe as a table:
empty_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')
I see this error:
Job aborted due to stage failure: Task 24 in stage 161.0 failed 4 times, most recent failure: Lost task 24.3 in stage 161.0 (TID 1119) (100.66.2.91 executor 0): org.apache.spark.api.python.PythonException: 'TypeError: can only concatenate str (not "NoneType") to str', from year_loads.py, line 21. Full traceback below:
Traceback (most recent call last):
File "<year_loads.py>", line 21, in <lambda>
File "<year_loads.py>", line 44, in api_call
TypeError: can only concatenate str (not "NoneType") to str
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:595)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)
My assumption is that there are None values in the output that cannot be appended to a StringType() column. So I implemented logic to convert all None values to empty strings, like below. I got this implementation from another Stack Overflow post here and changed it according to my requirements.
from pyspark.sql import DataFrame

# Get only the String columns
def replace_none_with_empty_str(df: DataFrame):
    string_fields = []
    for f in df.schema.fields:
        if isinstance(f.dataType, StringType):
            string_fields.append(f.name)
    exprs = [none_as_blank(x).alias(x) if x in string_fields else x for x in df.columns]
    # select returns a new dataframe, so its result must be returned/reassigned
    return df.select(*exprs)

# NULL/None to blank logic
def none_as_blank(x):
    # comparing a column to None with "!=" never evaluates to true in Spark SQL,
    # so isNotNull() is used instead
    return when(col(x).isNotNull(), col(x)).otherwise('')
non_nulls_df = replace_none_with_empty_str(empty_df)
non_nulls_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')
But I still see the same error even after applying the above NULL/None-to-blank logic.
Is my assumption and workaround correct? Is my logic properly applied to all String columns, particularly the nested String columns? If not, could anyone let me know what mistake I am making here and how I can correct it?
Any help is massively appreciated.
If you look at your error stack trace, and more specifically the last entries on your stack:
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
You see that the error happens while executing your UDF, before your post-processing runs. With the information we have here, I would guess that your api_call function is the issue; more specifically, its return value might be None. What about trying something like the following:
def api_call(parameter: str):
    response = session.get(f"https:url={parameter}", headers=header_data)
    return json.dumps(json.loads(response.text)) or ""
The or ""
would catch you returning a NoneType
and just return an empty string. This means you would have a fully empty record, but that would be the case anyway because your value is of NoneType.
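If the error persists after that change, another possibility (an assumption on my part, since the full api_call body is not shown) is that employee_id itself is null for some rows, in which case the UDF receives None as its input. A defensive sketch that guards both the input and the response, reusing your session and header_data:

def api_call(parameter: str) -> str:
    # Guard the input: a null employee_id in the source row arrives here as None.
    if parameter is None:
        return ""
    response = session.get(f"https:url={parameter}", headers=header_data)
    try:
        # Round-trip through json to normalise the payload.
        return json.dumps(json.loads(response.text)) or ""
    except (ValueError, TypeError):
        # Malformed or empty response body.
        return ""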