I have a fairly large table in big query ( app. 9M rows) and I would like to read it via pandas.
I've tried reading and using the [pd.read_gbq()][1]
function, which works fine on small tables.
On the large table it gets stuck after 50 secs or so (logs show elapsed .. 50s
) - without giving an error or anything.
My question is how can I read that table using pd (chunks?). Any conventions on scaling up these bigquery reads will be helpful.
EDIT / resolution
adding to Khan's answer, I ended up implementing chunks, writing 500,000 each time to a file, then reading these files to dataframe like so:
def download_gbq_table(self):
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
increment = 100000
intervals = list(range(0, self.table_size, 100000))
intervals.append(self.table_size - intervals[len(intervals)-1])
df = pd.DataFrame()
for offset in intervals:
query = f"select * from `<table_name>` limit {increment} offset {offset};"
logger.info(f"running query: {query}")
start_time = time.time()
tmp_df = pd.read_gbq(query,
project_id=self.connection_parameters['project_id'],
private_key=self.connection_parameters['service_account'],
dialect='standard'
)
df = pd.concat([df, tmp_df])
logger.info(f'time took: {str(round(time.time() - start_time, 2))}')
if len(df) % 500000 == 0:
df.to_csv(os.path.join(self.tmp_dir, f'df_{str(offset + increment)}.csv'))
df = pd.DataFrame()
def read_df_from_multi_csv(self):
all_files = glob.glob(os.path.join(self.tmp_dir, "df_*"))
df_list = []
for f in all_files:
start_time = time.time()
df_list.append(pd.read_csv(f))
logger.info(f'time took for reading {f}: {str(round(time.time() - start_time, 2))}')
return pd.concat((pd.read_csv(f) for f in all_files))
Pandas' read_gbq
function currently does not provide a chunksize
parameter (even though its opposite to_gbq
function does provide a chunksize
parameter).
Anyways, you can solve your problem with adding LIMIT
and OFFSET
to your SQL query read stuff iteratively from BigQuery. Something on the lines of:
project_id = "xxxxxxxx"
increment=100000
chunks=[*range(0, 9000000, 100000)]
chunks[-1]+=increment
intervals=[[chunks[i-1], chunks[i]+1] for i, e in enumerate(chunks) if i > 0]
query_str="select * from `mydataset.mytable` limit {end} offset {start};"
for start, end in intervals:
query = query_str.format(start=start, end=end)
df = pd.read_gbq(query, project_id)
#-- do stuff with your df here..