
Save a CSV file that's too big to fit into memory as a parquet file


My development environment is a single-user workstation with 4 cores but not running Spark or HDFS. I have a CSV file that's too big to fit in memory. I want to save it as a parquet file and analyze it locally using existing tools, but have the ability to move it to the Spark cluster in the future and analyze it with Spark.

Is there any way to do this row-by-row without moving the file over to the Spark cluster?

I'm looking for a pure-python solution that does not involve the use of Spark.


Solution

  • This builds on the ostrokach answer to the question "How to convert a csv file to parquet" and on the approach of Zelazny7 in the question "Get inferred dataframe types iteratively using chunksize".

    The approach for converting a CSV bigger than RAM to a parquet file is, in summary:

    • Read the CSV in chunks, with a defined limit on RAM consumption in megabytes
    • Create the parquet file with a schema optimally inferred from the entire CSV file
    • Append the CSV data to the parquet file chunk by chunk

    This approach reads the entire CSV file to detect the most optimized data type for each column.

    It then measures the resulting RAM reduction and adjusts the chunksize, which is preserved in the parquet file as row groups.

    So, despite reading and writing the parquet file in chunks, the approach is safe in terms of schema consistency and optimized in its choice of data type for each column.

    Mitigated schema risk

    If the first chunk of data has a column containing only integer values, that column will be inferred as int64. If the parquet schema is created from this first chunk alone, the implicit assumption is that the rest of the CSV file matches that schema. But if the same column has missing values in a later chunk, appending that chunk to the parquet file will raise a schema error, because int64 does not support missing data.

    To avoid this schema error, it is necessary to analyze the entire file and assign each column the widest data type found across all the data read.
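
    As a minimal illustration of this risk and of its mitigation (a standalone sketch, not part of the solution code below), the same column can be inferred differently per chunk, and numpy's result_type picks the widest type across chunks, which is exactly the per-column resolution done by get_dtype_opt further down:

    import io

    import numpy as np
    import pandas as pd

    # First chunk: the column contains only integers -> inferred as int64
    chunk_1 = pd.read_csv(io.StringIO("col,other\n1,a\n2,b"))
    # Later chunk: the same column has a missing value -> pandas upcasts to float64
    chunk_2 = pd.read_csv(io.StringIO("col,other\n3,c\n,d"))

    print(chunk_1["col"].dtype, chunk_2["col"].dtype)  # int64 float64

    # The widest dtype seen across all chunks avoids the schema mismatch
    print(np.result_type(chunk_1["col"].dtype, chunk_2["col"].dtype))  # float64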

    Convert csv bigger than RAM to parquet

    
    import numbers
    import warnings
    from pathlib import Path
    from typing import Optional
    
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from pandas.errors import DtypeWarning
    
    warnings.filterwarnings("ignore", category=DtypeWarning)
    
    
    def get_chunksize_by_max_ram_mb(
        file_path: Path, max_ram_mb_per_chunk: int
    ) -> int:
        """Returns the amount of rows (chunksize) of a CSV that is approximately
        equivalent to the maximum RAM consumption defined.
    
        Args:
            file_path (Path): csv file path
            max_ram_mb_per_chunk (int): maximum consumption of RAM in mb
    
        Returns:
            int: chunksize
        """
        mb_size = file_path.stat().st_size / (1024**2)
        with open(file_path, "rb") as csv_file:
            num_lines = sum(1 for _ in csv_file)
        # Empirical factor: a parsed chunk takes roughly 3.5x its on-disk size
        # in RAM; round the chunksize down to a multiple of 10,000 rows
        rows_per_chunk = (
            int(max_ram_mb_per_chunk / mb_size * num_lines / 3.5 / 10000) * 10000
        )
        # Guard against a chunksize of 0 for small files or tight RAM limits
        return max(rows_per_chunk, 10_000)
    
    
    def auto_opt_pd_dtypes(
        df_: pd.DataFrame, inplace=False
    ) -> Optional[pd.DataFrame]:
        """Automatically downcast Number dtypes for minimal possible,
        will not touch other (datetime, str, object, etc)
        Ref.: https://stackoverflow.com/a/67403354
        :param df_: dataframe
        :param inplace: if False, will return a copy of input dataset
        :return: `None` if `inplace=True` or dataframe if `inplace=False`
    
        Opportunities for Improvement
        Optimize Object column for categorical
        Ref.: https://github.com/safurrier/data_science_toolbox/blob/master/data_science_toolbox/pandas/optimization/dtypes.py#L56
        """
    
        df = df_ if inplace else df_.copy()
    
        for col in df.columns:
            # integers
            if issubclass(df[col].dtypes.type, numbers.Integral):
                # unsigned integers
                if df[col].min() >= 0:
                    df[col] = pd.to_numeric(df[col], downcast="unsigned")
                # signed integers
                else:
                    df[col] = pd.to_numeric(df[col], downcast="integer")
            # other real numbers
            elif issubclass(df[col].dtypes.type, numbers.Real):
                df[col] = pd.to_numeric(df[col], downcast="float")
    
        if not inplace:
            return df
    
    
    def get_dtype_opt(csv_file_path, sep, chunksize, encoding):
        """
        Identifies the optimized data type of each column by analyzing
        the entire dataframe by chunks.
        Ref.: https://stackoverflow.com/a/15556579
    
        return: dtype dict to pass as dtype argument of pd.read_csv
        """
    
        list_chunk = pd.read_csv(
            csv_file_path,
            sep=sep,
            chunksize=chunksize,
            header=0,
            low_memory=True,
            encoding=encoding,
        )
    
        list_chunk_opt = []
        for chunk in list_chunk:
            chunk_opt = auto_opt_pd_dtypes(chunk, inplace=False)
            list_chunk_opt.append(chunk_opt.dtypes)
    
        df_dtypes = pd.DataFrame(list_chunk_opt)
        dict_dtypes = df_dtypes.apply(
            lambda x: np.result_type(*x), axis=0
        ).to_dict()
        return dict_dtypes
    
    
    def get_chunksize_opt(
        csv_file_path, sep, dtype, max_ram_mb_per_chunk, chunksize, encoding
    ):
        """After dtype optimization, analyzing only one data chunk,
        returns the amount of rows (chunksize) of a CSV that is
        approximately equivalent to the maximum RAM consumption.
        """
    
        for chunk in pd.read_csv(
            csv_file_path,
            sep=sep,
            dtype=dtype,
            chunksize=chunksize,
            low_memory=True,
            encoding=encoding,
        ):
            # Scale the chunksize by the ratio between the allowed RAM and the
            # measured in-memory size of one optimized chunk
            chunksize_opt = chunksize * (
                max_ram_mb_per_chunk
                / (chunk.memory_usage(deep=True).sum() / (1024**2))
            )
            break
        # Round down to a multiple of 10,000 rows, never below 10,000
        return max(int(chunksize_opt / 10_000) * 10_000, 10_000)
    
    
    def write_parquet(
        csv_file_path, parquet_file_path, sep, dtype, chunksize, encoding
    ):
        """Write Parquet file from a CSV with defined dtypes and
        by chunks for RAM optimization.
        """
    
        for i, chunk in enumerate(
            pd.read_csv(
                csv_file_path,
                sep=sep,
                dtype=dtype,
                chunksize=chunksize,
                low_memory=True,
                encoding=encoding,
            )
        ):
            if i == 0:
                # Guess the schema of the CSV file from the first chunk
                parquet_schema = pa.Table.from_pandas(df=chunk).schema
                # Open a Parquet file for writing
                parquet_writer = pq.ParquetWriter(
                    parquet_file_path, parquet_schema, compression="gzip"
                )
            # Write CSV chunk to the parquet file
            table = pa.Table.from_pandas(chunk, schema=parquet_schema)
            parquet_writer.write_table(table)
    
        parquet_writer.close()
    
    
    def convert_csv_to_parquet(
        csv_file_path,
        parquet_file_path,
        max_ram_mb_per_chunk,
        sep=",",
        encoding="utf8",
    ):
        """Converts a CSV file to Parquet file, with maximum RAM consumption
        limit allowed and automatically optimizing the data types of each column.
        """
    
        # Accept both str and Path; a Path is required by file_path.stat() above
        csv_file_path = Path(csv_file_path)
        chunksize = get_chunksize_by_max_ram_mb(
            csv_file_path, max_ram_mb_per_chunk
        )
        dict_dtypes_opt = get_dtype_opt(csv_file_path, sep, chunksize, encoding)
        chunksize_opt = get_chunksize_opt(
            csv_file_path,
            sep,
            dict_dtypes_opt,
            max_ram_mb_per_chunk,
            chunksize,
            encoding,
        )
        write_parquet(
            csv_file_path,
            parquet_file_path,
            sep,
            dict_dtypes_opt,
            chunksize_opt,
            encoding,
        )
       
    

    Usage

    convert_csv_to_parquet(
        csv_file_path='input_big_csvfile.csv',
        parquet_file_path='output_parquetfile.parquet',
        max_ram_mb_per_chunk=100, sep=",", encoding="utf8")
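
    The resulting file stays readable by Spark; once moved to the cluster it can be loaded directly (this assumes an existing pyspark SparkSession named spark):

    df = spark.read.parquet("output_parquetfile.parquet")
    df.printSchema()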
    

    Explanation

    As the CSV file does not fit in RAM, the approach respects a maximum allowed RAM consumption.

    The steps:

    • Find the chunksize for the first CSV scan, based on the maximum allowed RAM consumption.
    • Identify the most optimized dtype for each column while respecting that RAM limit.
    • After the dtype optimization, identify the number of rows that corresponds to the RAM limit, to serve as the row-group size of the parquet file.
    • Write the parquet file chunk by chunk, respecting the RAM limit.

    So even if you only have 500 MB of RAM available, you can convert a 10,000 MB CSV to parquet and, on top of that, optimize the column data types, which improves reading speed and reduces the final file size even further.
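
    As a quick sanity check of the result, pyarrow can report the row groups and the final schema (a small sketch, reusing the output path from the Usage section above):

    import pyarrow.parquet as pq

    pf = pq.ParquetFile("output_parquetfile.parquet")
    print(pf.metadata.num_rows, "rows in", pf.metadata.num_row_groups, "row groups")
    print(pf.metadata.row_group(0).num_rows, "rows in the first row group")
    print(pf.schema_arrow)  # the optimized column types chosen above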

    Read the parquet file in equivalent chunks

    def read_by_row_group(parquet_file_path):
        """Yield the parquet file row group by row group as pandas DataFrames."""
        parquet_file = pq.ParquetFile(parquet_file_path)
        for row_group in range(parquet_file.num_row_groups):
            yield parquet_file.read_row_group(row_group).to_pandas()

    for df in read_by_row_group("output_parquetfile.parquet"):
        # process each chunk dataframe here
        pass
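
    Alternatively, if the row-group boundaries themselves are not important, pyarrow's ParquetFile.iter_batches can stream the file in fixed-size batches (a sketch; the batch size is arbitrary):

    parquet_file = pq.ParquetFile("output_parquetfile.parquet")
    for batch in parquet_file.iter_batches(batch_size=100_000):
        df = batch.to_pandas()
        # process each batch dataframe here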
    

    Further improvements

    Implement optimization of "object" dtype columns to the "categorical" dtype, similar to what is implemented in data_science_toolbox, but with a chunked approach, as sketched below.
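
    A minimal sketch of how that extra first pass could look (the helper name, the max_unique threshold and the merge into dict_dtypes_opt are assumptions, not existing code):

    import pandas as pd

    def get_categorical_candidates(csv_file_path, sep, chunksize, encoding,
                                   max_unique=1000):
        """Hypothetical helper: collect the full set of distinct values of each
        "object" column across all chunks; columns staying under max_unique
        distinct values are good candidates for the categorical dtype."""
        uniques = {}
        for chunk in pd.read_csv(csv_file_path, sep=sep, chunksize=chunksize,
                                 low_memory=True, encoding=encoding):
            for col in chunk.select_dtypes(include="object").columns:
                uniques.setdefault(col, set()).update(chunk[col].dropna().unique())
        return {
            col: pd.CategoricalDtype(list(values))
            for col, values in uniques.items()
            if len(values) <= max_unique
        }

    The returned dict could then be merged into dict_dtypes_opt before calling write_parquet, so that every chunk is read with the same complete category list and the parquet schema stays consistent across row groups.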