Search code examples
rust-polars

Create a polars dataframe from postgres sql in a generic way


Hi, I am trying to read from Postgres into a polars DataFrame in a generic way.

I have read a post here, "Rust: Read dataframe in polars from mysql",

about reading from MySQL, and I want to adapt it so that I don't need to handle the columns by hand for each new query.

I was planning on storing all the values in one large vector, ordered column by column, with each value wrapped in an enum that records its type.

I have the following code.

// Run the query with no bind parameters and collect the returned rows.
let rows = client.query(&select_sql, &[]).await?;

  // Tagged value able to hold a cell of any of the handled column types.
  // `UInt` is declared but never constructed in the code shown here.
  #[derive(Clone, Debug)]
  enum GenericValue {
    String(String),
    Bool(bool),
    DateTime(DateTime<Utc>),
    Int32(i32),
    UInt(u32),
  }

  let number_of_rows = rows.len();
  // The column count is taken from the first row; this panics with
  // "expected one row" when the query returned no rows.
  let number_of_columns = rows.first().expect("expected one row").columns().len();

  println!("Number of columns: {:?}", number_of_columns);
  println!("Number of rows: {:?}", number_of_rows);

  // One flat buffer with a slot per cell, every slot starting out as `None`.
  let mut data: Vec<Option<GenericValue>> = vec![None; number_of_columns * number_of_rows]; 

  // Fill the buffer cell by cell, choosing the enum variant from the
  // PostgreSQL type name of each column.
  for (row_index, row) in rows.iter().enumerate() {

    for (col_index, column) in row.columns().iter().enumerate() {
        let colType: String = column.type_().to_string();
        
        // NOTE(review): the stride used here is `number_of_columns`, while the
        // per-column slices taken further down are `number_of_rows` long. For a
        // column-major layout the stride presumably should be `number_of_rows`;
        // as written, distinct cells can map to the same index — confirm.
        let index = col_index * number_of_columns + row_index;

        if colType == "int4" { 
            data[index] = Some(GenericValue::Int32(row.get(col_index)));
        }
        else if colType == "text" {
            data[index] = Some(GenericValue::String(row.get(col_index)));
        }
        else if colType == "varchar" {
          data[index] = Some(GenericValue::String(row.get(col_index)));
        }
        else if colType == "bool" {
            data[index] = Some(GenericValue::Bool(row.get(col_index)));
        }
        else if colType == "timestamptz" {
          data[index] = Some(GenericValue::DateTime(row.get(col_index)));
        }
        else {
            // Any other type name aborts with the type name as the panic message.
            panic!("{}", &colType);
        }
    }

   // NOTE(review): the outer `for` over rows is not closed before this loop in
   // the snippet as posted; this loop is presumably meant to run after it.
   // Walk the buffer one column at a time and try to build a chunked array.
   for c in 0 .. number_of_columns {

      // NOTE(review): same stride question as for `index` above.
      let start_index = c * number_of_columns;
      let slice = data[start_index .. start_index + number_of_rows];

      let first: Option<GenericValue> = data[start_index];

      // This collect is the step the question is about: it needs
      // `GenericValue` to be usable as string data, and `unwrap` would panic
      // on any slot that is still `None`.
      let chunked_data: Utf8Chunked = slice.iter().map(|v| v.unwrap()).collect();

      println!("{:?}", chunked_data);


      // let ca_country: Utf8Chunked = values.iter().map(|v| &*v.country).collect();

   }

The problem is that I need to implement the trait PolarsAsRef<str> for GenericValue, which I am not sure how to do. How would this chunked data handle NULLs, and how can I create the correct chunked data for each series here?

Note: this is to move away from connectorx, as it still seems pinned to a very old polars version, and I don't get much performance benefit from it anyway because I use partitioned Postgres tables, which seem to break connectorx's parallelism.

Thanks


Solution

  • In the end I solved it as shown below. There seems to be a real lack of docs and examples on this.

    use arrow2::array::{Array, Int32Array, Utf8Array, StructArray};
    
    use polars_core::prelude::ArrayRef;
    
    use arrow2::datatypes::{DataType, Field, Schema};
    use polars::prelude::*;
    
    use tokio_postgres::{Client, NoTls, Error, ToStatement};
    use tokio_postgres::binary_copy::BinaryCopyInWriter;
    use tokio_postgres::types::ToSql;
    use tokio_postgres::types::Type;
    use tokio_postgres::{Row, Column};
    
    use polars_core::chunked_array::ChunkedArray; // ::from_chunks
    
    async fn postgres_to_polars(rows: &[Row]) -> std::result::Result<DataFrame, PolarsError> {
    
        let mut fields: Vec<(String, polars_core::datatypes::DataType)> = Vec::new();
        let column_count = rows[0].len();
        for i in 0..column_count {
            let field_name = rows[0].columns()[i].name().to_string();
            let data_type: polars_core::datatypes::DataType = match rows[0].columns()[i].type_() {
                &Type::VARCHAR => polars_core::datatypes::DataType::Utf8,
                &Type::INT4 => polars_core::datatypes::DataType::Int32,
                // Add more cases for other PostgreSQL types as needed
                _ => panic!("Unsupported PostgreSQL data type"),
            };
            // let field = Field::new(&field_name, data_type, false);
            fields.push((field_name, data_type));
        }
    
        let first_row = rows.first().unwrap();
    
        let mut arrow_arrays: Vec<Vec<ArrayRef>> = vec![];
        
        for (col_index, column) in first_row.columns().iter().enumerate() {
    
            let mut array_data: Vec<ArrayRef> = vec![];
    
            for (row_index, row) in rows.iter().enumerate() {
      
                let array: ArrayRef = match column.type_() {
                    &Type::VARCHAR => Box::new(Utf8Array::<i64>::from(vec![Some(row.try_get::<usize, String>(col_index).unwrap())])),
                    &Type::INT4 => Box::new(Int32Array::from(vec![Some(row.try_get(col_index).unwrap())])),
                    // Add more cases for other PostgreSQL types as needed
                    _ => panic!("Unsupported PostgreSQL data type"),
                };
      
                array_data.push(array);
      
            }
    
            arrow_arrays.push(array_data);
        }
    
        let mut series: Vec<Series> = vec![];
    
        for (array, field) in arrow_arrays.iter().zip(fields.iter()) {
    
            unsafe {
    
                let s = Series::from_chunks_and_dtype_unchecked(
                    &field.0,
                    array.to_vec(),
                    &field.1
                );
    
                series.push(s);
            }
        }
        
        let df: PolarsResult<DataFrame> = DataFrame::new(series);
    
        df
    }