Tags: python, apache-spark, pyspark, apache-spark-sql, parquet

How to read the correct values from Parquet files?


I want to read the Parquet files I get from the readStream() method on a Kafka topic. However, when I read these Parquet files, I get a dataframe of raw Kafka records instead of the real values.

import glob

# Collect all Parquet part files written out by the streaming query
parquet_files = glob.glob("/home/ubuntu/tmp/output" + "/*.parquet")

parquet_df = spark.read.parquet(*parquet_files)

parquet_df.show()

Output:

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[22 7B 27 74 69 6...|electricRaw|        0|   127|2023-06-03 22:05:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   124|2023-06-03 22:04:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   125|2023-06-03 22:04:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   126|2023-06-03 22:04:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+

If I extract the value column as shown below, I still get what looks like encrypted data.

df = parquet_df.select("value").withColumnRenamed("value", "new_value")
df.show()

Output:

+--------------------+
|           new_value|
+--------------------+
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
+--------------------+

My goal is to get the real data in the value column. How can I do it?

Thanks in advance.


Solution

  • The value column is of type binary (Spark's Kafka source always returns key and value as byte arrays); you need to decode it into a readable string. Try this:

    from pyspark.sql.functions import udf

    # Decode the raw bytes into a UTF-8 string; the column was renamed
    # to new_value in the question
    byte_array_to_ascii = udf(lambda x: bytearray(x).decode('utf-8'))
    df = df.withColumn("ascii_value", byte_array_to_ascii("new_value"))
    df.show()
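
    Alternatively, Spark can decode the bytes without a Python UDF: casting a binary column to a string interprets the bytes as UTF-8 and avoids the UDF serialization overhead. A minimal sketch, assuming the column is still named new_value as in the question:

    from pyspark.sql.functions import col

    # CAST(binary AS STRING) decodes the bytes as UTF-8
    df = df.withColumn("ascii_value", col("new_value").cast("string"))
    df.show()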
    

    If that doesn't work, it's probably because of the format of the value column; in that case, try this:

    from pyspark.sql.functions import udf, hex, col

    # Convert the binary column to its hex representation, keep only the
    # hex characters, then decode the hex back into a UTF-8 string
    hex_to_ascii = udf(lambda x: bytearray.fromhex(''.join(filter(str.isalnum, x))).decode('utf-8'))
    df = df.withColumn("new_value", hex(col("new_value"))).withColumn("ascii_value", hex_to_ascii("new_value"))
    df.show()
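
    Once the bytes are decoded, if the text turns out to be JSON you can go one step further and turn it into real columns with from_json. A sketch with a hypothetical two-field schema; time and consumption are placeholders, so replace them with whatever fields your producer actually sends:

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # Hypothetical schema -- the field names are placeholders, adjust
    # them to match the actual payload
    schema = StructType([
        StructField("time", StringType()),
        StructField("consumption", DoubleType()),
    ])

    df = df.withColumn("parsed", from_json(col("ascii_value"), schema))
    df.select("parsed.*").show()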