
Convert a dataframe containing lists of dictionaries to several rows in PySpark


I have a dataframe with two columns that each contain a list of dictionaries. The schema I created for this data structure is the following:

        tick_by_tick_schema = StructType([
            StructField('localSymbol', StringType()),
            StructField('tickByTicks', ArrayType(StructType([
                StructField('price', StringType()),
                StructField('size', StringType()),
                StructField('specialConditions', StringType()),
            ]))),
            StructField('domBids', ArrayType(StructType([
                StructField('price_bid', StringType()),
                StructField('size_bid', StringType()),
                StructField('marketMaker_bid', StringType()),
            ])))
        ])

My dataframe is this:

+-----------+----------------+----------------------------------------------------------------------------------------+
|localSymbol|tickByTicks     |domBids                                                                                 |
+-----------+----------------+----------------------------------------------------------------------------------------+
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|
+-----------+----------------+----------------------------------------------------------------------------------------+

Now what I would like to get is something like this:

+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+
|localSymbol|tickByTicks     |domBids                                                                                 |price_bid|marketMaker_bid|price|
+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |CHX            |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |MEMX           |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.8     |NYSENAT        |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.79    |NSDQ           |32.99|
|ALKT       |[{32.99, 100, }]|[{32.8, 1, CHX}, {32.8, 1, MEMX}, {32.8, 1, NYSENAT}, {32.79, 1, NSDQ}, {32.69, 1, BYX}]|32.69    |BYX            |32.99|
+-----------+----------------+----------------------------------------------------------------------------------------+---------+---------------+-----+

I tried this, but obviously it doesn't work xD

df = self.tick_by_tick_data_processed.select(f.col('localSymbol'),f.col('tickByTicks'),f.col('domBids'))\
    .withColumn('price_bid', f.explode(f.col('tickByTicks.price'))) \
    .withColumn('marketMaker_bid', f.explode(f.col('domBids.marketMaker_bid'))) \
    .withColumn('price_bid', f.explode(f.col('domBids.price_bid')))

Solution

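  • This might work. Explode each array of structs once, so that the fields of one element stay together on the same row, then select the struct fields you need: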

    # Explode both arrays, then select every field of both structs
    df = self.tick_by_tick_data_processed \
        .withColumn('tick', f.explode(f.col('tickByTicks'))) \
        .withColumn('dom', f.explode(f.col('domBids'))) \
        .select('localSymbol', 'tick.*', 'dom.*')
    
    # OR
    
    # Selecting only desired columns
    df = self.tick_by_tick_data_processed \
        .withColumn('tick', f.explode(f.col('tickByTicks'))) \
        .withColumn('dom', f.explode(f.col('domBids'))) \
        .select('localSymbol', 
                f.col('tick.price').alias('tick_price'), 
                f.col('dom.marketMaker_bid').alias('marketMaker_bid'),
                f.col('dom.price_bid').alias('price_bid'))
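
    A likely reason the attempt in the question gives the wrong result: it explodes `domBids.marketMaker_bid` and `domBids.price_bid` separately, so every market maker is combined with every price. The plain-Python sketch below models only the row counts and does not use Spark. The `explode` helper is a hypothetical stand-in for `f.explode`, and the sample row is copied from the question.

```python
def explode(rows, source, target):
    """Rough model of f.explode: one output row per element of row[source].

    Rows whose array is empty or None produce no output rows.
    """
    out = []
    for row in rows:
        for element in row[source] or []:
            out.append({**row, target: element})
    return out


# Sample row taken from the question's dataframe.
row = {
    "localSymbol": "ALKT",
    "tickByTicks": [{"price": "32.99", "size": "100", "specialConditions": ""}],
    "domBids": [
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "CHX"},
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "MEMX"},
        {"price_bid": "32.8", "size_bid": "1", "marketMaker_bid": "NYSENAT"},
        {"price_bid": "32.79", "size_bid": "1", "marketMaker_bid": "NSDQ"},
        {"price_bid": "32.69", "size_bid": "1", "marketMaker_bid": "BYX"},
    ],
}

# Exploding whole structs: 1 tick x 5 bids = 5 rows, fields stay paired.
paired = explode(explode([row], "tickByTicks", "tick"), "domBids", "dom")
selected = [
    (r["localSymbol"], r["tick"]["price"],
     r["dom"]["marketMaker_bid"], r["dom"]["price_bid"])
    for r in paired
]

# Exploding the two fields separately: 5 makers x 5 prices = 25 rows.
fields = [{
    **row,
    "makers": [d["marketMaker_bid"] for d in row["domBids"]],
    "prices": [d["price_bid"] for d in row["domBids"]],
}]
unpaired = explode(explode(fields, "makers", "marketMaker_bid"),
                   "prices", "price_bid")

print(len(paired), len(unpaired))  # prints: 5 25
```

    Two caveats for the real dataframe. `f.explode` drops rows whose array is null or empty, so if `tickByTicks` or `domBids` can be empty, `f.explode_outer` keeps those rows with nulls instead. The snippets above also select only `localSymbol` and the struct fields; add `'tickByTicks'` and `'domBids'` to the `select` if the original array columns should stay in the result, as in the desired output.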