Tags: python, python-polars, pyarrow

How to apply ip lookup using polars?


Given two tables, I'd like to look up every IP address and find the network it belongs to.

I have two large tables:

clients

and the following networks:

networks

For the ClientIP column (first table), I thought of converting the whole column with ipaddress.ip_address.

For the second column (second table), I thought of converting the whole column with ipaddress.ip_network.

Something like this:

import ipaddress

network = ipaddress.ip_network('99.96.0.0/13')
ip_obj = ipaddress.ip_address('99.87.29.96')
print(ip_obj in network)

and then run this check through an apply function, but that is very slow, especially for tables of this size.

I noticed that some query languages, such as KQL, have built-in support for this: ipv4-lookup

Is there any built-in support for IP lookup in Polars or in PyArrow? Any suggestions?


Solution

  • Assuming the CIDR blocks in the network dataframe's IpCidr column do not overlap, you could convert the IPv4 addresses to pl.Int64 and compute the highest address in each CIDR block.

    A function that converts an IPv4 address to pl.Int64 using only pl.Expr operations:

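    import polars as pl
    
    def ip_addr4_int64_expr(ipv4_str_expr: pl.Expr) -> pl.Expr:
        # Split "a.b.c.d" into its four octets and weight them by
        # 256**3, 256**2, 256 and 1. Indexing each octet explicitly does not
        # depend on how cum_count numbers its first element.
        octets = ipv4_str_expr.str.split(".")
        return (
            octets.list.get(0).cast(pl.Int64) * 256**3
            + octets.list.get(1).cast(pl.Int64) * 256**2
            + octets.list.get(2).cast(pl.Int64) * 256
            + octets.list.get(3).cast(pl.Int64)
        )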
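
As a sanity check on the arithmetic, the same conversion can be sketched in plain Python and compared against the standard library's ipaddress module. The helper name ipv4_to_int is hypothetical and is not part of the answer's code; it only illustrates the octet weighting.

```python
import ipaddress


def ipv4_to_int(addr: str) -> int:
    """Hypothetical helper: fold the four octets into one integer, most significant first."""
    total = 0
    for octet in addr.split("."):
        # Shifting the running total by one octet is the same as weighting
        # the octets by 256**3, 256**2, 256 and 1.
        total = total * 256 + int(octet)
    return total


for addr in ["1.0.0.14", "99.96.1.5", "192.168.0.1"]:
    # The standard library performs the same conversion via int().
    assert ipv4_to_int(addr) == int(ipaddress.ip_address(addr))

print(ipv4_to_int("1.0.0.14"))  # 1 * 256**3 + 14 = 16777230
```

If the Polars expression returns different integers for the same inputs, the octet weights are likely off by one position.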
    

    The address range of each block follows from its CIDR prefix length: a /p block contains 2^(32 - p) addresses, so its highest address is the base address's Int64 value plus 2^(32 - p) - 1.

    cidr_split_ipv4_expr = pl.col("IpCidr").str.split("/").list.get(0)
    cidr_prefix_expr = pl.col("IpCidr").str.split("/").list.get(1).cast(pl.Int64)
    
    ip_cidr_df = ip_cidr_df.with_columns(
        ip_addr4_int64_expr(cidr_split_ipv4_expr).alias("ip_addr4_int64"),
        (
            ip_addr4_int64_expr(cidr_split_ipv4_expr)
            - 1
            + ((2 ** (32 - cidr_prefix_expr)).cast(pl.Int64))
        ).alias("cidr_ip_max"),
    )
    
    client_df = client_df.with_columns(
        ip_addr4_int64_expr(pl.col("ClientIP")).alias("ip_addr4_int64"),
    )
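
To make the range arithmetic concrete, here is a small worked example for the block 99.96.0.0/13 from the question, using only the standard library. It is an illustrative sketch, and the variable names simply mirror the column names used above.

```python
import ipaddress

network = ipaddress.ip_network("99.96.0.0/13")

base = int(network.network_address)
# A /13 leaves 32 - 13 = 19 host bits, i.e. 2**19 addresses in the block.
cidr_ip_max = base - 1 + 2 ** (32 - network.prefixlen)

# The computed maximum matches the last address of the block.
assert cidr_ip_max == int(network.broadcast_address)
print(ipaddress.ip_address(cidr_ip_max))  # 99.103.255.255

# The question's sample address lies below the block's base, so it is not a member.
client = int(ipaddress.ip_address("99.87.29.96"))
print(base <= client <= cidr_ip_max)  # False
```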
    

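    A range lookup can then be done with join_asof, which matches each client IP to the nearest network base address at or below it (both dataframes must be sorted on the join key). Any match whose client IP lies above that block's cidr_ip_max is then set to null.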

    client_df = (
        client_df.sort("ip_addr4_int64")
        .join_asof(ip_cidr_df.sort("ip_addr4_int64"), on="ip_addr4_int64")
        .select(
            "ClientIP",
            "Timestamp",
            pl.when(pl.col("ip_addr4_int64") <= pl.col("cidr_ip_max"))
            .then(pl.col("Info"))
            .alias("Info"),
        )
    )
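
The idea behind the as-of join can be sketched without Polars: sort the blocks by their first address, find the last block that starts at or below the client address, and reject the match if the address lies past the end of that block. The lookup function and the reduced sample list below are hypothetical and only illustrate the logic; they assume non-overlapping blocks, as the answer does.

```python
import bisect
import ipaddress

# Hypothetical sample: (CIDR block, label) pairs, assumed not to overlap.
networks = [
    ("99.96.0.0/13", "ATT-INTERNET4"),
    ("10.0.0.0/8", "The 10.0.0.0/8 Range"),
    ("1.0.0.0/24", "CLOUDFLARENET"),
]

# Build (start, end, label) tuples and sort them by start address.
ranges = []
for cidr, label in networks:
    net = ipaddress.ip_network(cidr)
    ranges.append((int(net.network_address), int(net.broadcast_address), label))
ranges.sort()
starts = [start for start, _, _ in ranges]


def lookup(ip: str):
    """Return the label of the block containing ip, or None."""
    value = int(ipaddress.ip_address(ip))
    # Backward as-of match: the last block whose start is <= value.
    idx = bisect.bisect_right(starts, value) - 1
    if idx < 0:
        return None
    _, end, label = ranges[idx]
    # Discard the match if the address lies past the end of that block.
    return label if value <= end else None


print(lookup("99.96.1.5"))    # ATT-INTERNET4
print(lookup("99.87.29.96"))  # None
```

The second lookup shows why the final null-out step is needed: the nearest block starting below 99.87.29.96 is 10.0.0.0/8, which does not contain it.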
    

    Example data (define these dataframes before running the steps above):

    ip_cidr_df = pl.DataFrame(
        {
            "IpCidr": [
                "99.96.0.0/13", "99.88.0.0/13", "1.0.136.0/22", "1.0.128.0/21",
                "1.0.0.0/24", "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12",
                "192.168.0.0/16",
            ],
            "Info": [
                "ATT-INTERNET4", "ATT-INTERNET4", "TOT-NET TOT Public Company Limit", 
                "TOT-NET TOT Public Company Limit", "CLOUDFLARENET", "The 10.0.0.0/8 Range",
                "The 127.0.0.0/8 Range", "The 172.16.0.0/12 Range", "The 192.168.0.0/16 Range",
            ],
        }
    )
    
    client_df = pl.DataFrame(
        {
            "Timestamp": [
                "2023-06-01 00:00:00", "2023-06-01 00:00:00", "2023-06-01 00:00:00", 
                "2023-06-01 00:00:00", "2023-06-30 23:59:00", "2023-06-30 23:59:00",
                "2023-06-30 23:59:00",
            ],
            "ClientIP": [
                "1.0.0.14", "99.96.1.5", "99.87.29.96", "10.0.0.1", "127.0.0.1", "172.16.0.1", "192.168.0.1",
            ],
        }
    )
    

    Output:

    shape: (7, 3)
    ┌─────────────┬─────────────────────┬──────────────────────────┐
    │ ClientIP    ┆ Timestamp           ┆ Info                     │
    │ ---         ┆ ---                 ┆ ---                      │
    │ str         ┆ str                 ┆ str                      │
    ╞═════════════╪═════════════════════╪══════════════════════════╡
    │ 1.0.0.14    ┆ 2023-06-01 00:00:00 ┆ CLOUDFLARENET            │
    │ 10.0.0.1    ┆ 2023-06-01 00:00:00 ┆ The 10.0.0.0/8 Range     │
    │ 99.87.29.96 ┆ 2023-06-01 00:00:00 ┆ null                     │
    │ 99.96.1.5   ┆ 2023-06-01 00:00:00 ┆ ATT-INTERNET4            │
    │ 127.0.0.1   ┆ 2023-06-30 23:59:00 ┆ The 127.0.0.0/8 Range    │
    │ 172.16.0.1  ┆ 2023-06-30 23:59:00 ┆ The 172.16.0.0/12 Range  │
    │ 192.168.0.1 ┆ 2023-06-30 23:59:00 ┆ The 192.168.0.0/16 Range │
    └─────────────┴─────────────────────┴──────────────────────────┘
    

    Note: This answer assumes a dataframe consisting only of IPv4 addresses and no overlapping CIDR blocks in ip_cidr_df. The same logic could be applied by converting IPv6 addresses to a pl.Struct consisting of pl.Int64.
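
Since the approach depends on the blocks not overlapping, it may be worth checking that assumption before running the join. The following is a minimal sketch using the standard library; find_overlaps is a hypothetical helper name, not something provided by Polars or PyArrow.

```python
import ipaddress


def find_overlaps(cidrs):
    """Hypothetical helper: return (earlier, later) pairs where a block starts inside an earlier one."""
    nets = sorted(
        (ipaddress.ip_network(c) for c in cidrs),
        key=lambda n: (int(n.network_address), n.prefixlen),
    )
    overlaps = []
    widest = None  # block with the highest end address seen so far
    for net in nets:
        if widest is not None and int(net.network_address) <= int(widest.broadcast_address):
            overlaps.append((str(widest), str(net)))
        if widest is None or int(net.broadcast_address) > int(widest.broadcast_address):
            widest = net
    return overlaps


sample = [
    "99.96.0.0/13", "99.88.0.0/13", "1.0.136.0/22", "1.0.128.0/21",
    "1.0.0.0/24", "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12",
    "192.168.0.0/16",
]
print(find_overlaps(sample))                    # []
print(find_overlaps(sample + ["10.1.0.0/16"]))  # [('10.0.0.0/8', '10.1.0.0/16')]
```

An empty result means the answer's assumption holds for that set of blocks.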