I will try to explain this as good as possible, because I am unfortunately quiet new to polars. I have a large time series dataset where each separate timeseries is identified with a group_id
. Additionally, there is a time_idx
column that identifies which of the possible time series step is present and have a corresponding target value if present. As a minimal example, consider the following:
pl.Config(fmt_table_cell_list_len=10, fmt_str_lengths=80)
min_df = pl.DataFrame({
"group_idx": [0, 1, 2, 3], "time_idx": [[0, 1, 2, 3], [2, 3], [0, 2, 3], [0,3]]
})
shape: (4, 2)
┌───────────┬──────────────┐
│ group_idx ┆ time_idx │
│ --- ┆ --- │
│ i64 ┆ list[i64] │
╞═══════════╪══════════════╡
│ 0 ┆ [0, 1, 2, 3] │
│ 1 ┆ [2, 3] │
│ 2 ┆ [0, 2, 3] │
│ 3 ┆ [0, 3] │
└───────────┴──────────────┘
Here, the time range in the dataset is 4 steps long, but not for all individual series all time steps are present. So while group_idx=0
has all present steps, group_idx=0
only has step 0 and 3, meaning that for step 1 and 2 no recorded target value is present.
Now, I would like to obtain all possible sub sequences so that we start from each possible time step for a given sequence length and maximally go to the max_time_step
(in this case 3). For example, for sequence_length=3
, the expected output would be:
result_df = pl.DataFrame({
"group_idx": [0, 0, 1, 1, 2, 2, 3, 3],
"time_idx": [[0, 1, 2, 3], [0, 1, 2, 3], [2, 3], [2, 3], [0,2,3], [0,2,3], [0,3], [0,3]],
"sub_sequence": [[0,1,2], [1,2,3], [None, None, 2], [None, 2, 3], [0, None, 2], [None, 2, 3], [0, None, None], [None, None, 3]]
})
shape: (8, 3)
┌───────────┬──────────────┬─────────────────┐
│ group_idx ┆ time_idx ┆ sub_sequence │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[i64] ┆ list[i64] │
╞═══════════╪══════════════╪═════════════════╡
│ 0 ┆ [0, 1, 2, 3] ┆ [0, 1, 2] │
│ 0 ┆ [0, 1, 2, 3] ┆ [1, 2, 3] │
│ 1 ┆ [2, 3] ┆ [null, null, 2] │
│ 1 ┆ [2, 3] ┆ [null, 2, 3] │
│ 2 ┆ [0, 2, 3] ┆ [0, null, 2] │
│ 2 ┆ [0, 2, 3] ┆ [null, 2, 3] │
│ 3 ┆ [0, 3] ┆ [0, null, null] │
│ 3 ┆ [0, 3] ┆ [null, null, 3] │
└───────────┴──────────────┴─────────────────┘
All of this should be computed within polars
, because the real dataset is much larger both in terms of the number of time series and time series length.
Edit:
Based on the suggestion by @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ I have tried the following on the actual dataset (~200 million rows after .explode()
). I forgot to say that we can assume that that group_idx
and time_idx
are already sorted. However, this gets killed.
(
min_df.lazy()
.with_columns(
pl.col("time_idx").alias("time_idx_nulls")
)
.rolling(
index_column='time_idx',
group_by='group_idx',
period=str(max_sequence_length) + 'i',
)
.agg(pl.col("time_idx_nulls"))
.filter(pl.col('time_idx_nulls').list.len() == max_sequence_length)
)
Here's an algorithm that needs only the desired sub-sequence length as input. It uses rolling
to create your sub-sequences.
period = 3
min_df = min_df.explode('time_idx')
(
min_df.get_column('group_idx').unique().to_frame()
.join(
min_df.get_column('time_idx').unique().to_frame(),
how='cross'
)
.join(
min_df.with_columns(pl.col('time_idx').alias('time_idx_nulls')),
on=['group_idx', 'time_idx'],
how='left',
)
.rolling(
index_column='time_idx',
group_by='group_idx',
period=str(period) + 'i',
)
.agg(pl.col("time_idx_nulls"))
.filter(pl.col('time_idx_nulls').list.len() == period)
.sort('group_idx')
)
shape: (8, 3)
┌───────────┬──────────┬─────────────────┐
│ group_idx ┆ time_idx ┆ time_idx_nulls │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ list[i64] │
╞═══════════╪══════════╪═════════════════╡
│ 0 ┆ 2 ┆ [0, 1, 2] │
│ 0 ┆ 3 ┆ [1, 2, 3] │
│ 1 ┆ 2 ┆ [null, null, 2] │
│ 1 ┆ 3 ┆ [null, 2, 3] │
│ 2 ┆ 2 ┆ [0, null, 2] │
│ 2 ┆ 3 ┆ [null, 2, 3] │
│ 3 ┆ 2 ┆ [0, null, null] │
│ 3 ┆ 3 ┆ [null, null, 3] │
└───────────┴──────────┴─────────────────┘
And for example, with period = 2:
shape: (12, 3)
┌───────────┬──────────┬────────────────┐
│ group_idx ┆ time_idx ┆ time_idx_nulls │
│ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ list[i64] │
╞═══════════╪══════════╪════════════════╡
│ 0 ┆ 1 ┆ [0, 1] │
│ 0 ┆ 2 ┆ [1, 2] │
│ 0 ┆ 3 ┆ [2, 3] │
│ 1 ┆ 1 ┆ [null, null] │
│ 1 ┆ 2 ┆ [null, 2] │
│ 1 ┆ 3 ┆ [2, 3] │
│ 2 ┆ 1 ┆ [0, null] │
│ 2 ┆ 2 ┆ [null, 2] │
│ 2 ┆ 3 ┆ [2, 3] │
│ 3 ┆ 1 ┆ [0, null] │
│ 3 ┆ 2 ┆ [null, null] │
│ 3 ┆ 3 ┆ [null, 3] │
└───────────┴──────────┴────────────────┘
One way that we can manage RAM requirements (for this, or any other algorithm on large datasets) is to find ways to divide-and-conquer.
If we look at our particular problem, each line in the input dataset leads to results that are independent of any other line. We can use this fact to apply our algorithm in batches.
But first, let's create some data that leads to a large problem:
min_time = 0
max_time = 1_000
nbr_groups = 400_000
min_df = (
pl.DataFrame({"time_idx": [(range(min_time, max_time, 2))]})
.join(
pl.int_range(nbr_groups, eager=True).alias("group_idx").to_frame(),
how="cross"
)
)
min_df.explode('time_idx')
shape: (200000000, 2)
┌──────────┬───────────┐
│ time_idx ┆ group_idx │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞══════════╪═══════════╡
│ 0 ┆ 0 │
│ 2 ┆ 0 │
│ 4 ┆ 0 │
│ 6 ┆ 0 │
│ ... ┆ ... │
│ 992 ┆ 399999 │
│ 994 ┆ 399999 │
│ 996 ┆ 399999 │
│ 998 ┆ 399999 │
└──────────┴───────────┘
The input dataset when exploded is 200 million records .. so roughly the size you describe. (Of course, using divide-and-conquer, we won't explode the full dataset.)
To divide-and-conquer this, we'll slice
our input dataset into smaller datasets, run the algorithm on the smaller datasets, and then concat
the results into one large dataset. (One nice feature of slice
is that it's very cheap - it's simply a window into the original dataset, so it consumes very little additional RAM.)
Notice the slice_size
variable. You'll need to experiment with this value on your particular computing platform. You want to set this as large as your RAM requirements allow. If set too low, your program will take too long. If set too high, your program will crash. (I've arbitrarily set this to 10,000 as a starting value.)
time_index_df = (
pl.int_range(min_time, max_time, eager=True, dtype=pl.Int64)
.alias("time_idx")
.to_frame()
.lazy()
)
period = 3
slice_size = 10_000
result = pl.concat(
[
(
time_index_df
.join(
min_df
.lazy()
.slice(next_index, slice_size)
.select("group_idx"),
how="cross",
)
.join(
min_df
.lazy()
.slice(next_index, slice_size)
.explode('time_idx')
.with_columns(pl.col("time_idx").alias("time_idx_nulls")),
on=["group_idx", "time_idx"],
how="left",
)
.rolling(
index_column='time_idx',
group_by='group_idx',
period=str(period) + 'i',
)
.agg(pl.col("time_idx_nulls"))
.filter(pl.col('time_idx_nulls').list.len() == period)
.select('group_idx', 'time_idx_nulls')
.collect()
)
for next_index in range(0, min_df.height, slice_size)
]
)
result.sort('group_idx')
shape: (399200000, 2)
┌───────────┬───────────────────┐
│ group_idx ┆ time_idx_nulls │
│ --- ┆ --- │
│ i64 ┆ list[i64] │
╞═══════════╪═══════════════════╡
│ 0 ┆ [0, null, 2] │
│ 0 ┆ [null, 2, null] │
│ 0 ┆ [2, null, 4] │
│ 0 ┆ [null, 4, null] │
│ ... ┆ ... │
│ 399999 ┆ [994, null, 996] │
│ 399999 ┆ [null, 996, null] │
│ 399999 ┆ [996, null, 998] │
│ 399999 ┆ [null, 998, null] │
└───────────┴───────────────────┘
You do actually need to use the joins. The joins are used to fill in the "holes" in your sequences with null
values.
Also, notice that I've put each slice
/batch into lazy mode, but not the entire algorithm. Depending on your computing platform, using lazy mode for the entire algorithm may again overwhelm your system, as Polars attempts to spread the work across multiple processors which could lead to more out-of-memory situations for you.
Also note the humongous size of my output dataset: almost 400 million records. I did this purposely as a reminder that your output dataset may be the ultimate problem. That is, any algorithm would fail if the result dataset is larger than your RAM can hold.