python-3.x, apache-spark, pyspark, apache-spark-sql, rdd

How to convert a text log that partially contains JSON strings into a structured DataFrame in PySpark?


I am trying to create a data frame from unstructured logs that partially contain JSON:

2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:34:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}
2020-09-24T08:03:01.633Z 10.1.20.1 {"EventTime":"2020-09-24 13:35:01","sourcename":"local","Keys":-9serverkey,"Type":"status"}

Here is what I tried:

from pyspark.sql import Row

rdd = session.sparkContext.textFile("F:\\mypath\\rdd_test_log.txt")

dataFrame = rdd.map(lambda data: Row(time= data.split(" ")[0],
                                     ip= data.split(" ")[1],
                                     EventTime=data.split(":")[2])).toDF()


The result is


+------------------------------+---------+------------------------+
|EventTime                     |ip       |time                    |
+------------------------------+---------+------------------------+
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
|01.633Z 10.1.20.1 {"EventTime"|10.1.20.1|2020-09-24T08:03:01.633Z|
+------------------------------+---------+------------------------+

Expected:

time                     |ip        |eventtime          |sourcename|Keys        |Type
2020-09-24T08:03:01.633Z |10.1.20.1 |2020-09-24 13:33:01|local     |-9serverkey |status     

So how can I parse this JSON string in the RDD? Or what should the approach be?

Any help is appreciated.

Thanks


Solution

  • You can use find('{') on each line to locate the index where the JSON text starts, take the substring from that index, and then parse that JSON.

    from pyspark.sql import Row
    from pyspark.sql import functions as f

    dataFrame = (
        rdd.map(lambda l: (l.split(" "), l))
        .map(
            lambda data: Row(
                # the first two space-separated tokens are the timestamp and the ip;
                # the JSON payload starts at the first "{"
                time=data[0][0], ip=data[0][1], EventTime=data[1][data[1].find("{") :]
            )
        )
        .toDF()
        .select(
            "time",
            "ip",
            # quote the bare Keys value so the payload becomes valid JSON
            f.regexp_replace(f.col("EventTime"), '"Keys":(.*),', '"Keys":"$1",').alias(
                "EventTime"
            ),
        )
    )

    dataFrame.show(1, False)
    

    Shows

    +------------------------+---------+---------------------------------------------------------------------------------------------+
    |time                    |ip       |EventTime                                                                                    |
    +------------------------+---------+---------------------------------------------------------------------------------------------+
    |2020-09-24T08:03:01.633Z|10.1.20.1|{"EventTime":"2020-09-24 13:33:01","sourcename":"local","Keys":"-9serverkey","Type":"status"}|
    +------------------------+---------+---------------------------------------------------------------------------------------------+
    

    Then you can parse EventTime into a struct that can then be spread into separate columns:

    from pyspark.sql.types import StructType, StructField, StringType

    parsed = dataFrame.select(
        "time",
        "ip",
        f.from_json(
            "EventTime",
            StructType(
                [
                    StructField("EventTime", StringType()),
                    StructField("sourcename", StringType()),
                    StructField("Keys", StringType()),
                    StructField("Type", StringType()),
                ]
            ),
        ).alias("eventdetails"),
    )
    

    Now create separate columns from the struct:

    parsed = (
        parsed.withColumn("eventtime", parsed["eventdetails"].getItem("EventTime"))
        .withColumn("sourcename", parsed["eventdetails"].getItem("sourcename"))
        .withColumn("Keys", parsed["eventdetails"].getItem("Keys"))
        .withColumn("Type", parsed["eventdetails"].getItem("Type"))
        .drop("eventdetails")
    )
    
    parsed.show()
    

    Which gives:

    +--------------------+---------+-------------------+----------+-----------+------+
    |                time|       ip|          eventtime|sourcename|       Keys|  Type|
    +--------------------+---------+-------------------+----------+-----------+------+
    |2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:33:01|     local|-9serverkey|status|
    |2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:34:01|     local|-9serverkey|status|
    |2020-09-24T08:03:...|10.1.20.1|2020-09-24 13:35:01|     local|-9serverkey|status|
    +--------------------+---------+-------------------+----------+-----------+------+
    

    Note that this assumes your JSON is valid: "Keys":-9serverkey is not a valid key/value pair, which is why the regexp_replace step above rewrites it to "Keys":"-9serverkey".
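
    If you would rather not drop to an RDD at all, a similar pipeline can be expressed entirely in the DataFrame API. The sketch below is only an illustration, not part of the answer above: it assumes every log line follows the "timestamp ip json" layout shown in the question and reuses the file path from the question.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.getOrCreate()

    schema = StructType(
        [
            StructField("EventTime", StringType()),
            StructField("sourcename", StringType()),
            StructField("Keys", StringType()),
            StructField("Type", StringType()),
        ]
    )

    # pattern: timestamp, ip, then the JSON payload at the end of the line
    pattern = r"^(\S+) (\S+) (\{.*\})$"

    raw = spark.read.text("F:\\mypath\\rdd_test_log.txt")

    parsed = (
        raw.select(
            f.regexp_extract("value", pattern, 1).alias("time"),
            f.regexp_extract("value", pattern, 2).alias("ip"),
            f.regexp_extract("value", pattern, 3).alias("json"),
        )
        # quote the bare Keys value so the payload becomes valid JSON
        .withColumn("json", f.regexp_replace("json", r'"Keys":([^,"]+),', '"Keys":"$1",'))
        .withColumn("event", f.from_json("json", schema))
        .select("time", "ip", "event.EventTime", "event.sourcename", "event.Keys", "event.Type")
    )

    parsed.show(truncate=False)

    This produces the same six columns as the approach above; choosing between them mostly comes down to whether you prefer working with the DataFrame API or with the RDD.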