My problem is the following: I am sending queries via the Google Drive API that fetch all files matching certain criteria. I won't post the entire code here as it's quite extensive, but the criteria are simply to get all files that reside in folders whose name contains a given string (for example: "I want all files that reside in folders where the folder name contains the string 'meet'"). The code I have written for this particular part is the following:
import json
import environ
import os
import google.auth
import io
from apiclient import discovery
from httplib2 import Http
from google.cloud import secretmanager
from googleapiclient.http import MediaIoBaseDownload
from oauth2client.service_account import ServiceAccountCredentials
# Functions imported from a local file; they just write to the database and establish the connection
from firestore_drive import add_file, establish_db_connection
.... some other code here ...
def update_files_via_parent_folder(self, parent_id, parent_name):
    page_token = None
    # Set a query that fetches all files based on the ID of their parent folder,
    # e.g. "get all files from the folder whose ID is parent_id"
    query = f"'{parent_id}' in parents"
    response = self.execute_query(query, page_token)
    files = response.get('files', [])

    # The collection name depends only on the parent folder, so resolve it once
    collection_name = next(type_name for type_name in self.possible_types if type_name in parent_name)

    while True:
        # Process all files in the current page of results
        for file in files:
            file_id = file['id']
            filename = file['name']

            # Request the current file from Drive, and download it through a byte stream
            request = self.service.files().get_media(fileId=file_id)
            fh = io.BytesIO()
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()

            # Only parse the buffer once the download is complete; parsing it per
            # chunk would fail on any file larger than a single chunk
            prefab_json = json.loads(fh.getvalue())
            add_file(self.db, collection_name, filename, file_content=prefab_json)

        # Find out if there are more files to download in the same folder
        page_token = response.get('nextPageToken', None)
        if page_token is None:
            if len(files) == 0:
                print('Folder found, but contained no files.')
            break
        response = self.execute_query(query, page_token)
        files = response.get('files', [])
def execute_query(self, query, page_token):
    """
    Helper function for executing a query to Google Drive. Implemented as a function due to repeated usage.
    """
    return self.service.files().list(
        q=query,
        spaces='drive',
        fields='nextPageToken, files(id, name)',
        pageToken=page_token).execute()
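For context, the lookup that finds the parent folders by name (e.g. containing 'meet') lives in the omitted code. As a sketch only - the query string and loop below are illustrative, not my actual omitted code - such a lookup could look like this:

folder_query = "mimeType = 'application/vnd.google-apps.folder' and name contains 'meet'"
response = self.service.files().list(
    q=folder_query,
    spaces='drive',
    fields='files(id, name)').execute()
# Hand each matching folder to the function above (illustrative wiring)
for folder in response.get('files', []):
    self.update_files_via_parent_folder(folder['id'], folder['name'])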
Now my question is this: Is there a way to download the files asynchronously or in parallel in the following section?

for file in files:
    file_id = ...
    filename = ...
    # Same as above; start the download and write to the database...
For reference, the point of the code is to extract files that are located on Google Drive and copy them over to another database. I'm not concerned with local storage, only with fetching from Drive and writing to a database (if this is even possible to do in parallel).
I've tried various options such as multiprocessing.Pool, multiprocessing.pool.ThreadPool, and asyncio, but I'm not sure whether I actually used them correctly. I can also mention that the database used is Firestore.
Additional note: the reason I want to do this is that the sequential version is extremely slow, and I want to deploy it as a cloud function (which has a maximum time limit of 540 seconds, i.e. 9 minutes).
Any feedback is welcome :)
In case anyone runs into this in the future, this may help. I was running into this issue on a project recently - I had to download several hundred files from Google Drive on a regular basis.
The call drive_service.files().get_media(fileId=file_id) can't be called asynchronously, and you can only use one service client to download one file at a time, so making parallel calls with the same client caused Google's server side to throw errors. I was stuck with sequential downloads taking me ~10 minutes.
I ended up instantiating a new client for each download and then threading the calls with ThreadPoolExecutor. It ran in about 20-30 seconds instead. I'm sure the code could use improving, but it worked for me - it might help someone else stuck on this.
import io
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Tuple

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
# Replace with your service account file path
SERVICE_ACCOUNT_FILE_PATH = "path/to-your-service-account.json"
def download_file(file_id: str) -> Optional[io.BytesIO]:
    # Build a fresh client per call; sharing one client across threads caused
    # server-side errors
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE_PATH, scopes=SCOPES
    )
    drive_service = build("drive", "v3", credentials=creds)
    try:
        request = drive_service.files().get_media(fileId=file_id)
        file_buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(file_buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        file_buffer.seek(0)
        return file_buffer
    except Exception as e:
        print(f"Error downloading file with ID '{file_id}': {e}")
        return None
def download_all_files(
    file_ids: List[str], max_workers: int = 5
) -> List[Tuple[str, io.BytesIO]]:
    downloaded_files = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one download per file ID and map each future back to its ID
        future_to_file_id = {
            executor.submit(download_file, file_id): file_id for file_id in file_ids
        }
        for future in as_completed(future_to_file_id):
            file_id = future_to_file_id[future]
            try:
                file_buffer = future.result()
                if file_buffer is not None:
                    downloaded_files.append((file_id, file_buffer))
                    print(f"Downloaded file with ID '{file_id}'")
                else:
                    print(f"Failed to download file with ID '{file_id}'")
            except Exception as e:
                print(f"Error downloading file with ID '{file_id}': {e}")
    return downloaded_files
if __name__ == "__main__":
    # File IDs look like the first entry below
    file_ids = [
        "1Kk1KsgcuetJqeasdas_GZDasdwasdqwwq939yQAUlM1Al5tOW18",
        "your_file_id_2",
    ]
    downloaded_files = download_all_files(file_ids, max_workers=20)
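If you then need to push each file into a database like the question's Firestore setup, the parse-and-write step can stay sequential once the downloads are parallel, since it's cheap by comparison. A rough sketch - add_file, db, collection_name, and names_by_id are placeholders borrowed from the question's setup, not part of my code:

import json

# Illustrative glue only: parse each downloaded buffer and hand it to the
# question's Firestore helper. db, collection_name, and names_by_id
# (a file_id -> filename mapping built from the files().list response)
# are assumed to exist.
for file_id, file_buffer in downloaded_files:
    prefab_json = json.load(file_buffer)
    add_file(db, collection_name, names_by_id[file_id], file_content=prefab_json)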