Tags: python, pyspark, databricks

Are there alternatives to a for loop when parsing free text in Python/PySpark?


I have to read data into Databricks with Python/PySpark, but the format is not the usual CSV or JSON, so I iterate over the rows with a for loop. As a result it's very slow.

The data looks like this, for millions of rows. The format isn't identical on every row, although there are a few common formats:

HEADER0123 a bunch of spaces ACCTNUM999787666 more numbers ABC2XYZ some text

So to parse it, I read from S3 as text and extract the fields by character position:

raw_text = (spark.read
    .format("text")  # produces a single string column named "value"
    .option("mode", "PERMISSIVE")
    .option("header", "false")        # header/inferSchema are CSV options;
    .option("inferSchema", "false")   # the text source ignores them
    .load(my_path))

my_list = []
rows = raw_text.collect()  # pulls every row onto the driver -- the bottleneck
for row in rows:
  line = row[0].strip()
  header = line[0:6]
  acct = int(line[6:9])  # the schema below declares acct as int
  my_list.append((header, acct))
  # etc.

Then later I create dataframes:

df = spark.createDataFrame(my_list, "header string, acct int")

Even though I have experience with Spark dataframes, this is the only way I can think of, given the unusual format.

Is there a way to leverage Spark to process this kind of data? Or a way that doesn't require a for loop?


Solution

  • you're looking for substring()


    my_list = []
    rows = raw_text.collect()
    for row in rows:
      line = row[0].strip()
      header = line[0:6]
      acct = int(line[6:9])
      my_list.append((header, acct))
    
    df = spark.createDataFrame(my_list, "header string, acct int")
    

    is the same as

    from pyspark.sql import functions as F

    df = (
      raw_text
      .withColumn('header', F.substring('value', 1, 6))             # substring() is 1-based
      .withColumn('acct', F.substring('value', 7, 3).cast('int'))   # match the "acct int" schema
      .drop('value')
    )
    
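    Note that this version never calls collect(): the parsing runs as distributed Spark transformations instead of row by row on the driver, which is what made the original loop slow.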

    Also note that if each line in your input file is fixed-length, with the header and account fields separated by a space, then you can still read it as a CSV:

    spark.read.option("delimiter", " ").csv(file)
    # or
    spark.read.csv(file, sep=' ')
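
    Since you mention the rows don't all follow one format, you can also branch per row entirely in Spark. Here's a minimal sketch, assuming each format is identifiable by a regex on the raw line (the ^HEADER pattern and the alternate offsets are placeholders for your real markers):

    from pyspark.sql import functions as F

    df = (
      raw_text
      # choose the slice positions based on which format the line matches
      .withColumn('header',
          F.when(F.col('value').rlike('^HEADER'), F.substring('value', 1, 6))
           .otherwise(F.substring('value', 1, 4)))
      .withColumn('acct',
          F.when(F.col('value').rlike('^HEADER'), F.substring('value', 7, 3))
           .otherwise(F.substring('value', 5, 3))
           .cast('int'))
      .drop('value')
    )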