Tags: json, pandas, pyspark, apache-spark-sql, time-series

How to extract time series JSON data from a Spark DataFrame using groupby


I have the following data in a Spark DataFrame:

id series
1 {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}
2 {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0}

I would like to extract the time series data into a format that is easier to work with, e.g. the following:

id timestamp                    value
1  "2016-01-31T00:00:00.000Z"   null
1  "2016-06-30T00:00:00.000Z"   6394317.0
1  "2016-07-31T00:00:00.000Z"   6550781.0
1  "2016-08-31T00:00:00.000Z"   7107308.0
2  "2016-01-31T00:00:00.000Z"   null
2  "2016-06-30T00:00:00.000Z"   6394317.0

I have tried df.groupby('id'), and I can achieve this in pandas by iterating over the groupby object, e.g.:

    import json
    import pandas as pd

    frames = []
    for fund_id, df_i in df.groupby('id'):
        ts = json.loads(df_i['series'].iloc[0])  # parse the JSON time series for this id

        # store this id's time series in a temporary df
        df_temp = pd.DataFrame(columns=['id', 'date', 'value'])
        df_temp['value'] = list(ts.values())
        df_temp['date'] = list(ts.keys())
        df_temp['id'] = fund_id

        frames.append(df_temp)

    # finally concatenate all the temporary dfs
    result = pd.concat(frames, ignore_index=True)



Any ideas how to do the same thing in Spark?


Solution

  • You can, yes, but you have to jump through some hoops: convert the JSON string to an array, explode it, then split each remaining string only on the colon (:) that sits outside the quotes.

    PySpark approach:

    jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
    df = spark.createDataFrame(
        [(1, jstring)],
        ["id", "series"],
    )
    
    from pyspark.sql.functions import regexp_replace, explode, split, trim, expr

    # 1. strip the surrounding braces from the JSON string
    # 2. split on the commas and explode into one row per "key": value pair
    # 3. split each pair on the colon that sits outside the quotes
    # 4. trim the two pieces into the timestamp (a) and value (b) columns
    df.select("id", regexp_replace(regexp_replace("series", "\\{", ""), "\\}", "").alias("s")) \
      .select("id", explode(split("s", ",")).alias("exp_series")) \
      .select("id", split("exp_series", ":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")) \
      .select("id", trim(expr("foo[0]")).alias("a"), trim(expr("foo[1]")).alias("b")) \
      .show()
    
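    If you then want the target column names and numeric values rather than raw strings, you can clean up the result of the chain above. A minimal sketch, assuming the chained select is assigned to a variable named parsed (a hypothetical name, not in the snippet above), with columns id, a (quoted timestamp string) and b (value as string):

    from pyspark.sql.functions import regexp_replace, col

    # 'parsed' is assumed to hold the output of the chained select above
    result = parsed.select(
        "id",
        regexp_replace("a", '"', "").alias("timestamp"),  # strip the surrounding quotes
        col("b").cast("double").alias("value"),           # the literal string "null" casts to NULL
    )
    result.show(truncate=False)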

    Scala approach:

    scala> val jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
    jstring: String = {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}

    scala> val data = Seq((1,jstring))
    data: Seq[(Int, String)] = List((1,{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}))

    scala> val df = data.toDF("id","series")
    df: org.apache.spark.sql.DataFrame = [id: int, series: string]

    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._

    scala> df.select($"id",regexp_replace(regexp_replace($"series","\\{",""),"\\}", "").alias("s")).
         |   select($"id",explode(split($"s",",").cast("array<string>")).alias("exp_series")).
         |   select($"id",split($"exp_series",":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")).
         |   select($"id",trim($"foo".getItem(0)).alias("a"),trim($"foo".getItem(1)).alias("b")).show(false)
    +---+--------------------------+---------+
    |id |a                         |b        |
    +---+--------------------------+---------+
    |1  |"2016-01-31T00:00:00.000Z"|null     |
    |1  |"2016-06-30T00:00:00.000Z"|6394317.0|
    |1  |"2016-07-31T00:00:00.000Z"|6550781.0|
    |1  |"2016-08-31T00:00:00.000Z"|7107308.0|
    +---+--------------------------+---------+
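
    Not part of the original answer, but if your Spark version supports from_json with a map schema (Spark 2.2+), a shorter route is to parse the JSON object into a map and explode it. A minimal PySpark sketch, assuming every value in the JSON object is either numeric or null:

    from pyspark.sql.functions import from_json, explode
    from pyspark.sql.types import MapType, StringType, DoubleType

    # parse the JSON string into a map<string,double>, then turn each
    # map entry into its own (timestamp, value) row
    df.select("id", from_json("series", MapType(StringType(), DoubleType())).alias("m")) \
      .select("id", explode("m").alias("timestamp", "value")) \
      .show(truncate=False)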