
Pandas UDF (Iterator -> Iterator) raises "outputted more rows than input rows" error


I have a DataFrame with a text column CALL_TRANSCRIPT (string) and a column pii_allmethods (array of strings). I am trying to search each CALL_TRANSCRIPT for the strings in the array and mask them using a PySpark pandas UDF, but I get an "outputted more rows than input rows" error. I have tried a couple of ways to troubleshoot it but have not been able to resolve it.

The inner for loop goes through the pii_list array and replaces each match in the call transcript (the text variable) with a mask value. The yield comes after the inner loop finishes, so it is not clear why the function would return more rows than it received.

NOTE: I already have a Spark UDF that works; I am trying a pandas UDF to improve performance.

dfs = dfs.withColumn('FULL_TRANSCRIPT', pu_mask_all_pii(col("CALL_TRANSCRIPT"), col("pii_allmethods")))

**Pandas UDF function:**

@pandas_udf("string")
def pu_mask_all_pii(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for text, pii_list in iterator:
        pii_list = sorted(pii_list, key=len, reverse=True)

        strtext = str(text)
        for pii in pii_list:
            if len(pii) > 1:
                mask = len(pii) * 'X'
                strtext = str(re.sub(re.escape(pii), mask, strtext.encode(), flags=re.IGNORECASE))

        yield strtext


**PythonException:** An exception was thrown from a UDF: 'AssertionError: Pandas SCALAR_ITER UDF outputted more rows than input rows.'. Full traceback below:

Solution

  • Setup

    df.show()
    
    +--------------------+-------------------+
    |     CALL_TRANSCRIPT|     pii_allmethods|
    +--------------------+-------------------+
    |foo bar <name> ba...|  [<name>, <phone>]|
    |    xyz defgh <name>|[<name>, <address>]|
    |                 pqr|          [<phone>]|
    +--------------------+-------------------+
    

    Solution

    There's no need for a complex pandas UDF here; a plain Spark UDF that operates on one row at a time can perform the replacement with a single regular expression.

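    import re
    from pyspark.sql import functions as F
    
    @F.udf  # return type defaults to string
    def mask(text, tokens):
        # build one alternation pattern; longest tokens first so they win over shorter overlapping ones
        pat = '|'.join(map(re.escape, sorted(tokens, key=len, reverse=True)))
        # replace each match with X's of the same length
        return re.sub(pat, lambda g: 'X' * len(g.group()), text, flags=re.IGNORECASE)
    
    df = df.withColumn('FULL_TRANSCRIPT', mask('CALL_TRANSCRIPT', 'pii_allmethods'))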
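
The regex logic can be checked locally without a Spark session. The sketch below is plain Python and follows the same approach; `mask_text` is a hypothetical helper name used only for illustration, and the `None` and empty-list guards are assumptions about how missing data should be handled. It sorts tokens longest first, as the question's code does, because regex alternation takes the first alternative that matches, so a short token listed before a longer one that starts with it would leave part of the longer one unmasked.

```python
import re

def mask_text(text, tokens):
    """Replace every occurrence of any token with X's of the same length."""
    # Hypothetical helper for local testing only.
    if text is None or not tokens:
        return text
    # Longest first: alternation picks the first alternative that matches.
    ordered = sorted(tokens, key=len, reverse=True)
    pat = '|'.join(map(re.escape, ordered))
    return re.sub(pat, lambda m: 'X' * len(m.group()), text, flags=re.IGNORECASE)

print(mask_text('xyz defgh <name>', ['<name>', '<address>']))  # xyz defgh XXXXXX
print(mask_text('call John Smith', ['john', 'john smith']))    # call XXXXXXXXXX
```

Without the sort, the second call would match `john` first and leave ` Smith` unmasked.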
    

    Alternative solution (using Pandas UDF batching)

    import re
    from typing import Iterator, Tuple
    
    import pandas as pd
    from pyspark.sql import functions as F
    
    @F.pandas_udf('string')
    def mask(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        # each item from the iterator is one batch of rows:
        # a is a pandas Series of CALL_TRANSCRIPT values
        # b is a pandas Series of pii_allmethods arrays
        for a, b in iterator:
            result = []
    
            # zip the two Series to perform the replacement on one row at a time
            for text, tokens in zip(a, b):
                pat = '|'.join(map(re.escape, sorted(tokens, key=len, reverse=True)))
                text = re.sub(pat, lambda g: 'X' * len(g.group()), text, flags=re.IGNORECASE)
                result.append(text)
    
            # yield one Series per batch, with exactly as many rows as the input batch
            yield pd.Series(result)
    
    df = df.withColumn('FULL_TRANSCRIPT', mask('CALL_TRANSCRIPT', 'pii_allmethods'))
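
The error in the question most likely comes from the batch contract. With the `Iterator[Tuple[pd.Series, pd.Series]]` signature, each item from `iterator` is a whole batch of rows, and the UDF has to yield one result of the same length per batch. The question's function loops over batches as if they were rows and yields a single string, and the length of that string is presumably what gets counted as output rows. The sketch below imitates that contract in plain Python, with lists standing in for `pd.Series` so it runs without Spark. `mask_batches` and `bad_mask_batches` are hypothetical names, and the row counting is a simplification of what Spark does, not its actual implementation.

```python
import re
from typing import Iterable, Iterator, List, Tuple

# (texts, token lists) for one batch; a stand-in for two pd.Series
Batch = Tuple[List[str], List[List[str]]]

def mask_batches(batches: Iterable[Batch]) -> Iterator[List[str]]:
    """Correct shape: one output list per input batch, same length."""
    for texts, token_lists in batches:
        out = []
        for text, tokens in zip(texts, token_lists):
            pat = '|'.join(map(re.escape, tokens))
            out.append(re.sub(pat, lambda m: 'X' * len(m.group()), text, flags=re.IGNORECASE))
        yield out

def bad_mask_batches(batches: Iterable[Batch]) -> Iterator[str]:
    """Shape of the question's code: treats a batch as one row and yields a str."""
    for texts, _ in batches:
        yield str(texts)  # one long string, so len() counts characters

batches = [(['foo <name>', 'pqr'], [['<name>'], ['<phone>']])]
rows_in = sum(len(texts) for texts, _ in batches)
rows_ok = sum(len(out) for out in mask_batches(batches))
rows_bad = sum(len(out) for out in bad_mask_batches(batches))
print(rows_in, rows_ok, rows_bad)
```

Here `rows_ok` equals `rows_in`, while `rows_bad` is the character count of the stringified batch and exceeds it, which matches the "outputted more rows than input rows" symptom.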
    

    Result

    df.show()
    
    +--------------------+-------------------+--------------------+
    |     CALL_TRANSCRIPT|     pii_allmethods|     FULL_TRANSCRIPT|
    +--------------------+-------------------+--------------------+
    |foo bar <name> ba...|  [<name>, <phone>]|foo bar XXXXXX ba...|
    |    xyz defgh <name>|[<name>, <address>]|    xyz defgh XXXXXX|
    |                 pqr|          [<phone>]|                 pqr|
    +--------------------+-------------------+--------------------+