
How to join two tables with aggregation


I have two PySpark DataFrames:

The first is:

name start end
bob    1   3
john   5   8

and the second is:

day outcome
  1   a
  2   c
  3   d
  4   a
  5   e
  6   c
  7   u
  8   l

And I need to concatenate the day outcomes for every person, like:

bob   acd
john  ecul

Is it possible to do this in PySpark?


Solution

  • Use Spark SQL. I used Scala here, but the SQL is exactly the same in PySpark, and the surrounding code is easy to translate (a PySpark version is sketched after the result below).

    Join the two dataframes, use collect_list() to get an array of outcomes, then use concat_ws() to concatenate the array into a string. sequence(d1.start, d1.end), available since Spark 2.4, generates the list of days for each person, and lateral view explode turns it into one row per day:

    // Sample data for the two tables
    val dF1 = Seq(
      ("bob",  1, 3),
      ("john", 5, 8)
    ).toDF("name", "start", "end")

    dF1.createOrReplaceTempView("dF1")

    val dF2 = Seq(
      (1, "a"),
      (2, "c"),
      (3, "d"),
      (4, "a"),
      (5, "e"),
      (6, "c"),
      (7, "u"),
      (8, "l")
    ).toDF("day", "outcome")

    dF2.createOrReplaceTempView("dF2")

    // Explode each (start, end) range into one row per day,
    // join on day, then concatenate the outcomes per name
    spark.sql("""
    select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from
    (select d1.name, e.day
       from dF1 d1
            lateral view explode(sequence(d1.start, d1.end)) e as day
    ) d1
    left join dF2 d2 on d1.day = d2.day
    group by d1.name
    """).show(100, false)
    

    Result:

    +----+-------+
    |name|outcome|
    +----+-------+
    |bob |acd    |
    |john|ecul   |
    +----+-------+
    
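    For completeness, here is a minimal PySpark sketch of the same approach, assuming an existing SparkSession named spark; only the DataFrame setup changes, and the SQL is identical:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Same sample data as the Scala version
    df1 = spark.createDataFrame(
        [("bob", 1, 3), ("john", 5, 8)],
        ["name", "start", "end"])
    df1.createOrReplaceTempView("dF1")

    df2 = spark.createDataFrame(
        [(1, "a"), (2, "c"), (3, "d"), (4, "a"),
         (5, "e"), (6, "c"), (7, "u"), (8, "l")],
        ["day", "outcome"])
    df2.createOrReplaceTempView("dF2")

    # Identical SQL; note Python's False in show()
    spark.sql("""
    select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from
    (select d1.name, e.day
       from dF1 d1
            lateral view explode(sequence(d1.start, d1.end)) e as day
    ) d1
    left join dF2 d2 on d1.day = d2.day
    group by d1.name
    """).show(100, False)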

    If exploding large day ranges causes out-of-memory errors, drop the explode and use a non-equi (range) join instead:

    spark.sql("""
    select d1.name, concat_ws('', collect_list(d2.outcome)) outcome
    from dF1 d1
    left join dF2 d2 on d1.start <= d2.day and d1.end >= d2.day
    group by d1.name
    """).show(100, false)