I have two dataframes like below:
import polars as pl

# Rows to look up: each entry is a list of tokens.
df1 = pl.DataFrame(
    {
        "col1": [
            ["a", "b", "c"],
            ["b", "c", "d"],
            ["a", "b", "c", "d"],
            ["d", "e", "f"],
        ],
    }
)

# Lookup table: col1 holds the key lists, col2 the values to collect.
df2 = pl.DataFrame(
    {
        "col1": [["a", "b"], ["b", "c"], ["c", "d"], ["b", "c", "d"]],
        "col2": ["1", "2", "3", "4"],
    }
)
I want to lookup values from df1 in df2, selecting all of the df2.col2 values for rows where all the values in the df2.col1 list are present in the df1.col1 list. Similar to https://github.com/pola-rs/polars/issues/4670, but I don't think an explode/join/groupby will work due to memory constraints (the dfs I am working with are on the order of 100M rows).
Is there a way to do this in a fully lazy way without apply? (I'm using Polars v0.18.4)
I have an eager pipeline that works:
# Eager lookup: for each df1 row, collect every df2.col2 whose df2.col1
# list is fully contained in that row's col1 list.
output = (
df1
.with_columns(
# map_elements calls the lambda once per df1 row; x is that row's col1 list.
col2 = pl.col("col1").map_elements(
lambda x: df2
# Keep df2 rows where ALL elements of df2.col1 appear in x:
# list.min() of the boolean mask is True only when every element matched.
.filter(pl.col("col1").list.eval(pl.element().is_in(x)).list.min())
.select(pl.col("col2").flatten())
.get_column("col2")
.to_list(),
return_dtype=pl.List(pl.String)
)
)
)
And a partial eager/lazy pipeline (note that df2 is not lazy, since get_column
is not available for LazyFrames):
# Partially lazy variant: df1 is lazy, but df2 stays eager because the
# closure relies on get_column/to_list, which LazyFrame does not provide.
output = (
df1
.lazy()
.with_columns(
# Same per-row subset test as the eager version.
col2 = pl.col("col1").map_elements(
lambda x: df2
.filter(pl.col("col1").list.eval(pl.element().is_in(x)).list.min())
.select(pl.col("col2").flatten())
.get_column("col2")
.to_list(),
return_dtype=pl.List(pl.String)
),
)
.collect()
)
But my attempt at a fully lazy pipeline without apply fails with the error `named columns are not allowed in list.eval`.
Actually I'm not convinced it would work anyway, since the column in .then()
likely isn't computed independently for each df1 row.
# Attempted fully-lazy version: expose df2's columns (suffixed "_2") next
# to df1's via with_context and run the subset test as an expression.
# This does NOT run — it raises the ComputeError shown below.
output = (
df1
.lazy()
.with_context(
df2
.lazy()
.select(pl.all().name.suffix("_2"))
)
.with_columns(
col2 =
pl.when(
# Illegal: inside list.eval only pl.element() / pl.col("") may be
# referenced, not a named outer column such as "col1".
pl.col("col1_2").list.eval(pl.element().is_in(pl.col("col1"))).list.min()
).then(
pl.col("col2_2").flatten()
),
)
.collect()
)
# ComputeError: named columns are not allowed in `list.eval`; consider using `element` or `col("")`
Have you confirmed the explode/join/group_by
approach does not work?
# Fully lazy explode/join/group_by approach.
# NOTE(review): the indentation under `with` was lost in the paste — at
# minimum the frame construction (and presumably the Categorical casts
# below) must run inside the StringCache so categories line up; confirm
# the intended extent of the block.
with pl.StringCache():
df1 = pl.DataFrame(
{
"col1": [["a", "b", "c"], ["b", "c", "d"], ["a", "b", "c", "d"], ["d", "e", "f"]],
}
).lazy()
df2 = pl.DataFrame(
{
"col1": [["a", "b"], ["b", "c"], ["c", "d"], ["b", "c", "d"]],
"col2": ["1", "2", "3", "4"],
}
).lazy()
# Cast strings to Categorical and take the physical integer codes (cheap
# to join on), then sort each list so equal sets become identical lists.
col1 = pl.col("col1").cast(pl.List(pl.Categorical)).to_physical().list.sort()
# Row indices let us recover original rows after exploding.
a = df1.select(col1).with_row_index()
b = df2.select(col1, "col2").with_row_index()
# Explode both sides and join on the scalar elements: one row per
# (df1 row, df2 row, shared element); re-aggregate the shared elements.
c = (
a.explode("col1").join(b.explode("col1"), on="col1")
.group_by("index", "index_right")
.agg("col1")
)
# Re-attach df2's full col1 list to compare against the overlap.
d = c.join(b, left_on="index_right", right_on="index")
# Subset test: the overlap equals df2.col1 exactly when every element of
# df2.col1 was found in the df1 list. NOTE(review): this compares hashes,
# which assumes the aggregated overlap comes back in the same (sorted)
# order and that hashes never collide — a direct list equality would be
# exact; confirm for the target polars version.
e = d.filter(pl.col("col1").hash() == pl.col("col1_right").hash())
# Collect all matching col2 values per df1 row.
f = e.group_by("index").agg("col2")
f.collect().with_columns(
str = pl.format("[{}]", pl.col("col2").list.join(", "))
)
shape: (3, 3)
┌────────┬───────────────────┬──────────────┐
│ index ┆ col2 ┆ str │
│ --- ┆ --- ┆ --- │
│ u32 ┆ list[str] ┆ str │
╞════════╪═══════════════════╪══════════════╡
│ 0 ┆ ["1", "2"] ┆ [1, 2] │
│ 1 ┆ ["3", "4", "2"] ┆ [3, 4, 2] │
│ 2 ┆ ["4", "2", … "1"] ┆ [4, 2, 3, 1] │
└────────┴───────────────────┴──────────────┘