Databricks Pyspark - Group related rows


I am parsing an EDI file in Azure Databricks. Rows in the input file are related to other rows based on the order in which they appear. What I need is a way to group related rows together.

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    ('book000book title',),
    ('auth001first author',),
    ('auth002second author',),
    ('book003another book',),
    ('auth004third author',),
]

schema = T.StructType([T.StructField('Line', T.StringType())])
books = spark.createDataFrame(data, schema)

books = (books
        .withColumn('RecordType', F.substring(F.col('Line'), 1, 4))
        .withColumn('Sequence', F.substring(F.col('Line'), 5, 3))
        .withColumn('Title', F.when(F.col('RecordType') == 'book', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .withColumn('Author', F.when(F.col('RecordType') == 'auth', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .drop('Line')
        )

window = Window.orderBy('Sequence')
books = (books
         .withColumn('BookID', F.row_number().over(window))
        )

books.show()

Following a book record, the subsequent lines are the authors for that book, so those authors should have the same BookID as the last book record before them.

The code above currently outputs:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     2|
|      auth|     002|        null|second author|     3|
|      book|     003|another book|         null|     4|
|      auth|     004|        null| third author|     5|
+----------+--------+------------+-------------+------+

I need a way to assign the correct BookID to each author so that I can group them. The output I'm looking for is:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     1|
|      auth|     002|        null|second author|     1|
|      book|     003|another book|         null|     2|
|      auth|     004|        null| third author|     2|
+----------+--------+------------+-------------+------+

I haven't been able to figure it out. Any help would be much appreciated.


Solution

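  • You can use a conditional sum over the window ordered by Sequence (the same window defined in the question). When a window has an ordering but no explicit frame, the frame defaults to everything from the first row up to the current row, so the sum becomes a running count of the book rows seen so far. F.when without otherwise yields null for the auth rows, and sum ignores nulls, so each author inherits the count of the last book before it: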

    books = (books
             .withColumn('BookID', F.sum(F.when(F.col("RecordType") == "book", 1)).over(window))
             )
    
    books.show()
    #+----------+--------+------------+-------------+------+
    #|RecordType|Sequence|       Title|       Author|BookID|
    #+----------+--------+------------+-------------+------+
    #|      book|     000|  book title|         null|     1|
    #|      auth|     001|        null| first author|     1|
    #|      auth|     002|        null|second author|     1|
    #|      book|     003|another book|         null|     2|
    #|      auth|     004|        null| third author|     2|
    #+----------+--------+------------+-------------+------+
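
To see why the running sum produces the right IDs, here is a minimal plain-Python sketch of the same logic, with no Spark involved. The `rows` list is a hypothetical in-memory copy of the sample data, and the final grouping step is only an illustration of what the BookID makes possible, not part of the answer above. One difference to keep in mind: in Spark, an auth row appearing before any book row would get a null BookID, whereas this sketch would give it 0.

```python
from itertools import accumulate

# Hypothetical in-memory copy of the sample rows: (RecordType, Sequence, value)
rows = [
    ("book", "000", "book title"),
    ("auth", "001", "first author"),
    ("auth", "002", "second author"),
    ("book", "003", "another book"),
    ("auth", "004", "third author"),
]

# Sort by Sequence, mirroring Window.orderBy('Sequence')
rows = sorted(rows, key=lambda r: r[1])

# Flag each book row with 1 and every other row with 0;
# the running total of the flags is the BookID
flags = [1 if record_type == "book" else 0 for record_type, _, _ in rows]
book_ids = list(accumulate(flags))

# Illustration only: collect each book's title and authors under its BookID
books = {}
for (record_type, _, value), book_id in zip(rows, book_ids):
    entry = books.setdefault(book_id, {"title": None, "authors": []})
    if record_type == "book":
        entry["title"] = value
    else:
        entry["authors"].append(value)

print(book_ids)  # [1, 1, 1, 2, 2]
print(books)
```

Every book row bumps the counter by one and every author row leaves it unchanged, which is exactly what the windowed sum does row by row.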