python, json, postgresql, dictionary, pyspark

Load jsonb data from PostgreSQL to PySpark and store it in MapType



Products table create and insert scripts:

create table products (product_id varchar, description varchar, attributes jsonb, tax_rate decimal);

insert into products values ('P1', 'Detergent', '{"cost": 45.50, "size": "10g"}', 5.0);
insert into products values ('P2', 'Bread', '{"cost": 45.5, "size": "200g"}', 3.5);

I am trying to load jsonb data from PostgreSQL into MapType/dictionary format in PySpark, then extract 'cost' and 'size' from the 'attributes' column into separate columns. But PySpark is reading the jsonb data as a string.

PySpark code to read data from PostgreSQL:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType, DecimalType

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "C:\\Users\\nupsingh\\Documents\\Jars\\postgresql-42.7.3.jar") \
    .getOrCreate()

schema = StructType([
    StructField('product_id', StringType(), True),
    StructField('description', StringType(), True),
    StructField('attributes', MapType(StringType(), IntegerType()), False),
    StructField('tax_rate', DecimalType(), True)
])

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("dbtable", "products") \
    .option("user", "user1") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .option("schema", schema) \
    .load()

df.show()

df.printSchema()

attributes_col = df.select("attributes")

attributes_col.show()

products_df = attributes_col.withColumn("cost",
                   col("attributes")["cost"]).withColumn(
    "size", col("attributes")["size"])

products_df.show()

Solution

  • Short answer: You can't directly load a jsonb field into a MapType in PySpark.

    Here's why:

    When you read data from a database with PySpark's JDBC data source, PySpark infers column types from the metadata supplied by the JDBC driver: the driver maps database types to Java types, which PySpark in turn maps to its own types. The Postgres JDBC driver maps the jsonb type to a Java String, so PySpark receives the jsonb data as a string and infers StringType for the column. PySpark has no built-in understanding of the structure inside the jsonb.
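    A quick way to confirm this is to inspect the column types of the DataFrame from the question right after the JDBC read (the exact precision reported for the decimal column may vary with your column definition and driver version):

    # 'attributes' arrives as a plain string, not a map
    print(df.dtypes)
    # [('product_id', 'string'), ('description', 'string'),
    #  ('attributes', 'string'), ('tax_rate', 'decimal(38,18)')]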

    Is there a workaround? Yes, you can achieve your goal in two steps: (1) read the jsonb data as a string, then (2) use from_json to convert it to a MapType. Here's how you can do it:

    from pyspark.sql.types import MapType, StringType
    from pyspark.sql.functions import from_json, col

    # The attribute values are mixed types ("10g" is not an integer), so parse
    # them as strings and cast individual fields afterwards as needed.
    json_schema = MapType(StringType(), StringType())

    # The JDBC source derives the schema from the database metadata, so no
    # explicit schema is needed: jsonb already arrives as a StringType column.
    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/postgres") \
        .option("dbtable", "products") \
        .option("user", "user1") \
        .option("password", "password") \
        .option("driver", "org.postgresql.Driver") \
        .load()

    # Convert the jsonb column (read as a string) to a MapType
    df = df.withColumn("attributes", from_json(col("attributes"), json_schema))
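    With 'attributes' parsed into a map, the original goal of extracting 'cost' and 'size' into separate columns is plain map access. The cast below is an illustrative choice (map values are strings under the schema above); pick whatever numeric type fits your data:

    # Pull map entries into their own columns; cast 'cost' since the
    # map values are strings
    products_df = df \
        .withColumn("cost", col("attributes")["cost"].cast("decimal(10,2)")) \
        .withColumn("size", col("attributes")["size"])

    products_df.show()
    products_df.printSchema()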