I want to read the Parquet files I get from reading a Kafka topic with the readStream() method. However, when I read these Parquet files back, I get a dataframe of raw Kafka record columns instead of the real values.
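For context, the files come from a streaming query roughly along these lines (the bootstrap servers and checkpoint location below are placeholders, not the actual settings):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-parquet").getOrCreate()

# Read the Kafka topic as a stream; each record arrives with the usual
# key/value/topic/partition/offset/timestamp columns, key and value as binary
kafka_df = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder
            .option("subscribe", "electricRaw")
            .load())

# Dump the raw Kafka records to Parquet files
query = (kafka_df.writeStream
         .format("parquet")
         .option("path", "/home/ubuntu/tmp/output")
         .option("checkpointLocation", "/home/ubuntu/tmp/checkpoint")  # placeholder
         .start())
I then read the written files back like this: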
import glob
parquet_files = glob.glob("/home/ubuntu/tmp/output" + "/*.parquet")
parquet_df = spark.read.parquet(*parquet_files)
parquet_df.show()
Output:
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[22 7B 27 74 69 6...|electricRaw| 0| 127|2023-06-03 22:05:...| 0|
|null|[22 7B 27 74 69 6...|electricRaw| 0| 124|2023-06-03 22:04:...| 0|
|null|[22 7B 27 74 69 6...|electricRaw| 0| 125|2023-06-03 22:04:...| 0|
|null|[22 7B 27 74 69 6...|electricRaw| 0| 126|2023-06-03 22:04:...| 0|
+----+--------------------+-----------+---------+------+--------------------+-------------+
If I extract the value column as below, I still get the encoded bytes rather than readable text.
df = parquet_df.select("value").withColumnRenamed("value", "new_value")
df.show()
Output:
+--------------------+
| new_value|
+--------------------+
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
+--------------------+
My goal is to get the real data in the value column. How can I do it?
Thanks in advance.
The value column is of binary type (a byte array); you need to decode it into a string. Try this:
from pyspark.sql.functions import udf

# Decode the raw bytes into a UTF-8 string with a small UDF
byte_array_to_ascii = udf(lambda x: bytearray(x).decode('utf-8'))

df = parquet_df.withColumn("ascii_value", byte_array_to_ascii("value"))
df.show()
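Alternatively, since the column is binary, Spark's built-in cast to string does the same decoding without a UDF (assuming the payload is UTF-8 text):
from pyspark.sql.functions import col

# Casting binary to string decodes the bytes as UTF-8
df = parquet_df.withColumn("ascii_value", col("value").cast("string"))
df.show(truncate=False)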
If that doesn't work, it's probably because of the format of the value column; in that case, try going through its hex representation:
from pyspark.sql.functions import udf, hex, col

# Turn the hex string back into bytes, then decode those bytes as UTF-8
hex_to_ascii = udf(lambda x: bytearray.fromhex(''.join(filter(str.isalnum, x))).decode('utf-8'))

# First replace the binary value with its hex representation, then decode it
df = parquet_df.withColumn("value", hex(col("value"))).withColumn("ascii_value", hex_to_ascii("value"))
df.show()
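With either approach, the ascii_value column should now show the original message text instead of the displayed byte array.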