I am trying to create a DataFrame from unstructured logs which partially contain JSON:
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
Here is what I tried:
from pyspark.sql import Row

rdd = session.sparkContext.textFile("F:\\mypath\\rdd_test_log.txt")
dataFrame = rdd.map(lambda data: Row(time=data.split(" ")[0],
                                     ip=data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()
The result is
+------------------------------+---------+------------------------+
|EventTime                     |ip       |time                    |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+
But what I expect is:
time                     |ip        |eventtime          |sourcename |Keys        |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local      |-9serverkey |status
So how can I parse this JSON string inside the RDD? Or what should the approach be?
Any help is appreciated. Thanks.
You can use find('{') on the string to get the index from which to take a substring of the JSON text, then parse that JSON:
from pyspark.sql import Row, functions as f

dataFrame = (
    rdd.map(lambda l: (l.split(" "), l))
    .map(
        lambda data: Row(
            time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{"):]
        )
    )
    .toDF()
    .select(
        "time",
        "ip",
        f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
            "EventTime"
        ),
    )
)
dataFrame.show(1, False)
This shows:
+------------------------+---------+---------------------------------------------------------------------------------------------+
|time |ip |EventTime |
+------------------------+---------+---------------------------------------------------------------------------------------------+
|2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
+------------------------+---------+---------------------------------------------------------------------------------------------+
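As a quick sanity check outside Spark, the same substring-and-quoting logic can be reproduced with plain Python's `re` and `json` modules (this is only an illustration of the fix, not part of the Spark job):

```python
import json
import re

line = ('2020-09-24T08:03:01.633Z 10.1.20.1 '
        '{"EventTime":"2020-09-24 13:33:01","sourcename":"local",'
        '"Keys":-9serverkey,"Type":"status"}')

# Take everything from the first '{' onward, as the Spark map does
raw = line[line.find("{"):]

# Quote the bare -9serverkey value so the payload becomes valid JSON
fixed = re.sub(r'"Keys":(.*),', r'"Keys":"\1",', raw)

parsed = json.loads(fixed)
print(parsed["Keys"])  # -9serverkey
```

If the quoting regex is wrong, `json.loads` raises immediately, so this is a cheap way to validate the pattern before running it over the full RDD.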
Then you can parse EventTime into a struct whose fields can be spread into separate columns:
from pyspark.sql.types import StructType, StructField, StringType

parsed = dataFrame.select(
    "time",
    "ip",
    f.from_json(
        "EventTime",
        StructType(
            [
                StructField("EventTime", StringType()),
                StructField("sourcename", StringType()),
                StructField("Keys", StringType()),
                StructField("Type", StringType()),
            ]
        ),
    ).alias("eventdetails"),
)
Now create separate columns from the struct:
parsed = (
    parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
    .withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
    .withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
    .withColumn("Type", parsed["eventdetails"].getItem("Type"))
    .drop("eventdetails")
)
parsed.show()
Which gives:
+--------------------+---------+-------------------+----------+-----------+------+
| time| ip| eventtime|sourcename| Keys| Type|
+--------------------+---------+-------------------+----------+-----------+------+
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01| local|-9serverkey|status|
|2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01| local|-9serverkey|status|
+--------------------+---------+-------------------+----------+-----------+------+
Note that I assumed your JSON is meant to be valid: "Keys":-9serverkey is an invalid key/value pair, so the regexp_replace above rewrites it to "Keys":"-9serverkey" before parsing.
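For reference, the whole per-line pipeline can be mirrored in plain Python; this sketch only illustrates the parsing logic (the `parse_line` helper is hypothetical) and is not a replacement for the distributed Spark version:

```python
import json
import re

def parse_line(line: str) -> dict:
    """Split a log line into time, ip, and the parsed JSON payload."""
    parts = line.split(" ", 2)           # time, ip, rest-of-line
    payload = line[line.find("{"):]      # JSON substring
    payload = re.sub(r'"Keys":(.*),', r'"Keys":"\1",', payload)  # quote bare value
    event = json.loads(payload)
    return {"time": parts[0], "ip": parts[1], **event}

row = parse_line('2020-09-24T08:03:01.633Z 10.1.20.1 '
                 '{"EventTime":"2020-09-24 13:34:01","sourcename":"local",'
                 '"Keys":-9serverkey,"Type":"status"}')
print(row["EventTime"])  # 2020-09-24 13:34:01
```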