Counting distinct substring occurrences in column for every row in PySpark?

My data set looks like this: col1 and col2 each hold a comma-delimited set of string values, and col3 is the two columns concatenated together.

|col1       |col2    |col3   
|a,b,c,d    |a,c,d   |a,b,c,d,a,c,d 
|e,f,g      |f,g,h   |e,f,g,f,g,h

Basically, what I'm trying to do is grab all the comma-separated values in col3 and append another column with each value and its count.

I'm trying to get this kind of output in col4:

|col1       |col2    |col3          |col4
|a,b,c,d    |a,c,d   |a,b,c,d,a,c,d |a: 2, b: 1, c: 2, d: 2
|e,f,g      |f,g,h   |e,f,g,f,g,h   |e: 1, f: 2, g: 2, h: 1

I've figured out how to concat the columns together to get to col3, but I'm having a bit of trouble getting to col4. Here's where I've left off and I'm a bit unsure of where to go from here:

from pyspark.sql.functions import concat, countDistinct

df =, df.col2).alias('col3'), '*')'col3')).show()
# +--------------------+ 
# |count(DISTINCT col3)|
# +--------------------+
# |                   2|
# +--------------------+

How do I dynamically count the substrings separated by comma in col3 and make a final column that shows the frequency of each substring for all rows in the dataset?


  • Use UDF

    Here is a way to do this using udfs. First do the data generation.

    from pyspark.sql.types import StringType, StructType, StructField
    from pyspark.sql.functions import concat_ws, udf
    data = [("a,b,c,d", "a,c,d", "a,b,c,d,a,c,d"),
            ("e,f,g", "f,g,h", "e,f,g,f,g,h")]
    schema = StructType([
        StructField("col1", StringType()),
        StructField("col2", StringType()),
        StructField("col3", StringType())
    ])
    df = spark.createDataFrame(data=data, schema=schema)

    Then use some native Python tools like Counter and json to accomplish the task.

    from collections import Counter
    import json

    # Register the counting function as a udf that returns a JSON string
    @udf(returnType=StringType())
    def count_occurances(row):
        return json.dumps(dict(Counter(row.split(','))))

    df.withColumn('concat', concat_ws(',', df.col1, df.col2, df.col3))\
      .withColumn('counts', count_occurances('concat')).show(2, False)

    Results in

    |col1   |col2 |col3         |concat                     |counts                          |
    |a,b,c,d|a,c,d|a,b,c,d,a,c,d|a,b,c,d,a,c,d,a,b,c,d,a,c,d|{"a": 4, "b": 2, "c": 4, "d": 4}|
    |e,f,g  |f,g,h|e,f,g,f,g,h  |e,f,g,f,g,h,e,f,g,f,g,h    |{"e": 2, "f": 4, "g": 4, "h": 2}|
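    The counting logic inside the udf is plain Python, so it is easy to sanity-check outside Spark. A minimal sketch (the `_py` helper name is made up here to mirror the udf body above):

    ```python
    from collections import Counter
    import json

    # Plain-Python version of the udf body above, for sanity-checking
    def count_occurances_py(row):
        return json.dumps(dict(Counter(row.split(','))))

    # Applied to col3 alone, it yields exactly the col4 the question asks for
    print(count_occurances_py('a,b,c,d,a,c,d'))  # {"a": 2, "b": 1, "c": 2, "d": 2}
    ```

    Note that because the udf above concatenates col1, col2, and col3 (and col3 already contains col1 and col2), every count is doubled; applying the udf to col3 alone reproduces the col4 from the question.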

    Solution using native pyspark functions

    This solution is a bit more complicated than the udf one, but could be more performant since it avoids udfs. The idea is to concatenate the three string columns and explode the result. To know which row each exploded value came from, we add an index column first. A double grouping then gives the per-row counts, and in the end we join the result back to the original frame to recover the desired schema.

    from pyspark.sql.functions import concat_ws, monotonically_increasing_id, split, explode, collect_list

    # Add an index so we can track which row each exploded value came from
    df = df.withColumn('index', monotonically_increasing_id())
    grouped_df = df\
      .withColumn('concat', concat_ws(',', df.col1, df.col2, df.col3))\
      .withColumn('arr_col', split('concat', ','))\
      .withColumn('explode_col', explode('arr_col'))\
      .groupBy('index', 'explode_col').count()\
      .withColumn('concat_counts', concat_ws(':', 'explode_col', 'count'))\
      .groupBy('index').agg(concat_ws(',', collect_list('concat_counts')).alias('grouped_counts'))
    df.join(grouped_df, on='index').show()

    results in

    |      index|   col1| col2|         col3| grouped_counts|
    |94489280512|  e,f,g|f,g,h|  e,f,g,f,g,h|h:2,g:4,f:4,e:2|
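    For intuition, the explode-and-double-group pipeline can be mimicked for a single row in plain Python (illustration only; the variable names here are made up):

    ```python
    from collections import Counter

    # One input row, as (col1, col2, col3)
    row = ('e,f,g', 'f,g,h', 'e,f,g,f,g,h')

    concat = ','.join(row)               # like concat_ws(',', col1, col2, col3)
    counts = Counter(concat.split(','))  # like explode + groupBy('index', 'explode_col').count()
    # like concat_ws(':', ...) per value, then collect_list + concat_ws(',', ...)
    grouped_counts = ','.join(f'{k}:{v}' for k, v in counts.items())
    print(grouped_counts)  # e:2,f:4,g:4,h:2
    ```

    The value order within grouped_counts differs from the Spark output above because collect_list gives no ordering guarantee after a shuffle.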

    Please note that the JSON we created in the udf version is usually much handier to work with than the plain string in the grouped_counts column produced with native pyspark functions.
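    If you do end up with the plain grouped_counts string, a small parser can turn it back into a dict downstream (the helper name here is hypothetical, for illustration):

    ```python
    # Hypothetical helper: parse a 'key:count,key:count' string back into a dict
    def parse_grouped_counts(s):
        return {k: int(v) for k, v in (pair.split(':') for pair in s.split(','))}

    print(parse_grouped_counts('h:2,g:4,f:4,e:2'))  # {'h': 2, 'g': 4, 'f': 4, 'e': 2}
    ```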