Tags: python, sql, pandas, optimization, duckdb

Dealing with duplicate rows in duckdb with gaps (NULLs) and filling them efficiently


I'm new to duckdb (v0.10.1), so this question comes from my lack of familiarity with duckdb's built-in functionality.

I have a particular use case with time-series data that I haven't found a clever way to handle in duckdb. On rare occasions we get duplicate rows for a timestamp in a table that we need to dedup. The problem is that the duplicate rows can have gaps (NULLs) in different columns, so merging the rows together fills most or all of the gaps. Here is an example to illustrate the situation.

A table with duplicate values in the timestamp column:

timestamp (varchar)   A (int64)   B (int64)
2022-01-01            1           NULL
2022-01-01            NULL        2
2022-01-02            3           6
2022-01-02            4           NULL

There are duplicate timestamps that we want to dedup, but across the duplicate rows some columns have a value and some don't, and those values can be used to fill the gaps. So the desired output for these cases should be the following:

timestamp (varchar)   A (int64)   B (int64)
2022-01-01            1           2
2022-01-02            3           6

Using pandas, this can be done efficiently with the following logic:

# Fix filling issues for rows with the same timestamp
df = df.set_index("timestamp", drop=False)
df_first = df[~df.index.duplicated(keep="first")]
df_last = df[~df.index.duplicated(keep="last")]
df = df_first.fillna(df_last)

Here we pull the first and last duplicate row for each timestamp into separate dataframes, then fill the gaps of the first dataframe with the values from the second.
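
For reference, here is a minimal, self-contained version of that logic run on the sample table above (the data is the made-up example from this question):

import numpy as np
import pandas as pd

# The example table from the question, with NULLs as NaN
df = pd.DataFrame({
    "timestamp": ["2022-01-01", "2022-01-01", "2022-01-02", "2022-01-02"],
    "A": [1, np.nan, 3, 4],
    "B": [np.nan, 2, 6, np.nan],
})

df = df.set_index("timestamp", drop=False)
df_first = df[~df.index.duplicated(keep="first")]  # first row per timestamp
df_last = df[~df.index.duplicated(keep="last")]    # last row per timestamp
df = df_first.fillna(df_last)                      # fill first's gaps from last

print(df.reset_index(drop=True))
#     timestamp    A    B
# 0  2022-01-01  1.0  2.0
# 1  2022-01-02  3.0  6.0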

Using duckdb, I wasn't able to find a clever enough solution beyond aggregating every column in a pair with the timestamp and stitching them all together in a query like the following:

# Construct the SQL query: one subquery per value column, each keeping the
# first non-NULL value per timestamp, then join the subqueries on timestamp.
column_names = [column for column in df.columns if column != 'timestamp']

subqueries = []
for column in column_names:
    subquery = f"""
    (
        SELECT timestamp, {column}
        FROM (
            SELECT timestamp, {column},
                   ROW_NUMBER() OVER(PARTITION BY timestamp ORDER BY (CASE WHEN {column} IS NULL THEN 1 ELSE 0 END)) as rn
            FROM df
        ) sub
        WHERE rn = 1
    ) AS {column}
    """
    subqueries.append(subquery)

query = "SELECT " + ", ".join([f"{column_names[0]}.timestamp"] + [f"{column}.{column}" for column in column_names])
query += " FROM " + subqueries[0]

for i in range(1, len(subqueries)):
    query += f" JOIN {subqueries[i]} ON {column_names[0]}.timestamp = {column_names[i]}.timestamp"

The query generated by this code is the following:

    SELECT A.timestamp, A.A, B.B FROM 
    (
        SELECT timestamp, A
        FROM (
            SELECT timestamp, A,
                   ROW_NUMBER() OVER(PARTITION BY timestamp ORDER BY (CASE WHEN A IS NULL THEN 1 ELSE 0 END)) as rn
            FROM df
        ) sub
        WHERE rn = 1
    ) A
     JOIN 
    (
        SELECT timestamp, B
        FROM (
            SELECT timestamp, B,
                   ROW_NUMBER() OVER(PARTITION BY timestamp ORDER BY (CASE WHEN B IS NULL THEN 1 ELSE 0 END)) as rn
            FROM df
        ) sub
        WHERE rn = 1
    ) B
     ON A.timestamp = B.timestamp 

This does produce the right result, but it balloons in memory usage and takes far longer than the pandas solution. The queries also get quite large for dataframes with many columns.

The workaround I'm currently using is to drop down to pandas when needed, run the dedup logic there, and then go back to duckdb.

I'm wondering if anyone has a better way to do this that can be as performant as the pandas implementation.

Cheers!

I have tried using only SQL to solve the problem in duckdb, because the built-in functions didn't seem to fit what I was looking for.


Solution

  • It looks like you want the first non-NULL value per timestamp group?

    • any_value(): Returns the first non-null value from arg. This function is affected by ordering.
    >>> df
    shape: (4, 3)
    ┌────────────┬──────┬──────┐
    │ timestamp  ┆ A    ┆ B    │
    │ ---        ┆ ---  ┆ ---  │
    │ str        ┆ i64  ┆ i64  │
    ╞════════════╪══════╪══════╡
    │ 2022-01-01 ┆ 1    ┆ null │
    │ 2022-01-01 ┆ null ┆ 2    │
    │ 2022-01-02 ┆ 3    ┆ 6    │
    │ 2022-01-02 ┆ 4    ┆ null │
    └────────────┴──────┴──────┘
    

    columns(*) allows us to easily run a function on each column.

    duckdb.sql("""
    from (from df select row_number() over () row_number, *)
    select any_value(columns(*) order by row_number)
    group by timestamp
    order by row_number 
    """)
    
    ┌────────────┬────────────┬───────┬───────┐
    │ row_number │ timestamp  │   A   │   B   │
    │   int64    │  varchar   │ int64 │ int64 │
    ├────────────┼────────────┼───────┼───────┤
    │          1 │ 2022-01-01 │     1 │     2 │
    │          3 │ 2022-01-02 │     3 │     6 │
    └────────────┴────────────┴───────┴───────┘
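
    If the helper row_number column isn't wanted in the final output, one option (a sketch, assuming the grouped columns keep their original names as shown in the result above) is to wrap the query and drop it with duckdb's exclude star modifier:

    duckdb.sql("""
    select * exclude (row_number)
    from (
        from (from df select row_number() over () row_number, *)
        select any_value(columns(*) order by row_number)
        group by timestamp
    )
    order by row_number
    """)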