I'm trying to merge a number of columns from a Polars data frame (in Rust) into a format compatible with the MAP type in Parquet. I believe this means converting the columns into a single column containing a list of structs, each with a `key` and a `value` field. In other words:

```
List(Struct([Field('key', Utf8), Field('value', Utf8)]))
```
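To make the target layout concrete, here is a plain-Rust sketch (no Polars involved) of what `List(Struct([key, value]))` means per row. Each row of the merged column holds its own list of key/value entries, built from the wide columns, and null cells are simply left out. The names `KeyValue` and `columns_to_map_rows`, and the sample columns `colour` and `size`, are hypothetical and only illustrate the shape.

```rust
/// One entry of the merged column; models Struct([Field('key', Utf8), Field('value', Utf8)]).
/// Hypothetical type, used only to illustrate the layout.
#[derive(Debug, PartialEq)]
struct KeyValue {
    key: String,
    value: String,
}

/// Turns several nullable string columns into one list of key/value entries per row.
/// `names[i]` is the name of `columns[i]`; all columns are assumed to have the same length.
/// Null cells are skipped, so different rows can end up with different numbers of entries.
fn columns_to_map_rows(names: &[&str], columns: &[Vec<Option<&str>>]) -> Vec<Vec<KeyValue>> {
    let n_rows = columns.first().map_or(0, |c| c.len());
    (0..n_rows)
        .map(|row| {
            names
                .iter()
                .zip(columns)
                .filter_map(|(name, col)| {
                    // Keep the cell only if it is non-null
                    col[row].map(|v| KeyValue {
                        key: name.to_string(),
                        value: v.to_string(),
                    })
                })
                .collect()
        })
        .collect()
}

fn main() {
    // Hypothetical wide frame: two columns, two rows
    let names = ["colour", "size"];
    let columns = vec![
        vec![Some("red"), None],
        vec![Some("L"), Some("M")],
    ];
    let rows = columns_to_map_rows(&names, &columns);
    for (i, entries) in rows.iter().enumerate() {
        println!("row {i}: {entries:?}");
    }
}
```

Row 0 gets two entries (`colour` and `size`), while row 1 gets only `size` because its `colour` cell is null. This variable-length, per-row list is exactly what a list-of-structs column can represent and a fixed set of columns cannot.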
This is how Polars reads in Parquet files that have been written in this format, so I think it stands to reason that writing back to that format should work.
I'm struggling to work out exactly how to do it, though, and can't find any Rust examples of anything similar. Can anyone point me in the right direction?
This is how I did it in the end:
```rust
// Method on a type with `prefix: String` and `field: String`; assumes `use polars::prelude::*;`
fn merge_fields(&self, dataframe: LazyFrame) -> LazyFrame {
    let schema = dataframe.schema().unwrap();
    // Select every column whose name starts with the prefix, cast to String
    let merge_fields: Vec<Expr> = schema
        .iter_names()
        .filter(|f| f.starts_with(&self.prefix))
        .map(|v| col(v).cast(DataType::String))
        .collect();

    // Nothing to merge: return the frame unchanged
    if merge_fields.is_empty() {
        return dataframe;
    }

    let prefix_len = self.prefix.len();
    let merged_name = self.field.to_owned();
    // Declare the real output dtype, List(Struct([key: String, value: String])),
    // instead of DataType::default(), so the lazy schema is known downstream
    let output_type = DataType::List(Box::new(DataType::Struct(vec![
        Field::new("key".into(), DataType::String),
        Field::new("value".into(), DataType::String),
    ])));

    dataframe.with_columns([
        // Merge the selected columns into a single struct column
        as_struct(merge_fields)
            .apply(
                move |s: Series| {
                    // Downcast to a struct
                    let chunked_array = s.struct_()?;
                    // Field names with the prefix sliced off
                    let names: Vec<&str> = chunked_array
                        .fields()
                        .iter()
                        .map(|f| &f.name()[prefix_len..])
                        .collect();
                    // Build one list of {key, value} structs per row
                    let out = chunked_array
                        .into_iter()
                        .map(|row| {
                            // Keep only non-null, non-blank values
                            let pairs: Vec<(&str, &str)> = row
                                .iter()
                                .enumerate()
                                .filter_map(|(idx, field)| {
                                    let val = field.get_str()?;
                                    if val.trim().is_empty() {
                                        return None;
                                    }
                                    Some((names[idx], val))
                                })
                                .collect();
                            let d = df!(
                                "key" => pairs.iter().map(|p| p.0).collect::<Vec<&str>>(),
                                "value" => pairs.iter().map(|p| p.1).collect::<Vec<&str>>()
                            )
                            .unwrap();
                            Some(d.into_struct(&merged_name).into_series())
                        })
                        .collect::<ListChunked>();
                    Ok(Some(out.into_series()))
                },
                GetOutput::from_type(output_type),
            )
            .alias(&self.field),
    ])
}
```
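The per-row logic inside the closure (select the prefixed columns, strip the prefix to get the key, skip nulls and blank strings) can be checked on its own without Polars. The sketch below is a simplified, std-only model of that step. `prefixed_pairs` and the `attr_` prefix are hypothetical, and unlike the answer it filters non-prefixed columns per row rather than excluding them up front in the `select`.

```rust
/// Pairs each value in a row with its column name minus `prefix`, skipping nulls,
/// blank strings, and columns that do not carry the prefix. Hypothetical helper.
fn prefixed_pairs<'a>(
    prefix: &str,
    names: &[&'a str],
    row: &[Option<&'a str>],
) -> Vec<(&'a str, &'a str)> {
    names
        .iter()
        .zip(row)
        .filter_map(|(&name, &value)| {
            // strip_prefix returns None for columns without the prefix
            let key = name.strip_prefix(prefix)?;
            // Drop null cells
            let value = value?;
            // Drop empty or whitespace-only strings
            if value.trim().is_empty() {
                return None;
            }
            Some((key, value))
        })
        .collect()
}

fn main() {
    // Hypothetical column names using the prefix "attr_"
    let names = ["attr_colour", "id", "attr_size", "attr_note"];
    let row = [Some("red"), Some("42"), None, Some("   ")];
    let pairs = prefixed_pairs("attr_", &names, &row);
    println!("{pairs:?}"); // prints [("colour", "red")]
}
```

`str::strip_prefix` combines the `starts_with` check and the `[prefix_len..]` slice in one call, which avoids slicing a name that happens not to carry the prefix. Note also that `val.trim().is_empty()` alone already covers the empty-string case.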