I have a dataset df_1 that looks like this:
my_id scope feat_1 feat_2 value_1 value_2 value_3 date
23784 some_code Three A 30 60 60 2022-01-01
23794 some_code Seven B 60 40 20 2022-01-01
23774 some_cod1 Three A 90 40 60 2022-01-02
22784 some_cod1 Three C 30 10 60 2022-01-01
23564 some_cod2 Four A 20 40 20 2022-01-05
20784 some_cod3 Five A 10 70 40 2022-02-08
I need to perform a simple calculation on it, but since it updates quite often, I want to make sure that all the data is there. For that, I have the following guide df_2. version is always increasing and tells me when the newest update happened, and I only care about the maximum version for a certain scope and date.
my_id scope feat_1 feat_2 date version
23784 some_code Three A 2022-01-01 600
23794 some_code Seven B 2022-01-01 600
23774 some_cod1 Three A 2022-01-02 600
22784 some_cod1 Three C 2022-01-01 650
23564 some_cod2 Four A 2022-01-05 650
20784 some_cod3 Five A 2022-02-08 700
20744 some_cod2 Five A 2022-01-05 700
20745 some_cod2 Four C 2022-01-05 700
I want to look at df_2, group by scope and date, get the maximum version, and then check whether all my_ids are present in df_1 for this version.
What I did:
df_2 = df_2.groupBy(["date", "scope"])['version'].max()
df = df_1.join(df_2, on = ["my_id"], how = "leftanti")
But I get
TypeError: 'GroupedData' object is not subscriptable
Why is that and is my logic incorrect?
TypeError: 'GroupedData' object is not subscriptable
You get this error because you do .groupBy(["date", "scope"])['version']. The call .groupBy(["date", "scope"]) returns an object of type GroupedData, and on that object you then try ['version']. GroupedData does not support accessing elements with the [...] syntax, i.e. it is not "subscriptable", which is why you get this error message.
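For reference, the usual PySpark way to express that grouping is .agg() with F.max. A minimal sketch (max_version is just an illustrative column name):
from pyspark.sql import functions as F
df_2.groupBy('date', 'scope').agg(F.max('version').alias('max_version'))
This gives one row per group, so you would still need to join it back to df_2 to recover the my_ids, which is why the window approach below is often more convenient.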
You could do what you want using window functions, e.g. calculate the max 'version' over every window having the same 'scope' and 'date'. Then filter on this max value and do the same "leftanti" join that you tried to do.
Inputs:
from pyspark.sql import functions as F, Window as W
df_1 = spark.createDataFrame(
[(23784, 'some_code', 'Three', 'A', 30, 60, 60, '2022-01-01'),
(23794, 'some_code', 'Seven', 'B', 60, 40, 20, '2022-01-01'),
(23774, 'some_cod1', 'Three', 'A', 90, 40, 60, '2022-01-02'),
(22784, 'some_cod1', 'Three', 'C', 30, 10, 60, '2022-01-01'),
(23564, 'some_cod2', 'Four', 'A', 20, 40, 20, '2022-01-05'),
(20784, 'some_cod3', 'Five', 'A', 10, 70, 40, '2022-02-08')],
['my_id', 'scope', 'feat_1', 'feat_2', 'value_1', 'value_2', 'value_3', 'date'])
df_2 = spark.createDataFrame(
[(23784, 'some_code', 'Three', 'A', '2022-01-01', 600),
(23794, 'some_code', 'Seven', 'B', '2022-01-01', 600),
(23774, 'some_cod1', 'Three', 'A', '2022-01-02', 600),
(22784, 'some_cod1', 'Three', 'C', '2022-01-01', 650),
(23564, 'some_cod2', 'Four', 'A', '2022-01-05', 650),
(20784, 'some_cod3', 'Five', 'A', '2022-02-08', 700),
(20744, 'some_cod2', 'Five', 'A', '2022-01-05', 700),
(20745, 'some_cod2', 'Four', 'C', '2022-01-05', 700)],
['my_id', 'scope', 'feat_1', 'feat_2', 'date', 'version'])
Script:
# ordering by descending 'version' makes first() return the max per window
w = W.partitionBy('scope', 'date').orderBy(F.desc('version'))

df_2 = (df_2
    .withColumn('max_version', F.first('version').over(w))  # highest 'version' per ('scope', 'date')
    .filter('version = max_version')  # keep only rows that carry the latest version
)

# df_1 rows whose my_id is missing from the latest-version rows of df_2
df = df_1.join(df_2, on=["my_id"], how="leftanti")
df.show()
# +-----+---------+------+------+-------+-------+-------+----------+
# |my_id|    scope|feat_1|feat_2|value_1|value_2|value_3|      date|
# +-----+---------+------+------+-------+-------+-------+----------+
# |23564|some_cod2|  Four|     A|     20|     40|     20|2022-01-05|
# +-----+---------+------+------+-------+-------+-------+----------+
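For completeness, the same result could also be reached with the groupBy route you were aiming for, by aggregating the max and joining back. A sketch, using the original df_2 from the Inputs section (before it gets overwritten by the window step above); max_versions and latest_ids are just illustrative names:

# max 'version' per ('scope', 'date')
max_versions = df_2.groupBy('scope', 'date').agg(F.max('version').alias('max_version'))

# my_ids of df_2 rows that carry the latest version of their scope/date
latest_ids = (df_2
    .join(max_versions, on=['scope', 'date'])
    .filter('version = max_version')
    .select('my_id')
)

# df_1 rows whose my_id is missing from the latest-version rows of df_2
df = df_1.join(latest_ids, on=['my_id'], how='leftanti')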