I have the following data in a Spark DataFrame:
id | series |
---|---|
1 | {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0} |
2 | {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0} |
I would like to extract the time series data into a format that is easier to work with, e.g. the following:
id | timestamp | value |
---|---|---|
1 | "2016-01-31T00:00:00.000Z" | null |
1 | "2016-06-30T00:00:00.000Z" | 6394317.0 |
1 | "2016-07-31T00:00:00.000Z" | 6550781.0 |
2 | "2016-01-31T00:00:00.000Z" | null |
2 | "2016-06-30T00:00:00.000Z" | 6394317.0 |
I have tried df.groupby('id'), and I can achieve this in pandas by iterating over the groupby object, e.g.:

import json
import pandas as pd

frames = []
for fund_id, df_i in df.groupby('id'):
    ts = json.loads(df_i['series'].iloc[0])   # parse the JSON time series for this id
    # store this id's series in a temporary long-format frame
    df_temp = pd.DataFrame({'id': fund_id,
                            'date': list(ts.keys()),
                            'value': list(ts.values())})
    frames.append(df_temp)

# finally concatenate all the per-id frames
result = pd.concat(frames, ignore_index=True)
Any ideas how to do the same thing in spark?
Yes, you can. You have to jump through some hoops: strip the braces from the JSON string, split it on commas and explode the result into one row per entry, then split each remaining string on the colon (:) that falls outside the quotes.
PySpark approach:
jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
df = spark.createDataFrame(
[
(1, jstring)
],
["id", "series"]
)
from pyspark.sql.functions import regexp_replace,explode,split,trim,expr
df.select("id",regexp_replace(regexp_replace("series","\\{",""),"\\}", "").alias("s")). \
select("id",explode(split("s",",").cast("array<string>")).alias("exp_series")). \
select("id",split("exp_series",":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")). \
select("id",trim(expr("foo[0]")).alias("a"),trim(expr("foo[1]")).alias("b")).show()
Scala approach:
scala> val jstring = """{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}"""
jstring: String = {"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}
scala> val data = Seq((1,jstring))
data: Seq[(Int, String)] = List((1,{"2016-01-31T00:00:00.000Z": null, "2016-06-30T00:00:00.000Z": 6394317.0, "2016-07-31T00:00:00.000Z": 6550781.0, "2016-08-31T00:00:00.000Z": 7107308.0}))
scala> val df = data.toDF("id","series")
df: org.apache.spark.sql.DataFrame = [id: int, series: string]
scala> df.select($"id",regexp_replace(regexp_replace($"series","\\{",""),"\\}", "").alias("s")).
| select($"id",explode(split($"s",",").cast("array<string>")).alias("exp_series")).
| select($"id",split($"exp_series",":(?=([^\"]*\"[^\"]*\")*[^\"]*$)").alias("foo")).
| select($"id",trim($"foo".getItem(0)).alias("a"),trim($"foo".getItem(1)).alias("b")).show(false)
+---+--------------------------+---------+
|id |a |b |
+---+--------------------------+---------+
|1 |"2016-01-31T00:00:00.000Z"|null |
|1 |"2016-06-30T00:00:00.000Z"|6394317.0|
|1 |"2016-07-31T00:00:00.000Z"|6550781.0|
|1 |"2016-08-31T00:00:00.000Z"|7107308.0|
+---+--------------------------+---------+
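As an aside, if your Spark version supports it (from_json accepts a MapType schema from 2.2, and a DDL schema string from around 2.3), you can skip the regex gymnastics and parse the column directly as a map, then explode it. A minimal sketch along those lines:

from pyspark.sql.functions import from_json, explode

# parse the JSON string as timestamp -> value, then explode the map into one row per entry
exploded = (df
    .select("id", from_json("series", "map<string,double>").alias("series_map"))
    .select("id", explode("series_map").alias("timestamp", "value")))
exploded.show(truncate=False)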