
Polars - issues with performance - attempting to create a new dataframe per row


I need to run another library's algorithm on each row of a large DataFrame, but I'm having trouble converting my code to Polars expressions for better performance. Here are a couple of sample DataFrames:

import polars as pl

df_products = pl.DataFrame({
    'SKU':['apple','banana','carrot','date'],
    'DESCRIPTION': [
        "Wire Rope",
        "Connector",
        "Tap",
        "Zebra"
    ],
    'CATL3': [
        "Fittings",
        "Tube",
        "Tools",
        "Animal"
    ],
    'YELLOW_CAT': [
        "Rope Accessories",
        "Tube Fittings",
        "Forming Taps",
        "Striped"
    ],
    'INDEX': [0, 5, 25, 90],
    'EMBEDDINGS': [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9],
        [10,11,12]
    ],
})


df_items_sm_ex = pl.DataFrame({
    'PRODUCT_INFO':['apple','banana','carrot'],
    'SEARCH_SIMILARITY_SCORE': [
        [1., 0.87, 0.54, 0.33],
        [1., 0.83, 0.77, 0.55],
        [1., 0.92, 0.84, 0.65]
    ],
    'SEARCH_POSITION': [
        [0, 5, 25, 90],
        [1, 2, 151, 373],
        [3, 5, 95, 1500]
    ],
    'SKU':['apple','banana','carrot'],
    'YELLOW_CAT': [
        "Rope Accessories",
        "Tube Fittings",
        "Forming Taps"
    ],
    'CATL3': [
        "Fittings",
        "Tube",
        "Tools"
    ],
    'EMBEDDINGS': [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
    ],
})

And now the code.

For each row I have 3 main operations: generate the base new dataframe, preprocess / clean / run predictions on the dataframe, and write the dataframe to a few SQL tables. I have noticed steps 1 and 2 easily take the longest to execute:

df_items_sm_ex.select(
    pl.struct(df_items_sm_ex.columns)
    .map_elements(lambda row: build_complements(
        row, df_products, rfc, rfc_comp, engine, current_datetime
    )))


def build_complements(row, df_products, ml, ml_comp, engine, current_datetime):
    try:
        #step 1 - generate the base new dataframe
        output_df = build_candidate_dataframe(row, df_products)
        #step 2 - preprocess / clean / run predictions on the dataframe
        output_df = process_candidate_output(df_products, output_df, ml, ml_comp)
        #step 3 write dataframes to SQL
        write_validate_complements(output_df, row, current_datetime, engine)
    except Exception as e:
        print(f'exception: {repr(e)}')

def build_candidate_dataframe(row, df_products):
    df_len = len(row['SEARCH_SIMILARITY_SCORE'])
    schema = {'QUERY': str,
              'SIMILARITY_SCORE': pl.Float32, 
              'POSITION': pl.Int64,
              'QUERY_SKU': str, 
              'QUERY_LEAF': str,
              'QUERY_CAT': str,
              'QUERY_EMBEDDINGS': pl.List(pl.Float32)
            }
    output_df = pl.DataFrame({'QUERY': [row['PRODUCT_INFO']] * df_len,
                    'SIMILARITY_SCORE': row['SEARCH_SIMILARITY_SCORE'], 
                    'POSITION': row['SEARCH_POSITION'],
                    'QUERY_SKU': [row['SKU']] * df_len, 
                    'QUERY_LEAF': [row['YELLOW_CAT']] * df_len,
                    'QUERY_CAT': [row['CATL3']] * df_len,
                    'QUERY_EMBEDDINGS': [row['EMBEDDINGS']] * df_len
                    }, schema=schema).sort("SIMILARITY_SCORE", descending=True)
                
    output_df = output_df.join(df_products[['SKU', 'EMBEDDINGS', 'INDEX', 'DESCRIPTION', 'CATL3', 'YELLOW_CAT']], left_on=['POSITION'], right_on=['INDEX'], how='left')
    output_df = output_df.rename({"DESCRIPTION": "SIMILAR_PRODUCT_INFO", "CATL3": "SIMILAR_PRODUCT_CAT", "YELLOW_CAT": "SIMILAR_PRODUCT_LEAF"})
    return output_df

def process_candidate_output(df_products, output_df, ml, ml_comp):
    combined_embeddings = (output_df.to_pandas()['QUERY_EMBEDDINGS'] + output_df.to_pandas()['EMBEDDINGS']) / 2
    output_df = output_df.with_columns(pl.Series(name='COMBINED_EMBEDDINGS', values=combined_embeddings))
    output_df = output_df[['QUERY', 'QUERY_SKU', 'QUERY_CAT', 'QUERY_LEAF', 'SIMILAR_PRODUCT_INFO', 'SIMILAR_PRODUCT_CAT', 'SIMILAR_PRODUCT_LEAF', 'SIMILARITY_SCORE', 'COMBINED_EMBEDDINGS', 'SKU', 'POSITION']]
    output_df = output_df.filter(
        pl.col('SKU') != output_df['QUERY_SKU'][0]
    )
    #ML predictions
    output_df = predict_complements(output_df, ml)
    output_df = output_df.filter(
        pl.col('COMPLEMENTARY_PREDICTIONS') == 1
    )
    #Other ML predictions
    output_df = predict_required_accessories(output_df, ml_comp)
    output_df = output_df.sort(by='LABEL_PROBABILITY', descending=True)
    return output_df

Solution

  • For each row of the df you're pulling the data out of Polars into Python objects and then re-initializing a Polars DataFrame. That is very expensive in both time and memory. Instead, keep everything in Polars memory unless for some reason you absolutely can't. You mention that most of the time is spent in steps 1 and 2, but step 2 includes your ML calls, so those might still be the bottleneck.
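
    For example, a single explode replaces all of the [value] * df_len broadcasting in build_candidate_dataframe while keeping the data in Polars memory (a minimal sketch against the sample frames above):

    # explode turns the two list columns into one row per element; the
    # scalar columns (SKU, CATL3, ...) repeat automatically, so no Python
    # lists and no manual DataFrame construction are needed
    row_df = df_items_sm_ex[0].explode("SEARCH_SIMILARITY_SCORE", "SEARCH_POSITION")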

    Using map_elements here isn't ideal because you're not returning anything back to the df; just use a regular for loop instead (see the driver sketch after the function). I also combined your two functions into this one:

    def build_and_process_candidate_output(i, df_items_sm_ex, df_products, ml, ml_comp):
        output_df = (
            df_items_sm_ex[i]
            .explode("SEARCH_SIMILARITY_SCORE", "SEARCH_POSITION")
            .rename(
                {
                    "PRODUCT_INFO": "QUERY",
                    "SEARCH_SIMILARITY_SCORE": "SIMILARITY_SCORE",
                    "SEARCH_POSITION": "POSITION",
                    "SKU": "QUERY_SKU",
                    "YELLOW_CAT": "QUERY_LEAF",
                    "CATL3": "QUERY_CAT",
                    "EMBEDDINGS": "QUERY_EMBEDDINGS",
                }
            )
            .join(
                df_products.select(
                    "SKU", "EMBEDDINGS", "INDEX", "DESCRIPTION", "CATL3", "YELLOW_CAT"
                ),
                left_on=["POSITION"],
                right_on=["INDEX"],
                how="left",
            )
            .rename(
                {
                    "DESCRIPTION": "SIMILAR_PRODUCT_INFO",
                    "CATL3": "SIMILAR_PRODUCT_CAT",
                    "YELLOW_CAT": "SIMILAR_PRODUCT_LEAF",
                }
            )
            .select(
                "QUERY",
                "QUERY_SKU",
                "QUERY_CAT",
                "QUERY_LEAF",
                "SIMILAR_PRODUCT_INFO",
                "SIMILAR_PRODUCT_CAT",
                "SIMILAR_PRODUCT_LEAF",
                "SIMILARITY_SCORE",
                
                # This assumes these are the same length
            ((pl.col("QUERY_EMBEDDINGS").explode() + pl.col("EMBEDDINGS").explode()) / 2)
            .implode()
            .over("POSITION")
            .alias("COMBINED_EMBEDDINGS"),
    
                "SKU",
                "POSITION",
            )
            # Given the sample data, this filter makes everything go away,
            # which is why supplying good sample data is important
            .filter(pl.col("SKU") != pl.col("QUERY_SKU").first())
        )
    
        # ML predictions
        output_df = predict_complements(output_df, ml)
        output_df = output_df.filter(pl.col("COMPLEMENTARY_PREDICTIONS") == 1)
        # Other ML predictions
        output_df = predict_required_accessories(output_df, ml_comp)
        output_df = output_df.sort(by="LABEL_PROBABILITY", descending=True)
        return output_df
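
    A plain for loop then drives it row by row; a sketch (this assumes write_validate_complements from your question can take the one-row frame df_items_sm_ex[i] in place of the struct row it used to receive):

    for i in range(df_items_sm_ex.height):
        output_df = build_and_process_candidate_output(
            i, df_items_sm_ex, df_products, rfc, rfc_comp
        )
        # step 3 unchanged: write the finished frame to your SQL tables
        write_validate_complements(
            output_df, df_items_sm_ex[i], current_datetime, engine
        )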