
How to dynamically apply array column typing in Spark


I have a PySpark DataFrame with a string column that contains JSON data structured as arrays of objects. However, the schema of these JSON objects can vary from row to row.

Here’s an example of two rows in the DataFrame:

+---------------------------------------------------------------------------------------------------+
| column                                                                                            |
+---------------------------------------------------------------------------------------------------+
| [{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]                 |
| [{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]    |
+---------------------------------------------------------------------------------------------------+
  • First row: Contains JSON objects with fields _t, id, value, and a nested details object.
  • Second row: Contains JSON objects with fields _t, id, extra_field, and a nested other_details object.

I need to convert the column from a string type to an array type, with the schema dynamically inferred to accommodate all variations in the JSON structure. Currently, my approach only works for the first row and doesn't account for schema variations.

Here’s what I’ve tried so far:

from pyspark.sql.functions import schema_of_json, from_json, col

json_sample = df.select("column").head()[0]  # Sample JSON from the first row
inferred_schema = schema_of_json(json_sample)    # Infer schema from the sample

# Convert the column to array type using the inferred schema
df = df.withColumn("column", from_json(col("column"), inferred_schema))

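Issue: The inferred schema only matches the first row's JSON structure. As a result, rows with a different structure (e.g., the second row) are not parsed correctly: fields that are missing from the schema, such as extra_field and other_details, are silently dropped, while value and details come back as null.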
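The data loss can be imitated in plain Python. This is a rough model, not Spark code: the hypothetical helper apply_fixed_schema keeps only the keys that exist in a fixed schema and fills the missing ones with None, which is assumed to mirror how from_json treats extra and missing fields under its default permissive parsing.

```python
import json

# Hypothetical helper: a rough pure-Python imitation of parsing against a
# fixed schema (not Spark code). Keys outside the schema are dropped and
# schema keys missing from the record come back as None.
def apply_fixed_schema(record, schema_fields):
    return {name: record.get(name) for name in sorted(schema_fields)}


row1 = json.loads('[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]')
row2 = json.loads('[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]')

# Like the attempt above, take the "schema" from the first row only
first_row_fields = set(row1[0])

parsed_row2 = [apply_fixed_schema(obj, first_row_fields) for obj in row2]
print(parsed_row2)
# [{'_t': 'TypeB', 'details': None, 'id': '456', 'value': None}]
```

extra_field and other_details never reach the result, which is exactly the loss described in the issue.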

I’m looking for a way to:
  • Dynamically infer the schema for the column to handle all variations in the JSON objects across rows.
  • Apply the inferred schema to convert the column into an array type without losing any data.

Real data example, one row:

[{'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '29b2c17e-e980-4e3f-bd6a-a311f414bd25', 'Operation': 'Add', 'Deductible': {'_id': 'f794288a-6c38-4dd5-bc3a-f3d81e2e824a', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.000232200', 'LoadingFee': '0', 'DerivedLmi': '150000.00', 'PricingLmi': '150000.00', 'PureRate': '0.000232200', 'PurePremium': '34.83'}, 'Product': {'NetPremiumWithoutDiscount': '85.64', 'DaFix': '7.99', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '125.94117647058823529411764706', 'DiscountValue': '50.38', 'CommercialPremium': '75.56', 'CommissionPercentage': '0.320000000', 'CommissionValue': '40.301176470588235294117647059', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '75.564705882352941176470588236', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.000503733'}, 'Financial': {'Iof': '5.58', 'IofPercentage': '0.0738', 'GrossPremium': '81.14', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '40.301176470588235294117647059', 'ProLaboreWithoutDiscount': '0', 'Commission': '24.180705882352941176470588236', 'ProLabore': '0', 'Profit': '-3.43', 'NetPremium': '51.38', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '19.98'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '150000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'ca1cc323-0f42-4661-951e-dfad7027d9b6', 'Operation': 'Add', 'Deductible': {'_id': 'ad9c664c-06b4-408a-8e2f-2c664c45a137', 'Value': '0.00', 'Percent': '0'}, 
'Subscription': {'RiskRate': '0.000494500', 'LoadingFee': '0', 'DerivedLmi': '15000.00', 'PricingLmi': '15000.00', 'PureRate': '0.000494500', 'PurePremium': '7.42'}, 'Product': {'NetPremiumWithoutDiscount': '30.8', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '45.294117647058823529411764706', 'DiscountValue': '18.12', 'CommercialPremium': '27.18', 'CommissionPercentage': '0.320000000', 'CommissionValue': '14.494117647058823529411764706', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '27.176470588235294117647058824', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.001812'}, 'Financial': {'Iof': '2.01', 'IofPercentage': '0.0738', 'GrossPremium': '29.19', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '14.494117647058823529411764706', 'ProLaboreWithoutDiscount': '0', 'Commission': '8.696470588235294117647058824', 'ProLabore': '0', 'Profit': '-1.23', 'NetPremium': '18.48', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '12.29'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': '5f809eaf-6a80-4eb3-a36a-e71014d7e888', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '15000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '00e41c0f-eb68-4e24-8898-66fd54e560dd', 'Operation': 'Add', 'Deductible': {'_id': 'b4e2babc-7ea4-49e5-9c73-5b3265b07ccb', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.010913400', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.010913400', 'PurePremium': '32.74'}, 'Product': {'NetPremiumWithoutDiscount': '81.44', 'DaFix': 
'7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '119.76470588235294117647058824', 'DiscountValue': '47.91', 'CommercialPremium': '71.86', 'CommissionPercentage': '0.320000000', 'CommissionValue': '38.324705882352941176470588237', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '71.858823529411764705882352944', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.023953333'}, 'Financial': {'Iof': '5.30', 'IofPercentage': '0.0738', 'GrossPremium': '77.16', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '38.324705882352941176470588237', 'ProLaboreWithoutDiscount': '0', 'Commission': '22.994823529411764705882352942', 'ProLabore': '0', 'Profit': '-3.26', 'NetPremium': '48.86', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '19.38'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'd044866e-f5f5-4a42-b68f-23aa2c55b498', 'Operation': 'Add', 'Deductible': {'_id': '0f9b9edb-2a35-4300-b1e5-d61e90636c66', 'Value': '1000', 'Percent': '0.15'}, 'Subscription': {'RiskRate': '0.001363100', 'LoadingFee': '0', 'DerivedLmi': '30000.00', 'PricingLmi': '30000.00', 'PureRate': '0.001363100', 'PurePremium': '40.89'}, 'Product': {'NetPremiumWithoutDiscount': '97.74', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '143.73529411764705882352941176', 'DiscountValue': '57.49', 'CommercialPremium': '86.24', 
'CommissionPercentage': '0.320000000', 'CommissionValue': '45.995294117647058823529411763', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '86.24117647058823529411764706', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.002874667'}, 'Financial': {'Iof': '6.35', 'IofPercentage': '0.0738', 'GrossPremium': '92.59', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '45.995294117647058823529411763', 'ProLaboreWithoutDiscount': '0', 'Commission': '27.597176470588235294117647059', 'ProLabore': '0', 'Profit': '-3.91', 'NetPremium': '58.64', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '21.66'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '30000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': 'b2df01da-0b28-4425-8e68-1728395804f8', 'Operation': 'Add', 'Deductible': {'_id': '23e3f5e1-1a25-4920-a8f9-2a68bb7505d3', 'Value': '150.00', 'Percent': '0.1'}, 'Subscription': {'RiskRate': '0.005820050', 'LoadingFee': '0', 'DerivedLmi': '2000.00', 'PricingLmi': '2000.00', 'PureRate': '0.005820050', 'PurePremium': '11.64'}, 'Product': {'NetPremiumWithoutDiscount': '39.24', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '57.705882352941176470588235294', 'DiscountValue': '23.08', 'CommercialPremium': '34.62', 'CommissionPercentage': '0.320000000', 'CommissionValue': '18.465882352941176470588235294', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 
'CommercialPremiumWithDiscountAndFee': '34.623529411764705882352941176', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.01731'}, 'Financial': {'Iof': '2.56', 'IofPercentage': '0.0738', 'GrossPremium': '37.18', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '18.465882352941176470588235294', 'ProLaboreWithoutDiscount': '0', 'Commission': '11.079529411764705882352941176', 'ProLabore': '0', 'Profit': '-1.57', 'NetPremium': '23.54', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '13.47'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '2000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '7fa48879-43e5-493b-bb90-ff7998232847', 'Operation': 'Add', 'Deductible': {'_id': '6c8d2e6f-5390-4a3b-b14e-79a44d5bec64', 'Value': '500.00', 'Percent': '0.1'}, 'Subscription': {'RiskRate': '0.006439250', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.006439250', 'PurePremium': '19.32'}, 'Product': {'NetPremiumWithoutDiscount': '54.6', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '80.29411764705882352941176471', 'DiscountValue': '32.12', 'CommercialPremium': '48.18', 'CommissionPercentage': '0.320000000', 'CommissionValue': '25.694117647058823529411764707', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '48.176470588235294117647058826', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.01606'}, 'Financial': {'Iof': '3.56', 'IofPercentage': '0.0738', 'GrossPremium': '51.74', 
'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '25.694117647058823529411764707', 'ProLaboreWithoutDiscount': '0', 'Commission': '15.416470588235294117647058824', 'ProLabore': '0', 'Profit': '-2.18', 'NetPremium': '32.76', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '15.62'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}, {'_t': 'UniqueLmiCoverageCalculation', 'CoverageId': '795c1e26-2eb7-4b55-b23a-765850699aeb', 'Operation': 'Add', 'Deductible': {'_id': 'e8996734-39d5-4983-8b41-8d7c23c73a12', 'Value': '0.00', 'Percent': '0'}, 'Subscription': {'RiskRate': '0.000273050', 'LoadingFee': '0', 'DerivedLmi': '3000.00', 'PricingLmi': '3000.00', 'PureRate': '0.000273050', 'PurePremium': '0.82'}, 'Product': {'NetPremiumWithoutDiscount': '17.6', 'DaFix': '7.98', 'DaPercentage': '0.14', 'ProfitPercentage': '0.36'}, 'Commercial': {'CommercialPremiumWithoutDiscount': '25.882352941176470588235294118', 'DiscountValue': '10.35', 'CommercialPremium': '15.53', 'CommissionPercentage': '0.320000000', 'CommissionValue': '8.282352941176470588235294118', 'ProLaborePercentage': '0', 'ProLaboreValue': '0', 'Discount': '0.400000000', 'CommercialLoadingFee': '0.000000000', 'CommercialPremiumWithDiscountAndFee': '15.529411764705882352941176471', 'MinimumPremiumLoadingFee': '0', 'CommercialFee': '0.005176667'}, 'Financial': {'Iof': '1.15', 'IofPercentage': '0.0738', 'GrossPremium': '16.68', 'FinancialCharge': '0.00', 'InstallmentCost': '0.00', 'InstallmentCostPercentage': '0.0', 'FinancialRefundAmount': '0'}, 'Audit': {'CommissionWithoutDiscount': '8.282352941176470588235294118', 'ProLaboreWithoutDiscount': '0', 
'Commission': '4.9694117647058823529411764707', 'ProLabore': '0', 'Profit': '-0.70', 'NetPremium': '10.56', 'ProfitPercentage': '0.36', 'CommercialLoadingFee': '0.000000000', 'Discount': '0.400000000', 'ProLaborePercentage': '0', 'CommissionPercentage': '0.320000000', 'MinimumPremiumDA': '0', 'TotalDA': '10.44'}, 'IndemnityPeriod': None, 'InsuredObjects': [{'RiskId': 'a564e803-3ea5-4336-9ae7-5bb70d62916b', 'RiskVersion': '2.1', 'Operation': 'Add', 'InsuredAmount': '3000.00'}]}]
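One detail worth checking in this sample: it uses Python literal syntax (single-quoted strings and None) rather than JSON (double quotes and null). If the column really stores strings in this form, None is not valid JSON, so those records would not parse as JSON. A minimal sketch, assuming the strings are valid Python literals, converts them with the standard library's ast.literal_eval and json.dumps; the helper name python_literal_to_json is hypothetical and the input is an abridged excerpt of the row above.

```python
import ast
import json

# Abridged excerpt of the real-data row above (Python-literal syntax:
# single quotes and None instead of JSON's double quotes and null)
raw = ("[{'_t': 'UniqueLmiCoverageCalculation', 'Operation': 'Add', "
       "'IndemnityPeriod': None, "
       "'InsuredObjects': [{'RiskVersion': '2.1', 'InsuredAmount': '3000.00'}]}]")


def python_literal_to_json(text):
    """Parse a Python literal safely (no code execution) and re-serialize it as JSON."""
    return json.dumps(ast.literal_eval(text))


fixed = python_literal_to_json(raw)
print(fixed)
# [{"_t": "UniqueLmiCoverageCalculation", "Operation": "Add", "IndemnityPeriod": null, "InsuredObjects": [{"RiskVersion": "2.1", "InsuredAmount": "3000.00"}]}]
```

In PySpark, a function like this could be wrapped with pyspark.sql.functions.udf and applied to the column before the schema is inferred.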

Solution

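  • You can build an RDD containing only the column you're interested in and load it into a DataFrame with spark.read.json(). Spark infers a schema for every row and merges them into a single schema automatically. Because each string is a top-level JSON array, spark.read.json() reads each array element as a separate record, so the resulting schema describes one element; wrapping it in ArrayType gives the type of the whole column: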

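    from pyspark.sql.types import ArrayType
    from pyspark.sql.functions import from_json, col
    
    # Extract the raw JSON strings as an RDD of str
    rdd = df.select('column').rdd.map(lambda x: x['column'])
    
    # Infer a merged element schema from all rows, then wrap it in ArrayType
    inferred_schema = ArrayType(spark.read.json(rdd).schema)
    
    # Parse the string column with the merged schema
    df = df.withColumn("column", from_json(col("column"), inferred_schema))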
    

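    You can pass samplingRatio (default 1.0) to spark.read.json() to infer the schema from only a fraction of the rows. This speeds up inference on large data, but fields that appear only in unsampled rows will be missing from the schema.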
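To make the merging step concrete, here is a simplified pure-Python model of "infer a type per row, then merge them". It is not Spark's implementation: the helpers infer, merge and to_ddl are hypothetical, and the widening rules (null yields to any type, bigint plus double becomes double, any other conflict becomes string, struct fields sorted by name) are assumptions meant to approximate Spark's JSON inference. Applied to the two example rows from the question, the merged type contains the fields of both rows.

```python
import json

# Simplified, hypothetical model of JSON schema inference (NOT Spark's code).
# A type is "null", "boolean", "bigint", "double", "string",
# ("array", element_type) or ("struct", {field_name: field_type}).


def infer(value):
    """Infer a type for one parsed JSON value."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # bool must be checked before int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        element = "null"
        for item in value:
            element = merge(element, infer(item))
        return ("array", element)
    if isinstance(value, dict):
        return ("struct", {key: infer(v) for key, v in value.items()})
    raise TypeError(f"unsupported JSON value: {value!r}")


def merge(a, b):
    """Merge two inferred types, widening them where they disagree."""
    if a == b:
        return a
    if a == "null":
        return b
    if b == "null":
        return a
    if isinstance(a, str) and isinstance(b, str) and {a, b} == {"bigint", "double"}:
        return "double"
    if isinstance(a, tuple) and isinstance(b, tuple) and a[0] == b[0]:
        if a[0] == "array":
            return ("array", merge(a[1], b[1]))
        fields = dict(a[1])  # union of the fields of both structs
        for name, field_type in b[1].items():
            fields[name] = merge(fields.get(name, "null"), field_type)
        return ("struct", fields)
    return "string"  # any other conflict falls back to string


def to_ddl(t):
    """Render a type as a Spark-style DDL string, struct fields sorted by name."""
    if t == "null":
        return "string"  # fields that were only ever null end up as string
    if isinstance(t, str):
        return t
    if t[0] == "array":
        return f"array<{to_ddl(t[1])}>"
    inner = ",".join(f"{name}:{to_ddl(ft)}" for name, ft in sorted(t[1].items()))
    return f"struct<{inner}>"


rows = [
    '[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]',
    '[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]',
]

# Infer each row, then fold all row types into one merged type
schema = "null"
for row in rows:
    schema = merge(schema, infer(json.loads(row)))

print(to_ddl(schema))
# array<struct<_t:string,details:struct<key1:string,key2:string>,extra_field:string,id:string,other_details:struct<key3:string,key4:string>,value:string>>
```

Spark's real rules cover more cases (timestamps, decimals, and options such as primitivesAsString), so treat this model only as an illustration of why reading every row yields one schema that covers both variants.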