Tags: apache-spark, join, pyspark, merge, broadcast

PySpark: (broadcast) joining two datasets on closest datetimes/unix


I am using PySpark and am close to giving up on my problem. I have two data sets: one very, very large one (set A) and one that is rather small (set B). They are of the form:

Data set A:
variable   | timestampA
---------------------------------
x          | 2015-01-01 09:29:21
y          | 2015-01-01 12:01:57


Data set B:
different information | timestampB
-------------------------------------------
info a                | 2015-01-01 09:30:00
info b                | 2015-01-01 09:30:00
info a                | 2015-01-01 12:00:00
info b                | 2015-01-01 12:00:00

A has many rows, each with a different timestamp. B has a timestamp every couple of minutes. The main problem is that the two data sets have no exactly matching timestamps.
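
For reproducibility, here is a minimal sketch that builds the two example frames (I will call them set_A and set_B, and B's information column simply info):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# set A: one row per observation, arbitrary timestamps
set_A = spark.createDataFrame(
    [('x', '2015-01-01 09:29:21'), ('y', '2015-01-01 12:01:57')],
    ['variable', 'timestampA']
).withColumn('timestampA', F.col('timestampA').cast('timestamp'))

# set B: a few rows per timestamp, timestamps on a regular grid
set_B = spark.createDataFrame(
    [('info a', '2015-01-01 09:30:00'), ('info b', '2015-01-01 09:30:00'),
     ('info a', '2015-01-01 12:00:00'), ('info b', '2015-01-01 12:00:00')],
    ['info', 'timestampB']
).withColumn('timestampB', F.col('timestampB').cast('timestamp'))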

My goal is to join the data sets on the nearest timestamp. An additional complication is that I want to join in a specific way: for each entry in A, I want to attach all of B's information at the closest timestamp, duplicating the entry in A as needed. So, the result should look like:

Final data set
variable   | timestampA          | information     | timestampB
--------------------------------------------------------------------------
x          | 2015-01-01 09:29:21 | info a          | 2015-01-01 09:30:00
x          | 2015-01-01 09:29:21 | info b          | 2015-01-01 09:30:00
y          | 2015-01-01 12:01:57 | info a          | 2015-01-01 12:00:00
y          | 2015-01-01 12:01:57 | info b          | 2015-01-01 12:00:00

I am very new to PySpark (and also to Stack Overflow). I figured that I probably need to use a window function and/or a broadcast join, but I really don't know where to start and would appreciate any help. Thank you!


Solution

  • You can use broadcast to avoid shuffling.

    If I understand correctly, the timestamps in set_B fall on a regular grid with a known interval. If so, you can do the following:

    from pyspark.sql import functions as F
    
    # assuming 5 minutes is your interval in set_B; half of it on each side of timestampB covers every timestampA exactly once
    interval = 'INTERVAL {} SECONDS'.format(5 * 60 // 2)  # integer division -> 'INTERVAL 150 SECONDS'
    
    # broadcast the small set_B so the large set_A is not shuffled
    res = set_A.join(
        F.broadcast(set_B),
        (set_A['timestampA'] > set_B['timestampB'] - F.expr(interval))
        & (set_A['timestampA'] <= set_B['timestampB'] + F.expr(interval))
    )
    

    Output:

    +--------+-------------------+------+-------------------+
    |variable|         timestampA|  info|         timestampB|
    +--------+-------------------+------+-------------------+
    |       x|2015-01-01 09:29:21|info a|2015-01-01 09:30:00|
    |       x|2015-01-01 09:29:21|info b|2015-01-01 09:30:00|
    |       y|2015-01-01 12:01:57|info a|2015-01-01 12:00:00|
    |       y|2015-01-01 12:01:57|info b|2015-01-01 12:00:00|
    +--------+-------------------+------+-------------------+
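
    A quick way to check that set_B is really broadcast and the big set_A is not shuffled is to look at the physical plan; with the broadcast hint and only range predicates, Spark typically plans a broadcast nested-loop join:

    res.explain()  # expect BroadcastNestedLoopJoin and no shuffle (Exchange) on set_A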
    

    If you don't have a fixed interval, then a cross join followed by keeping the minimum of |timestampA - timestampB| can do the trick. You can do that with a window function and row_number, like the following:

    from pyspark.sql import Window
    
    # cross join with the (broadcast) small set_B, then keep the closest timestampB per (variable, info)
    res = set_A.crossJoin(F.broadcast(set_B))
    w = Window.partitionBy('variable', 'info').orderBy(F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')))
    res = res.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')
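
    One more detail: if a timestampA is exactly equidistant from two timestampB values, row_number will pick one of them arbitrarily. If you need a deterministic result, add a tie-breaker to the window ordering, e.g. prefer the earlier timestampB (this is just a suggested variant of the window above):

    w = Window.partitionBy('variable', 'info').orderBy(
        F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')),
        F.col('timestampB')
    )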