My development environment is a single-user workstation with 4 cores, not running Spark or HDFS. I have a CSV file that is too big to fit in memory. I want to save it as a Parquet file and analyze it locally using existing tools, but keep the ability to move it to the Spark cluster in the future and analyze it with Spark.
Is there any way to do this row-by-row without moving the file over to the Spark cluster?
I'm looking for a pure-python solution that does not involve the use of Spark.
Building on ostrokach's answer to the question "How to convert a csv file to parquet", and on Zelazny7's approach in "Get inferred dataframe types iteratively using chunksize", here is an approach for converting a CSV file that is bigger than RAM into a Parquet file. In summary:

1. Read the entire CSV file in chunks to detect the most optimized data type for each column.
2. Measure the resulting reduction in RAM usage and adjust the chunksize; each chunk is preserved in the Parquet file as a row group.

Although the CSV is read and the Parquet file is written in chunks, this approach keeps the write schema consistent and chooses an optimized data type for each column.
Mitigating the schema risk
The first chunk of data may contain a column with only integer values, in which case pandas infers the column as int64. If the Parquet schema is created from this first chunk, the implicit assumption is that every other chunk in the CSV file has an identical schema. But if the same column contains missing values in a later chunk, appending that chunk to the Parquet file fails with a schema error, because int64 does not support missing data.
To avoid this risk, the entire dataframe must be analyzed so that each column is assigned the broadest data type found across all the chunks read.
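A minimal illustration with toy data (not part of the conversion code below): pandas infers int64 for a chunk with no missing values and float64 for a chunk with a missing value, and np.result_type resolves the broadest dtype that accommodates both, which is what get_dtype_opt below relies on.

import numpy as np
import pandas as pd

chunk1 = pd.DataFrame({"a": [1, 2, 3]})      # inferred as int64
chunk2 = pd.DataFrame({"a": [4, None, 6]})   # the missing value forces float64

print(chunk1["a"].dtype, chunk2["a"].dtype)  # int64 float64
print(np.result_type(chunk1["a"].dtype, chunk2["a"].dtype))  # float64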
import numbers
import warnings
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pandas.errors import DtypeWarning
warnings.filterwarnings("ignore", category=DtypeWarning)
def get_chunksize_by_max_ram_mb(
    file_path: Path, max_ram_mb_per_chunk: int
) -> int:
    """Returns the amount of rows (chunksize) of a CSV that is approximately
    equivalent to the maximum RAM consumption defined.

    Args:
        file_path (Path): csv file path
        max_ram_mb_per_chunk (int): maximum consumption of RAM in mb

    Returns:
        int: chunksize
    """
    mb_size = file_path.stat().st_size / (1024**2)
    with open(file_path) as csv_file:
        num_lines = sum(1 for _ in csv_file)
    # The 3.5 factor is a heuristic for how much larger a DataFrame is in RAM
    # than the CSV on disk; the result is rounded down to a multiple of 10,000 rows.
    rows_per_chunk = (
        int(max_ram_mb_per_chunk / mb_size * num_lines / 3.5 / 10000) * 10000
    )
    return rows_per_chunk
def auto_opt_pd_dtypes(
    df_: pd.DataFrame, inplace=False
) -> Optional[pd.DataFrame]:
    """Automatically downcasts Number dtypes to the smallest possible type;
    other dtypes (datetime, str, object, etc.) are left untouched.

    Ref.: https://stackoverflow.com/a/67403354

    :param df_: dataframe
    :param inplace: if False, will return a copy of the input dataset
    :return: `None` if `inplace=True`, otherwise the optimized dataframe

    Opportunity for improvement: optimize object columns to categorical.
    Ref.: https://github.com/safurrier/data_science_toolbox/blob/master/data_science_toolbox/pandas/optimization/dtypes.py#L56
    """
    df = df_ if inplace else df_.copy()
    for col in df.columns:
        # integers
        if issubclass(df[col].dtypes.type, numbers.Integral):
            # unsigned integers
            if df[col].min() >= 0:
                df[col] = pd.to_numeric(df[col], downcast="unsigned")
            # signed integers
            else:
                df[col] = pd.to_numeric(df[col], downcast="integer")
        # other real numbers
        elif issubclass(df[col].dtypes.type, numbers.Real):
            df[col] = pd.to_numeric(df[col], downcast="float")
    if not inplace:
        return df
def get_dtype_opt(csv_file_path, sep, chunksize, encoding):
    """Identifies the optimized data type of each column by analyzing
    the entire dataframe in chunks.

    Ref.: https://stackoverflow.com/a/15556579

    :return: dtype dict to pass as the dtype argument of pd.read_csv
    """
    list_chunk = pd.read_csv(
        csv_file_path,
        sep=sep,
        chunksize=chunksize,
        header=0,
        low_memory=True,
        encoding=encoding,
    )
    # Collect the downcast dtypes of every chunk (one dtype Series per chunk)
    list_chunk_opt = []
    for chunk in list_chunk:
        chunk_opt = auto_opt_pd_dtypes(chunk, inplace=False)
        list_chunk_opt.append(chunk_opt.dtypes)
    # For each column, np.result_type returns the broadest dtype able to hold
    # every chunk's dtype (e.g. int64 + float64 -> float64)
    df_dtypes = pd.DataFrame(list_chunk_opt)
    dict_dtypes = df_dtypes.apply(
        lambda x: np.result_type(*x), axis=0
    ).to_dict()
    return dict_dtypes
def get_chunksize_opt(
    csv_file_path, sep, dtype, max_ram_mb_per_chunk, chunksize, encoding
):
    """After dtype optimization, analyzing only one data chunk,
    returns the amount of rows (chunksize) of a CSV that is
    approximately equivalent to the maximum RAM consumption.
    """
    for chunk in pd.read_csv(
        csv_file_path,
        sep=sep,
        dtype=dtype,
        chunksize=chunksize,
        low_memory=True,
        encoding=encoding,
    ):
        # Scale the initial chunksize by the ratio between the RAM budget and
        # the measured footprint of this dtype-optimized chunk; a single chunk
        # is enough for the estimate.
        chunksize_opt = chunksize * (
            max_ram_mb_per_chunk
            / (chunk.memory_usage(deep=True).sum() / (1024**2))
        )
        break
    # Round down to a multiple of 10,000 rows
    return int(chunksize_opt / 10_000) * 10_000
def write_parquet(
    csv_file_path, parquet_file_path, sep, dtype, chunksize, encoding
):
    """Writes a Parquet file from a CSV with the defined dtypes,
    in chunks for RAM optimization.
    """
    for i, chunk in enumerate(
        pd.read_csv(
            csv_file_path,
            sep=sep,
            dtype=dtype,
            chunksize=chunksize,
            low_memory=True,
            encoding=encoding,
        )
    ):
        if i == 0:
            # Build the schema from the first chunk; this is safe here because
            # the dtypes are already fixed for every chunk
            parquet_schema = pa.Table.from_pandas(df=chunk).schema
            # Open a Parquet file for writing
            parquet_writer = pq.ParquetWriter(
                parquet_file_path, parquet_schema, compression="gzip"
            )
        # Write the CSV chunk to the Parquet file; each chunk becomes a row group
        table = pa.Table.from_pandas(chunk, schema=parquet_schema)
        parquet_writer.write_table(table)
    parquet_writer.close()
def convert_csv_to_parquet(
    csv_file_path,
    parquet_file_path,
    max_ram_mb_per_chunk,
    sep=",",
    encoding="utf8",
):
    """Converts a CSV file to a Parquet file, respecting a maximum allowed
    RAM consumption and automatically optimizing the data type of each column.
    """
    # get_chunksize_by_max_ram_mb expects a Path (it calls .stat())
    chunksize = get_chunksize_by_max_ram_mb(
        Path(csv_file_path), max_ram_mb_per_chunk
    )
    dict_dtypes_opt = get_dtype_opt(csv_file_path, sep, chunksize, encoding)
    chunksize_opt = get_chunksize_opt(
        csv_file_path,
        sep,
        dict_dtypes_opt,
        max_ram_mb_per_chunk,
        chunksize,
        encoding,
    )
    write_parquet(
        csv_file_path,
        parquet_file_path,
        sep,
        dict_dtypes_opt,
        chunksize_opt,
        encoding,
    )
convert_csv_to_parquet(
    csv_file_path="input_big_csvfile.csv",
    parquet_file_path="output_parquetfile.parquet",
    max_ram_mb_per_chunk=100,
    sep=",",
    encoding="utf8",
)
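As a quick sanity check after the conversion (a sketch, reusing the output path above), pyarrow can report the resulting schema, the number of row groups and the total row count without loading the data:

import pyarrow.parquet as pq

parquet_file = pq.ParquetFile("output_parquetfile.parquet")
print(parquet_file.schema_arrow)       # optimized column types
print(parquet_file.num_row_groups)     # one row group per written chunk
print(parquet_file.metadata.num_rows)  # total number of rows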
Because the CSV file does not fit in RAM, the approach respects a maximum allowed RAM consumption. The steps are:

1. Estimate an initial chunksize from the RAM budget and the size of the CSV on disk (get_chunksize_by_max_ram_mb).
2. Scan the whole CSV in chunks and determine the broadest optimized data type for each column (get_dtype_opt).
3. Recompute the chunksize from the memory footprint of a dtype-optimized chunk (get_chunksize_opt).
4. Write the Parquet file chunk by chunk with the fixed dtypes, each chunk becoming a row group (write_parquet).

So even if you only have 500 MB of RAM available, you can convert a 10,000 MB CSV to Parquet and, on top of that, optimize the column data types, which improves reading speed and reduces the final file size even more.

To analyze the resulting Parquet file locally without loading it all into memory, read it back one row group at a time:
def read_by_row_group(parquet_file_path):
    """Yields each row group of the Parquet file as a pandas dataframe."""
    parquet_file = pq.ParquetFile(parquet_file_path)
    for row_group in range(parquet_file.num_row_groups):
        yield parquet_file.read_row_group(row_group).to_pandas()

for df in read_by_row_group("output_parquetfile.parquet"):
    # process each row group's dataframe here
    pass
A possible improvement is to optimize "object" dtype columns to the "categorical" dtype, similar to what is implemented in data_science_toolbox, but with a chunked approach; a sketch of how candidate columns could be identified is shown below.
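A minimal sketch of that idea, under the assumption that a column is worth converting when its number of distinct values is small relative to the total number of rows (the helper name get_categorical_candidates and the 0.5 threshold are illustrative, not part of the code above); the columns it returns could then be mapped to "category" in the dtype dict passed to pd.read_csv:

import pandas as pd

def get_categorical_candidates(
    csv_file_path, sep, chunksize, encoding, max_unique_ratio=0.5
):
    """Scans the CSV in chunks and returns the object columns whose number of
    distinct values is small relative to the total number of rows."""
    uniques = {}  # column name -> set of distinct values seen so far
    total_rows = 0
    for chunk in pd.read_csv(
        csv_file_path,
        sep=sep,
        chunksize=chunksize,
        low_memory=True,
        encoding=encoding,
    ):
        total_rows += len(chunk)
        for col in chunk.select_dtypes(include="object").columns:
            uniques.setdefault(col, set()).update(chunk[col].dropna().unique())
    return [
        col
        for col, values in uniques.items()
        if len(values) / total_rows <= max_unique_ratio
    ]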