Tags: pandas, apache-spark, visual-studio-code, pyspark, pycharm

Spark: How to debug pandas-UDF in VS Code


I'm looking for a way to debug a Spark pandas UDF in VS Code and PyCharm Community Edition (place a breakpoint and stop inside the UDF). At the moment, when a breakpoint is placed inside the UDF, the debugger doesn't stop.

The reference below describes both a Local mode and a Distributed mode.

I'm trying at least to debug in Local mode. In PyCharm/VS Code there should be a way to debug the local environment via "Attach to Local Process"; I just can't figure out how.

At the moment I cannot find any answer on how to attach a PySpark debugger to a local process inside a UDF in VS Code (my dev IDE).
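For VS Code specifically, one workaround worth trying is to start a debug server from inside the UDF and attach the IDE to it, rather than attaching to the process from outside. Below is a minimal sketch, assuming debugpy is installed in the workers' Python environment; the port 5678 and the single-listener guard are my assumptions, not something from the question:

import debugpy
import pandas as pd

def pandas_function(url_json):
    # Each Spark worker is its own Python process, so open a debug server
    # inside the UDF and attach VS Code to it with a "Remote Attach"
    # configuration on localhost:5678. In local[*] mode several workers
    # can race to start a listener, hence the guard.
    try:
        debugpy.listen(("localhost", 5678))  # port 5678 is an arbitrary choice
        debugpy.wait_for_client()            # blocks until the IDE attaches
    except RuntimeError:
        pass                                 # a debug server is already running
    debugpy.breakpoint()                     # the debugger stops here once attached
    return pd.DataFrame(eval(url_json['content'][0]))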

I found only the examples below, for PyCharm.

  1. Attach to local process: How can PySpark be called in debug mode?

When I try to attach to the process, I get the message below in PyCharm. In VS Code I get a message that the process cannot be attached.

Attaching to a process with PID=33,692
/home/usr_name/anaconda3/envs/yf/bin/python3.8 /snap/pycharm-community/223/plugins/python-ce/helpers/pydev/pydevd_attach_to_process/attach_pydevd.py --port 40717 --pid 33692
WARNING: The 'kernel.yama.ptrace_scope' parameter value is not 0, attach to process may not work correctly.
         Please run 'sudo sysctl kernel.yama.ptrace_scope=0' to change the value temporary
     or add the 'kernel.yama.ptrace_scope = 0' line to /etc/sysctl.d/10-ptrace.conf to set it permanently.

Process finished with exit code 0
Server stopped.
  2. pyspark_xray https://github.com/bradyjiang/pyspark_xray With this package it is possible to debug RDDs running on the workers, but I was not able to adapt the package to debug UDFs.

Example code; the breakpoint doesn't stop inside the UDF pandas_function(url_json):

import pandas as pd
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = pyspark.sql.SparkSession.builder.appName("test") \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext

# Create initial dataframe respond_sdf
d_list = [('api_1', "{'api': ['api_1', 'api_1', 'api_1'], 'A': [1,2,3], 'B': [4,5,6] }"),
          ('api_2', "{'api': ['api_2', 'api_2', 'api_2'], 'A': [7,8,9], 'B': [10,11,12] }")]

schema = StructType([
  StructField('url', StringType(), True),
  StructField('content', StringType(), True)
  ])

# Schema of the DataFrame returned by the UDF (must match its columns)
out_schema = StructType([
  StructField('api', StringType(), True),
  StructField('A', IntegerType(), True),
  StructField('B', IntegerType(), True)
  ])

jsons = sc.parallelize(d_list)
respond_sdf = spark.createDataFrame(jsons, schema)

# Pandas UDF
def pandas_function(url_json):
    # Here I want to place a breakpoint
    df = pd.DataFrame(eval(url_json['content'][0]))
    return df

# Pandas UDF transformation applied to respond_sdf
respond_sdf.groupby(F.monotonically_increasing_id()).applyInPandas(pandas_function, schema=out_schema).show()
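Regardless of IDE attachment, the quickest way to step through the UDF body is to call it directly on a hand-built pandas DataFrame, bypassing Spark entirely; applyInPandas hands the function exactly such a DataFrame per group. A small sketch reusing d_list from the example above:

# Build one group by hand and call the UDF directly -- no worker process
# is involved, so a breakpoint inside pandas_function works in any IDE.
sample_group = pd.DataFrame(d_list, columns=['url', 'content'])
print(pandas_function(sample_group))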

Solution

  • This example demonstrates how to use the excellent pyspark_xray library to step into UDF functions passed into the DataFrame.mapInPandas function:

    https://github.com/bradyjiang/pyspark_xray/blob/master/demo_app02/driver.py
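
The core trick in that demo is to run the pandas function on the driver instead of on a worker when a debug flag is set, so the IDE breakpoint is actually hit. A hedged sketch of the same idea, reusing respond_sdf and out_schema from the example above (the names pandas_map and debug_mode are illustrative, not part of pyspark_xray's API):

def pandas_map(iterator):
    # mapInPandas-style function: consumes an iterator of pandas
    # DataFrames and yields transformed ones.
    for pdf in iterator:
        yield pd.DataFrame(eval(pdf['content'][0]))

debug_mode = True
if debug_mode:
    # Execute on the driver: collect the Spark DataFrame to pandas and
    # feed it through the same generator. Breakpoints inside pandas_map
    # now stop, because no worker process is involved.
    for out in pandas_map(iter([respond_sdf.toPandas()])):
        print(out)
else:
    respond_sdf.mapInPandas(pandas_map, schema=out_schema).show()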