regex, apache-spark, pyspark, bigdata

How can I use a PySpark regex to correctly split pipe-delimited data when a field contains literal pipes?


I have a raw Databricks table with a single column named "value", whose data comes from a pipe-delimited CSV.

Each row in this column contains 5 fields.

The content of field3 is JSON, which contains literal pipes.

The data looks like:

10|20|{"menu":{"id":"file","menuitem":[{"value":"New","onclick":"CreateNewDoc() || ReplaceDoc() || DeleteDoc()"}]}}|13123|344234

I'm new to PySpark and my data is massive, so I've tried to split the content into a new 5-column table, using a regex on field 3, without success.

My code looks like:

from pyspark.sql.functions import split, col, regexp_extract


df = spark.sql(
"""
select value from my_table
"""
)

df_out = (df.withColumn("field1", split(col("value"), "\\|").getItem(0))
            .withColumn("field2", split(col("value"), "\\|").getItem(1))
            .withColumn("field3", regexp_extract("value", r"\|\{(.*?)\}\|", 1))
            .withColumn("field4", split(col("value"), "\\|").getItem(3))
            .withColumn("field5", split(col("value"), "\\|").getItem(4)))

new_table = df_out.select("field1", "field2", "field3", "field4", "field5")

new_table.display()

The result is that part of the JSON in field3 spills incorrectly into field4 and field5.

Can someone help me solve this problem?
Maybe match everything between the first { and the last }, treating anything inside as literal?

Thank you!


Solution

  • For the columns after the JSON field, count from the end of the split array rather than the beginning, so the pipes inside the JSON don't shift the indexes. element_at with a negative index does exactly that:

    from pyspark.sql.functions import split, col, regexp_extract, element_at

    # Split on every pipe; the JSON field gets fragmented, but the fields
    # before and after it are still intact at both ends of the array.
    fields = split(col("value"), r"\|")

    df_out = (df.withColumn("field1", fields.getItem(0))
                .withColumn("field2", fields.getItem(1))
                .withColumn("field3", regexp_extract(col("value"), r"\|\{(.*?)\}\|", 1))
                # element_at with a negative index counts from the end of the
                # array, so the pipes inside the JSON can't shift these fields.
                .withColumn("field4", element_at(fields, -2))
                .withColumn("field5", element_at(fields, -1)))
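
    To sanity-check this, here is a minimal sketch that runs the same transformation against the sample row from the question. The one-row DataFrame is just a stand-in for my_table, and spark is assumed to be an active session, as in a Databricks notebook:

    sample = '10|20|{"menu":{"id":"file","menuitem":[{"value":"New","onclick":"CreateNewDoc() || ReplaceDoc() || DeleteDoc()"}]}}|13123|344234'
    df = spark.createDataFrame([(sample,)], ["value"])

    # Same transformation as above, applied to the stand-in DataFrame.
    fields = split(col("value"), r"\|")
    df_out = (df.withColumn("field1", fields.getItem(0))
                .withColumn("field2", fields.getItem(1))
                .withColumn("field3", regexp_extract(col("value"), r"\|\{(.*?)\}\|", 1))
                .withColumn("field4", element_at(fields, -2))
                .withColumn("field5", element_at(fields, -1)))

    df_out.select("field1", "field2", "field3", "field4", "field5").show(truncate=False)
    # field1=10, field2=20, field4=13123, field5=344234. Note that field3 holds
    # the JSON with its outermost braces stripped, because the \{ and \} in the
    # pattern sit outside the capture group.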
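
    The question also suggested taking everything between the first { and the last }. A greedy pattern is one way to sketch that idea; putting the braces inside the capture group keeps the JSON intact, under the assumption that braces only ever appear in field3:

    # Greedy match: (\{.*\}) runs from the first { to the last } in the row,
    # treating every pipe in between as a literal character. Assumes fields
    # 1, 2, 4 and 5 never contain braces.
    df_json = df.withColumn("field3", regexp_extract(col("value"), r"(\{.*\})", 1))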