
TypeError: 'GroupedData' object is not subscriptable


I have a dataset df_1 that looks like this:

my_id        scope         feat_1     feat_2    value_1    value_2     value_3          date
23784    some_code          Three          A         30         60          60    2022-01-01
23794    some_code          Seven          B         60         40          20    2022-01-01
23774    some_cod1          Three          A         90         40          60    2022-01-02
22784    some_cod1          Three          C         30         10          60    2022-01-01
23564    some_cod2           Four          A         20         40          20    2022-01-05
20784    some_cod3           Five          A         10         70          40    2022-02-08

I need to perform a simple calculation on it, but since it updates quite often, I want to make sure that all the data is there. For that, I have the following guide table df_2. version is always increasing and tells me when the newest update happened; I only care about the maximum version for a given scope and date.

my_id        scope         feat_1     feat_2                   date     version
23784    some_code          Three          A             2022-01-01         600
23794    some_code          Seven          B             2022-01-01         600   
23774    some_cod1          Three          A             2022-01-02         600       
22784    some_cod1          Three          C             2022-01-01         650
23564    some_cod2           Four          A             2022-01-05         650
20784    some_cod3           Five          A             2022-02-08         700
20744    some_cod2           Five          A             2022-01-05         700
20745    some_cod2           Four          C             2022-01-05         700

I want to look at df_2, group it by scope and date to get the maximum version, and then check whether all my_ids for that version are present in df_1.

What I did:

df_2 = df_2.groupBy(["date", "scope"])['version'].max()
df = df_1.join(df_2, on = ["my_id"], how = "leftanti")

But I get

TypeError: 'GroupedData' object is not subscriptable

Why is that, and is my logic incorrect?


Solution

  • TypeError: 'GroupedData' object is not subscriptable

    You get this error because you do .groupBy(["date", "scope"])['version'].

    .groupBy(["date", "scope"]) returns an object of type GroupedData. On that object you then try ['version'], but GroupedData does not support accessing elements with bracket syntax, i.e. it is not "subscriptable". That is why you get this error.

    You could do what you want using window functions: e.g. calculate the maximum 'version' over every window having the same 'scope' and 'date', then filter on this max value and do the same "leftanti" join that you tried to do.

    Inputs:

    from pyspark.sql import functions as F, Window as W
    df_1 = spark.createDataFrame(
        [(23784, 'some_code', 'Three', 'A', 30, 60, 60, '2022-01-01'),
         (23794, 'some_code', 'Seven', 'B', 60, 40, 20, '2022-01-01'),
         (23774, 'some_cod1', 'Three', 'A', 90, 40, 60, '2022-01-02'),
         (22784, 'some_cod1', 'Three', 'C', 30, 10, 60, '2022-01-01'),
         (23564, 'some_cod2', 'Four', 'A', 20, 40, 20, '2022-01-05'),
         (20784, 'some_cod3', 'Five', 'A', 10, 70, 40, '2022-02-08')],
        ['my_id', 'scope', 'feat_1', 'feat_2', 'value_1', 'value_2', 'value_3', 'date'])
    df_2 = spark.createDataFrame(
        [(23784, 'some_code', 'Three', 'A', '2022-01-01', 60),
         (23794, 'some_code', 'Seven', 'B', '2022-01-01', 600),
         (23774, 'some_cod1', 'Three', 'A', '2022-01-02', 600),
         (22784, 'some_cod1', 'Three', 'C', '2022-01-01', 65),
         (23564, 'some_cod2', 'Four', 'A', '2022-01-05', 65),
         (20784, 'some_cod3', 'Five', 'A', '2022-02-08', 70),
         (20744, 'some_cod2', 'Five', 'A', '2022-01-05', 70),
         (20745, 'some_cod2', 'Four', 'C', '2022-01-05', 70)],
        ['my_id', 'scope', 'feat_1', 'feat_2', 'date', 'version'])
    

    Script:

    # Within each (scope, date) window ordered by descending version, the first value is the maximum
    w = W.partitionBy('scope', 'date').orderBy(F.desc('version'))
    df_2 = (df_2
        .withColumn('max_version', F.first('version').over(w))
        .filter('version = max_version')  # keep only rows holding the max version for their scope and date
    )
    # left anti join: rows of df_1 whose my_id does not appear in the filtered df_2
    df = df_1.join(df_2, on=["my_id"], how="leftanti")
    
    df.show()
    # +-----+---------+------+------+-------+-------+-------+----------+
    # |my_id|    scope|feat_1|feat_2|value_1|value_2|value_3|      date|
    # +-----+---------+------+------+-------+-------+-------+----------+
    # |23784|some_code| Three|     A|     30|     60|     60|2022-01-01|
    # |23564|some_cod2|  Four|     A|     20|     40|     20|2022-01-05|
    # +-----+---------+------+------+-------+-------+-------+----------+
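
    As an aside, the same max-version filter can also be written with groupBy/agg and a join back to df_2, which is closer to what you originally attempted. A sketch starting again from the original df_2 defined in the Inputs (the script above reassigns df_2); on these inputs it yields the same result as the window approach:

    max_versions = df_2.groupBy('scope', 'date').agg(F.max('version').alias('max_version'))
    df_2_latest = (df_2
        .join(max_versions, on=['scope', 'date'])   # attach the per-group max version
        .filter('version = max_version')            # keep only rows at that max
    )
    df = df_1.join(df_2_latest, on=['my_id'], how='leftanti')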