Hi, I am trying to read from Postgres into a Polars DataFrame in a generic way.
I have read the post "Rust: Read dataframe in polars from mysql" about reading from MySQL, and I want to adapt it so that I don't need to handle the columns by hand for each new query.
My plan was to store all the values in one large vector in column-major order (all of column 0, then all of column 1, and so on), wrapping each value in an enum that records its type.
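In a column-major buffer each column occupies `n_rows` consecutive slots, so cell (row, col) lives at `col * n_rows + row`; the stride is the number of rows, not the number of columns. A minimal std-only sketch of that indexing (`column_major_index` and `column_slice` are hypothetical helper names, not part of Polars):

```rust
/// Position of cell (row, col) in a column-major buffer: all of column 0 first,
/// then all of column 1, and so on. Each column occupies `n_rows` consecutive slots.
fn column_major_index(row: usize, col: usize, n_rows: usize) -> usize {
    col * n_rows + row
}

/// The contiguous slice of a column-major buffer that holds column `col`.
fn column_slice<T>(data: &[T], col: usize, n_rows: usize) -> &[T] {
    let start = col * n_rows;
    &data[start..start + n_rows]
}

fn main() {
    // A 3-row, 2-column table stored column-major.
    let n_rows = 3;
    let n_cols = 2;
    let mut data = vec![0; n_rows * n_cols];
    for row in 0..n_rows {
        for col in 0..n_cols {
            // Encode each cell as 10 * col + row so the layout is easy to read.
            data[column_major_index(row, col, n_rows)] = 10 * col + row;
        }
    }
    println!("{:?}", data); // [0, 1, 2, 10, 11, 12]
    println!("{:?}", column_slice(&data, 1, n_rows)); // [10, 11, 12]
}
```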
I have the following code.
let rows = client.query(&select_sql, &[]).await?;

#[derive(Clone, Debug)]
enum GenericValue {
    // Variant payload types are assumed from the Postgres types read below.
    Int32(i32),
    String(String),
    Bool(bool),
    // Assumes tokio-postgres is built with its chrono feature for timestamptz.
    DateTime(chrono::DateTime<chrono::Utc>),
}

let number_of_rows = rows.len();
let number_of_columns = rows.first().expect("expected one row").columns().len();
println!("Number of columns: {:?}", number_of_columns);
println!("Number of rows: {:?}", number_of_rows);

// Column-major: column c occupies data[c * number_of_rows .. (c + 1) * number_of_rows].
let mut data: Vec<Option<GenericValue>> = vec![None; number_of_columns * number_of_rows];

for (row_index, row) in rows.iter().enumerate() {
    for (col_index, column) in row.columns().iter().enumerate() {
        let col_type: String = column.type_().to_string();
        let index = col_index * number_of_rows + row_index;
        if col_type == "int4" {
            data[index] = Some(GenericValue::Int32(row.get(col_index)));
        } else if col_type == "text" || col_type == "varchar" {
            data[index] = Some(GenericValue::String(row.get(col_index)));
        } else if col_type == "bool" {
            data[index] = Some(GenericValue::Bool(row.get(col_index)));
        } else if col_type == "timestamptz" {
            data[index] = Some(GenericValue::DateTime(row.get(col_index)));
        } else {
            panic!("unsupported column type: {}", col_type);
        }
    }
}

for c in 0..number_of_columns {
    let start_index = c * number_of_rows;
    let slice = &data[start_index..start_index + number_of_rows];
    let first: &Option<GenericValue> = &data[start_index];
    // This is the line that fails: GenericValue does not implement PolarsAsRef<str>.
    let chunked_data: Utf8Chunked = slice.iter().map(|v| v.clone().unwrap()).collect();
    println!("{:?}", chunked_data);
    // let ca_country: Utf8Chunked = values.iter().map(|v| &*v.country).collect();
}
The problem is that collecting into a Utf8Chunked requires the item type to implement the trait PolarsAsRef<str>, and I am not sure how to do that. I am also unsure how this chunked data would handle NULLs.
How can I create the correct chunked array for each Series here?
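You probably don't need to implement `PolarsAsRef<str>` yourself: as far as I know Polars already implements it for `String` and `&str`, and `Utf8Chunked` can be collected from an iterator of `Option<&str>`, with each `None` becoming a null. So the real job is turning each column slice of `GenericValue` cells into a typed `Vec<Option<T>>`. A std-only sketch of that step, assuming a trimmed-down `GenericValue` with two variants (`as_str_column` and `as_i32_column` are hypothetical helpers). The resulting vector is what you would then collect into a `Utf8Chunked` or pass to `Series::new`.

```rust
#[derive(Clone, Debug, PartialEq)]
enum GenericValue {
    Int32(i32),
    String(String),
}

/// Borrow a column of cells as `Option<&str>`. A SQL NULL (`None`) stays `None`,
/// which becomes a null slot once collected into a Polars column.
fn as_str_column(cells: &[Option<GenericValue>]) -> Vec<Option<&str>> {
    cells
        .iter()
        .map(|cell| match cell {
            Some(GenericValue::String(s)) => Some(s.as_str()),
            None => None,
            Some(other) => panic!("expected a string cell, got {:?}", other),
        })
        .collect()
}

/// Same idea for an integer column.
fn as_i32_column(cells: &[Option<GenericValue>]) -> Vec<Option<i32>> {
    cells
        .iter()
        .map(|cell| match cell {
            Some(GenericValue::Int32(v)) => Some(*v),
            None => None,
            Some(other) => panic!("expected an int4 cell, got {:?}", other),
        })
        .collect()
}

fn main() {
    // A text column with a NULL in the middle.
    let text = vec![
        Some(GenericValue::String("a".to_string())),
        None,
        Some(GenericValue::String("c".to_string())),
    ];
    println!("{:?}", as_str_column(&text)); // [Some("a"), None, Some("c")]

    let ints = vec![Some(GenericValue::Int32(7)), None];
    println!("{:?}", as_i32_column(&ints)); // [Some(7), None]
}
```

Panicking on a mismatched variant, rather than mapping it to `None`, keeps a wrong type mapping from silently turning into nulls.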
Note: the goal is to move away from ConnectorX, which still seems pinned to a very old version of Polars. I don't get much performance benefit from it anyway, because I use partitioned Postgres tables, which seem to break ConnectorX's parallelism.
In the end I answered it myself as below. There seems to be a real lack of documentation and examples on this.
// arrow2 must be the exact version Polars depends on, otherwise the ArrayRef types won't match.
use arrow2::array::{Int32Array, Utf8Array};
use polars::prelude::*;
use polars_core::prelude::ArrayRef;
use tokio_postgres::types::Type;
use tokio_postgres::Row;

async fn postgres_to_polars(rows: &[Row]) -> PolarsResult<DataFrame> {
    let first_row = rows.first().expect("expected at least one row");

    // Map each Postgres column type to the matching Polars dtype.
    let mut fields: Vec<(String, DataType)> = Vec::new();
    for column in first_row.columns() {
        let data_type = match column.type_() {
            &Type::VARCHAR | &Type::TEXT => DataType::Utf8,
            &Type::INT4 => DataType::Int32,
            // Add more cases for other PostgreSQL types as needed
            other => panic!("Unsupported PostgreSQL data type: {}", other),
        };
        fields.push((column.name().to_string(), data_type));
    }

    // One Vec of arrow chunks per column; every row contributes a one-element chunk.
    let mut arrow_arrays: Vec<Vec<ArrayRef>> = vec![];
    for (col_index, column) in first_row.columns().iter().enumerate() {
        let mut array_data: Vec<ArrayRef> = vec![];
        for row in rows {
            // Reading as Option<T> turns SQL NULL into a null slot instead of panicking.
            let array: ArrayRef = match column.type_() {
                // Polars' Utf8 dtype is backed by arrow's LargeUtf8 (i64 offsets).
                &Type::VARCHAR | &Type::TEXT => Box::new(Utf8Array::<i64>::from(vec![
                    row.try_get::<_, Option<String>>(col_index).unwrap(),
                ])),
                &Type::INT4 => Box::new(Int32Array::from(vec![
                    row.try_get::<_, Option<i32>>(col_index).unwrap(),
                ])),
                // Add more cases for other PostgreSQL types as needed
                other => panic!("Unsupported PostgreSQL data type: {}", other),
            };
            array_data.push(array);
        }
        arrow_arrays.push(array_data);
    }

    let mut series: Vec<Series> = vec![];
    for (arrays, (name, dtype)) in arrow_arrays.into_iter().zip(fields.iter()) {
        // Safety: every chunk was built with the arrow type that matches `dtype`.
        let s = unsafe { Series::from_chunks_and_dtype_unchecked(name, arrays, dtype) };
        // Merge the per-row chunks into one contiguous chunk.
        series.push(s.rechunk());
    }
    DataFrame::new(series)
}
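The answer's loop creates a one-element arrow array for every cell, so each column starts out with one chunk per row. A cheaper shape is to accumulate each column into a single typed `Vec<Option<T>>` and build one array per column at the end (for example with `Utf8Array::<i64>::from(...)` or `Series::new`). A std-only sketch of that accumulation step, under the assumption that cells have already been read out of the rows; `Cell`, `ColumnKind`, `ColumnData` and `build_columns` are hypothetical names, with `Cell` standing in for what `Row::try_get` returns.

```rust
/// Column kinds this sketch supports (mirrors the VARCHAR/INT4 arms above).
#[derive(Debug, Clone, Copy)]
enum ColumnKind {
    Utf8,
    Int32,
}

/// A fully built, typed column; `None` marks a NULL.
#[derive(Debug, PartialEq)]
enum ColumnData {
    Utf8(Vec<Option<String>>),
    Int32(Vec<Option<i32>>),
}

/// One cell as read from the database (stand-in for `Row::try_get`).
#[derive(Debug, Clone)]
enum Cell {
    Text(String),
    Int(i32),
    Null,
}

/// Turn row-major cells into one contiguous vector per column.
fn build_columns(kinds: &[ColumnKind], rows: &[Vec<Cell>]) -> Vec<ColumnData> {
    // Allocate one vector per column up front, sized for all rows.
    let mut columns: Vec<ColumnData> = kinds
        .iter()
        .map(|k| match k {
            ColumnKind::Utf8 => ColumnData::Utf8(Vec::with_capacity(rows.len())),
            ColumnKind::Int32 => ColumnData::Int32(Vec::with_capacity(rows.len())),
        })
        .collect();
    for row in rows {
        assert_eq!(row.len(), columns.len(), "row width must match column count");
        for (column, cell) in columns.iter_mut().zip(row) {
            match (column, cell) {
                (ColumnData::Utf8(v), Cell::Text(s)) => v.push(Some(s.clone())),
                (ColumnData::Int32(v), Cell::Int(i)) => v.push(Some(*i)),
                (ColumnData::Utf8(v), Cell::Null) => v.push(None),
                (ColumnData::Int32(v), Cell::Null) => v.push(None),
                (col, c) => panic!("cell {:?} does not fit column {:?}", c, col),
            }
        }
    }
    columns
}

fn main() {
    let kinds = [ColumnKind::Utf8, ColumnKind::Int32];
    let rows = vec![
        vec![Cell::Text("a".into()), Cell::Int(1)],
        vec![Cell::Null, Cell::Int(2)],
    ];
    let columns = build_columns(&kinds, &rows);
    println!("{:?}", columns); // [Utf8([Some("a"), None]), Int32([Some(1), Some(2)])]
}
```

Each finished vector maps to exactly one arrow array, so the Series never needs rechunking.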