Tags: python, apache-spark, pyspark, apache-spark-sql

How to convert NONEs to an empty string in a pyspark dataframe when it has nested columns?


I have a dataframe with nested columns like below:

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
)

df_schema = StructType([
    StructField("response", StringType(), True),
    StructField("id", StringType(), True),
    StructField("data", StructType([
      StructField("type", StringType(), True),
      StructField("record", StringType(), True),
      StructField("enteredCycle", StringType(), True),
      StructField("timestamp", StringType(), True),
      StructField("modifiedById", StringType(), True),
      StructField("years", IntegerType(), True),
      StructField("attributes", StructType([
        StructField("mass", ArrayType(DoubleType()), True),
        StructField("pace", ArrayType(IntegerType()), True),
        StructField("reflex", ArrayType(StringType()), True)
      ]))
    ]))
])
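
For reference, a JSON payload matching this schema might look like the following (all values here are invented purely for illustration):

# Hypothetical example of a record matching df_schema (values are made up)
sample_payload = {
    "response": "success",
    "id": "12345",
    "data": {
        "type": "employee",
        "record": "rec-001",
        "enteredCycle": "2021-Q4",
        "timestamp": "2021-10-01T12:00:00Z",
        "modifiedById": "admin",
        "years": 5,
        "attributes": {
            "mass": [70.5, 71.2],
            "pace": [4, 5],
            "reflex": ["fast", "average"]
        }
    }
}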

I am getting this dataframe as the result of an API call, like below.

import json
from pyspark.sql.functions import col, from_json, udf

# session (the HTTP session) and header_data are defined elsewhere
def api_call(parameter: str):
    response = session.get(f"https:url={parameter}", headers=header_data)
    return json.dumps(json.loads(response.text))

udf_call = udf(lambda z: api_call(z), StringType())

I am adding this UDF call to one of my dataframes as an extra column, like below:

df = inputDf.withColumn("api_response", udf_call(col("employee_id")))

# Creating an empty df.
# I have multiple API calls, so I am appending all of them into
# one single dataframe and then writing them all at once,
# rather than writing one record at a time (I have 500,000 records).
# A rough sketch of this append-then-write idea follows this code block.

empty_rdd = spark.sparkContext.emptyRDD()
empty_df = spark.createDataFrame(empty_rdd, df_schema)
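
To make that append-then-write idea concrete, here is a rough sketch (parsed_call_dfs is a placeholder name, not from my real code):

# Rough sketch of the accumulation pattern: each API call produces a
# parsed dataframe matching df_schema; all of them are unioned into the
# accumulator so there is only a single write at the very end.
# parsed_call_dfs is a hypothetical list of those per-call dataframes.
for parsed_df in parsed_call_dfs:
    empty_df = empty_df.unionAll(parsed_df)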

Applying schema:

json_df = (
    df
    .withColumn("main", from_json(col('api_response'), df_schema))
    .select('main.response', 'main.id', 'main.data.type', 'main.data.record', ...., 'main.data.attributes.reflex')
)
empty_df = empty_df.unionAll(json_df)

The problem appears when I try to ingest the dataframe as a table:

empty_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')

I see this error:

Job aborted due to stage failure: Task 24 in stage 161.0 failed 4 times, most recent failure: Lost task 24.3 in stage 161.0 (TID 1119) (100.66.2.91 executor 0): org.apache.spark.api.python.PythonException: 'TypeError: can only concatenate str (not "NoneType") to str', from year_loads.py, line 21. Full traceback below:
Traceback (most recent call last):
  File "<year_loads.py>", line 21, in <lambda>
  File "<year_loads.py>", line 44, in api_call
TypeError: can only concatenate str (not "NoneType") to str

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:595)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1209)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1215)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:521)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2241)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:313)

My assumption is that there are NONEs in the output that cannot be appended to a StringType() column, so I implemented logic to convert all NONEs to an empty string, like below. I got this implementation from another Stack Overflow post here and changed it according to my requirements.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when

# Get only String columns and blank out their NULL/None values
def replace_none_with_empty_str(df: DataFrame):
    # collect the names of the top-level String columns
    string_fields = []
    for f in df.schema.fields:
        if isinstance(f.dataType, StringType):
            string_fields.append(f.name)
    # blank out NULLs in the String columns, keep the rest as-is
    exprs = [none_as_blank(x).alias(x) if x in string_fields else x for x in df.columns]
    return df.select(*exprs)

# NULL/NONE to blank logic
def none_as_blank(x):
    return when(col(x).isNotNull(), col(x)).otherwise('')


non_nulls_df = replace_none_with_empty_str(empty_df)
non_nulls_df.write.mode('overwrite').format('parquet').saveAsTable('dbname.tablename')

But I still see the same error even after applying the above NULL/None-to-blank logic. Is my assumption and workaround correct? Is my logic being properly applied to all String columns, particularly nested string columns? If not, could anyone let me know what mistake I am making here and how I can correct it? Any help is massively appreciated.


Solution

  • If you look at your error stack trace, and more specifically at these entries near the top of the JVM stack:

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:642)
        at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
    

    You see that the error seems to happen while executing your UDF, before any of your post-processing runs. With the information available here, I would guess your api_call function is the issue; more specifically, its return value might be None. What about trying something like the following:

    def api_call(parameter: str):
        response = session.get(f"https:url={parameter}", headers=header_data)
        return json.dumps(json.loads(response.text)) or ""
    

    The or "" would catch a NoneType return value and return an empty string instead. This means you would end up with a fully empty record, but that would be the case anyway, because your value is of NoneType.
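
    As a quick, standalone illustration of how that or "" fallback behaves (plain Python, nothing Spark-specific assumed):

        # A falsy value (None or an empty string) falls back to "",
        # anything else passes through unchanged.
        print(None or "")           # prints an empty line
        print('{"id": "1"}' or "")  # prints {"id": "1"}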