Tags: hive, pyspark, avro, emr, parquet

Spark/Hive - Group data into a "pivot-table" format


I have a very annoying set of files structured like so:

userId string,
eventType string,
source string,
errorCode string,
startDate timestamp,
endDate timestamp

Each file may contain an arbitrary number of records per userId, with varying eventTypes and sources, and a different errorCode and start/end dates for each.

Is there a way in Hive or Spark to group all of these together on userId, sort of like a key-value store where the value is the list of all fields associated with that userId, keyed by eventType and source? Basically I want to trade table length for width, like a pivot table. My goal is to eventually store the result in Apache Parquet or Avro format for faster analysis down the road.

Here's an example:

Source data:

userId, eventType, source, errorCode, startDate, endDate
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'

Goal:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'

The field names and their order don't matter, as long as I can distinguish them.

I've tried two methods already to get this to work:

  1. Manually select each (eventType, source) combination from the table and join it to a master dataset. This works just fine and parallelizes well, but it doesn't allow for an arbitrary number of values for the key fields and requires the schema to be predefined (a minimal sketch of this approach appears after the list).
  2. Use Spark to build a dictionary of key:value records where each value is itself a dictionary: loop through the dataset, add a new key to the dictionary if it doesn't exist, and for that entry add a new field to the value-dictionary if it doesn't exist. This works beautifully, but it is extremely slow and doesn't parallelize well, if at all. I'm also not sure whether the result would be an Avro/Parquet-compatible format.
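
For reference, here's a minimal sketch of method 1. It assumes the data is already loaded into a DataFrame df with the schema above and that the list of (eventType, source) combinations is maintained by hand, which is exactly the limitation: the full output schema has to be known in advance.

    from pyspark.sql.functions import col

    # One filtered, renamed slice per (eventType, source) combination,
    # left-joined back onto the distinct userIds.
    combos = [('ACK', 'PROVIDER'), ('TRADE', 'MERCH'), ('CHARGE', 'MERCH'),
              ('REFUND', 'MERCH'), ('CLOSE', 'PROVIDER')]
    result = df.select('userId').distinct()
    for ev, src in combos:
        suffix = ev.capitalize() + src.capitalize()
        piece = df.filter((col('eventType') == ev) & (col('source') == src)) \
            .select('userId',
                    col('errorCode').alias('errorCode' + suffix),
                    col('startDate').alias('startDate' + suffix),
                    col('endDate').alias('endDate' + suffix))
        result = result.join(piece, 'userId', 'left')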

Are there any alternatives to those two methods? Or is there an even better structure than the one I'm aiming for?


Solution

  • Would you like to have something like this?

    from pyspark.sql.functions import struct, col, create_map, collect_list
    
    df = sc.parallelize([
        ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'],
        ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'],
        ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'],
        ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'],
        ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'],
        ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'],
        ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777']
    ]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'))
    df.show()
    
    # Pack (eventType, source) into a struct key and the remaining fields into a struct value
    new_df = df.withColumn("eventType_source", struct(col('eventType'), col('source'))) \
        .withColumn("errorCode_startEndDate", struct(col('errorCode'), col('startDate'), col('endDate')))
    
    # One row per userId, holding a list of {(eventType, source) -> (errorCode, startDate, endDate)} map entries
    new_df = new_df.groupBy('userId').agg(
        collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail'))
    new_df.show()
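
  • If the flat, one-column-per-(eventType, source) layout shown in the question is what you ultimately need, pivoting on a concatenated eventType/source key is another option. This is only a sketch, not part of the answer above: the combo column name and the Parquet output path are placeholders, and first() assumes at most one event per (userId, eventType, source) pair.

    from pyspark.sql.functions import col, concat_ws, first
    
    wide_df = df.withColumn('combo', concat_ws('_', col('eventType'), col('source'))) \
        .groupBy('userId') \
        .pivot('combo') \
        .agg(first('errorCode').alias('errorCode'),
             first('startDate').alias('startDate'),
             first('endDate').alias('endDate'))
    wide_df.show()
    
    # Persist the wide table for later analysis (path is a placeholder)
    wide_df.write.mode('overwrite').parquet('/tmp/events_wide')

    Each pivoted column comes out named like ACK_PROVIDER_errorCode, so the naming differs from the goal only cosmetically.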