Search code examples
apache-sparkpysparkdistributed-computing

Why does my PySpark regular expression not give more than the first row?


Taking inspiration from this answer: https://stackoverflow.com/a/61444594/4367851 I have been able to split my .txt file into columns in a Spark DataFrame. However, it only gives me the first game - even though the sample .txt file contains many more.

My code:

basefile = spark.sparkContext.wholeTextFiles("example copy 2.txt").toDF().\
    selectExpr("""split(replace(regexp_replace(_2, '\\\\n', ','), ""),",") as new""").\
    withColumn("Event", col("new")[0]).\
    withColumn("White", col("new")[2]).\
    withColumn("Black", col("new")[3]).\
    withColumn("Result", col("new")[4]).\
    withColumn("UTCDate", col("new")[5]).\
    withColumn("UTCTime", col("new")[6]).\
    withColumn("WhiteElo", col("new")[7]).\
    withColumn("BlackElo", col("new")[8]).\
    withColumn("WhiteRatingDiff", col("new")[9]).\
    withColumn("BlackRatingDiff", col("new")[10]).\
    withColumn("ECO", col("new")[11]).\
    withColumn("Opening", col("new")[12]).\
    withColumn("TimeControl", col("new")[13]).\
    withColumn("Termination", col("new")[14]).\
    drop("new")


basefile.show()

Output:

+--------------------+---------------+-----------------+--------------+--------------------+--------------------+-----------------+-----------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+
|               Event|          White|            Black|        Result|             UTCDate|             UTCTime|         WhiteElo|         BlackElo|     WhiteRatingDiff|     BlackRatingDiff|        ECO|             Opening|         TimeControl|         Termination|
+--------------------+---------------+-----------------+--------------+--------------------+--------------------+-----------------+-----------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+
|[Event "Rated Cla...|[White "BFG9k"]|[Black "mamalak"]|[Result "1-0"]|[UTCDate "2012.12...|[UTCTime "23:01:03"]|[WhiteElo "1639"]|[BlackElo "1403"]|[WhiteRatingDiff ...|[BlackRatingDiff ...|[ECO "C00"]|[Opening "French ...|[TimeControl "600...|[Termination "Nor...|
+--------------------+---------------+-----------------+--------------+--------------------+--------------------+-----------------+-----------------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+

Input file:

[Event "Rated Classical game"]
[Site "https://lichess.org/j1dkb5dw"]
[White "BFG9k"]
[Black "mamalak"]
[Result "1-0"]
[UTCDate "2012.12.31"]
[UTCTime "23:01:03"]
[WhiteElo "1639"]
[BlackElo "1403"]
[WhiteRatingDiff "+5"]
[BlackRatingDiff "-8"]
[ECO "C00"]
[Opening "French Defense: Normal Variation"]
[TimeControl "600+8"]
[Termination "Normal"]

1. e4 e6 2. d4 b6 3. a3 Bb7 4. Nc3 Nh6 5. Bxh6 gxh6 6. Be2 Qg5 7. Bg4 h5 8. Nf3 Qg6 9. Nh4 Qg5 10. Bxh5 Qxh4 11. Qf3 Kd8 12. Qxf7 Nc6 13. Qe8# 1-0

[Event "Rated Classical game"]
.
.
.

Each game starts with [Event so I feel like it should be doable as the file has repeating structure, alas I can't get it to work.

Extra points:

  1. I don't actually need the move list so if it's easier they can be deleted.
  2. I only want the content of what is inside the " " for each new line once it has been converted to a Spark DataFrame.

Many thanks.


Solution

  • wholeTextFiles reads each file into a single record. If you read only one file, the result will a RDD with only one row, containing the whole text file. The regexp logic in the question returns only one result per row and this will be the first entry in the file.

    Probably the best solution would be to split the file at the os level into one file per game (for example here) so that Spark can read the multiple games in parallel. But if a single file is not too big, splitting the games can also be done within PySpark:

    Read the file(s):

    basefile = spark.sparkContext.wholeTextFiles(<....>).toDF()
    

    Create a list of columns and convert this list into a list of column expressions using regexp_extract:

    from pyspark.sql import functions as F
    
    cols = ['Event', 'White', 'Black', 'Result', 'UTCDate', 'UTCTime', 'WhiteElo', 'BlackElo', 'WhiteRatingDiff', 'BlackRatingDiff', 'ECO', 'Opening', 'TimeControl', 'Termination']
    cols = [F.regexp_extract('game', rf'{col} \"(.*)\"',1).alias(col) for col in cols]
    

    Extract the data:

    1. split the whole file into an array of games
    2. explode this array into single records
    3. delete the line breaks within each record so that the regular expression works
    4. use the column expressions defined above to extract the data
    basefile.selectExpr("split(_2,'\\\\[Event ') as game") \
      .selectExpr("explode(game) as game") \
      .withColumn("game", F.expr("concat('Event ', replace(game, '\\\\n', ''))")) \
      .select(cols) \
      .show(truncate=False)
    

    Output (for an input file containing three copies of the game):

    +---------------------+-----+-------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------------------+-----------+-----------+
    |Event                |White|Black  |Result|UTCDate   |UTCTime |WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|Opening                         |TimeControl|Termination|
    +---------------------+-----+-------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------------------+-----------+-----------+
    |Rated Classical game |BFG9k|mamalak|1-0   |2012.12.31|23:01:03|1639    |1403    |+5             |-8             |C00|French Defense: Normal Variation|600+8      |Normal     |
    |Rated Classical game2|BFG9k|mamalak|1-0   |2012.12.31|23:01:03|1639    |1403    |+5             |-8             |C00|French Defense: Normal Variation|600+8      |Normal     |
    |Rated Classical game3|BFG9k|mamalak|1-0   |2012.12.31|23:01:03|1639    |1403    |+5             |-8             |C00|French Defense: Normal Variation|600+8      |Normal     |
    +---------------------+-----+-------+------+----------+--------+--------+--------+---------------+---------------+---+--------------------------------+-----------+-----------+