Products table create and insert scripts:
create table products (product_id varchar, description varchar, attributes jsonb, tax_rate decimal);
insert into products values ('P1', 'Detergent', '{"cost": 45.50, "size": "10g"}', 5.0);
insert into products values ('P2', 'Bread', '{"cost": 45.5, "size": "200g"}', 3.5);
I am trying to load jsonb data from PostgreSQL into a MapType (dictionary) column in PySpark, and then extract 'cost' and 'size' from the 'attributes' column into separate columns. But PySpark is reading the jsonb data as a string.
PySpark code to read data from PostgreSQL:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DecimalType
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.jars", "C:\\Users\\nupsingh\\Documents\\Jars\\postgresql-42.7.3.jar") \
.getOrCreate()
schema = StructType([
    StructField('product_id', StringType(), True),
    StructField('description', StringType(), True),
    StructField('attributes', MapType(StringType(), IntegerType()), False),
    StructField('tax_rate', DecimalType(), True)
])
df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/postgres") \
.option("dbtable", "products") \
.option("user", "user1") \
.option("password", "password") \
.option("driver", "org.postgresql.Driver") \
.option("schema", schema) \
.load()
df.show()
df.printSchema()
attributes_col = df.select("attributes")
attributes_col.show()
products_df = attributes_col.withColumn("cost",
col("attributes")["cost"]).withColumn(
"size", col("attributes")["size"])
products_df.show()
Short answer: You can't directly load a jsonb field into a MapType in PySpark.
Here's why:
When you read data from a database using PySpark's JDBC data source, PySpark relies on the metadata provided by the JDBC driver to infer the column types. The JDBC driver maps database types to Java types, which are in turn mapped to PySpark types. The PostgreSQL JDBC driver maps the jsonb type to a Java String, so PySpark receives the jsonb data as a string and infers it as a StringType in the schema; it has no built-in understanding of the structure inside the jsonb data. This is also why the .option("schema", schema) call in your code does not change anything: the JDBC source builds its schema from the driver's metadata rather than from a user-supplied StructType.
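You can verify this directly on the df from your code: even though the StructType declares attributes as a MapType, the type Spark actually inferred is a string. A minimal check (assuming the df from the JDBC read above):
# Inspect the type Spark inferred for the jsonb column;
# it comes back as a string type, not a map
print(df.schema["attributes"].dataType)  # StringType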
Is there a workaround?
Yes, you can achieve your goal in two steps: (1) read the jsonb data as a string, and (2) use from_json to convert it to a MapType. Here's how you can do it:
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import from_json, col

# The attribute values are mixed (a numeric cost and a string size such as "10g"),
# so parse them as strings and cast individual values later as needed
json_schema = MapType(StringType(), StringType())

# Read the table as-is: the JDBC source builds its schema from the driver's
# metadata, so the jsonb column arrives as a plain string
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("dbtable", "products") \
    .option("user", "user1") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()

# Convert the jsonb string column to a MapType
df = df.withColumn("attributes", from_json(col("attributes"), json_schema))