I am working with Polars in a wasm environment.
I have noticed an inconsistency with the LazyFrame.collect operation where it sometimes creates threads when working with certain datasets.
Here is the code that relates to the issue
#[wasm_bindgen]
pub fn start(buff: &[u8],
item_id:&str,
order_id:&str,
item_name:&str) -> JsValue{
let cursor = Cursor::new(buff);
let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();
let df = lf.groupby([col(order_id)]);
let df = df.agg([col(item_id),col(item_name)]);
// Error occurs here
let df = df.collect().unwrap();
}
Working with a particular dataset provides me with the error:
panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'
because it is attempting to spawn threads in a WASM environment.
However, with other datasets, this process would execute flawlessly. And it would not try to create the threads. The issue does not seem to be file size due to testing with various datasets.
I would like to know what part of the Lazyframe.collect operation creates this inconsistency and how to avoid it.
working.csv
Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5
notworking.csv
Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006
The Polars fork that allows wasm is provided by https://github.com/universalmind303/polars/tree/wasm
You can see the full project here, as well as both CSV files: https://github.com/KivalM/lazyframe-min-test
EDIT: Output of describe_plan()
working dataset
[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
project */3 columns | details: None;
selection: "None"
not working dataset
[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
project */3 columns | details: None;
selection: "None"
Output of schema()
working dataset
name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8
not working dataset
name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8
output describe_optimized_plan():
[col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
project 3/3 columns | details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
selection: "None"
EDIT:
After a closer look at the source code. the problem doesnt seem to be directly from any polars code.
I have tracked the issue down to polars-lazy/src/physical_plan/executors/groupby.rs
Function
impl Executor for GroupByExec {
fn execute
Which then returns a value from
groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)
However, the groupby_helper
function runs to completion, and the dataframe is successfully created. The error appears when the dataframe is being returned from groupby_helper
to fn execute
. It is odd that a thread is attempting to be created only when this function returns. Does there exist something in RUST WASM that could cause behaviour like this?
so it looks like there is a std::thread
operation happening with the groupbys that I missed when creating the branch.
impl Drop for GroupsIdx {
fn drop(&mut self) {
let v = std::mem::take(&mut self.all);
// ~65k took approximately 1ms on local machine, so from that point we drop on other thread
// to stop query from being blocked
if v.len() > 1 << 16 {
std::thread::spawn(move || drop(v));
} else {
drop(v);
}
}
}
The dataset size is what is determining the thread spawn.
any group greater than 1 << 16
(~65k) will spawn a thread.
Feature flagging that impl
to only compile on non-wasm targets should fix your issue.