I have an application that loads millions of documents into a collection, using 30-80 workers to load the data simultaneously. Sometimes the loading process doesn't complete cleanly; with other databases I could simply drop the table and start over, but that isn't an option with Firestore collections. I have to list the documents and delete them, and I haven't found a way to scale the deletion anywhere near the capacity of my loading process. What I'm doing now is two App Engine hosted Flask/Python methods: one fetches a page of 1,000 documents and hands it to the other, which deletes them. That way the listing process isn't blocked by the deletion process, but the whole thing still takes days, which is too long.
Here is the method that gets a list of documents and creates a task to delete them; it is single threaded:
import json

import flask
from google.cloud import firestore, tasks_v2

# Module-level setup (project and location below are placeholders, not my real values)
app = flask.Flask(__name__)
db = firestore.Client()
tasks_client = tasks_v2.CloudTasksClient()
PROJECT_ID = 'my-project-id'
LOCATION = 'us-central1'


@app.route('/delete_collection/<collection_name>/<batch_size>', methods=['POST'])
def delete_collection(collection_name, batch_size):
    batch_size = int(batch_size)
    coll_ref = db.collection(collection_name)
    print('Received request to delete collection {} {} docs at a time'.format(
        collection_name,
        batch_size
    ))
    num_docs = batch_size
    while num_docs >= batch_size:
        docs = coll_ref.limit(batch_size).stream()
        found = 0
        deletion_request = {
            'doc_ids': []
        }
        for doc in docs:
            deletion_request['doc_ids'].append(doc.id)
            found += 1
        num_docs = found
        if not deletion_request['doc_ids']:
            break  # nothing left to delete; don't queue an empty task
        print('Creating request to delete docs: {}'.format(
            json.dumps(deletion_request)
        ))
        # Add to task queue
        queue = tasks_client.queue_path(PROJECT_ID, LOCATION, 'database-manager')
        task_meet = {
            'app_engine_http_request': {  # Specify the type of request.
                'http_method': 'POST',
                'relative_uri': '/delete_documents/{}'.format(
                    collection_name
                ),
                'body': json.dumps(deletion_request).encode(),
                'headers': {
                    'Content-Type': 'application/json'
                }
            }
        }
        task_response_meet = tasks_client.create_task(queue, task_meet)
        print('Created task to delete {} docs: {}'.format(
            num_docs,
            json.dumps(deletion_request)
        ))
    return 'Finished queueing deletions for {}'.format(collection_name)
Here is the method that deletes the documents, which can scale out. In practice it only processes 5-10 tasks at a time, limited by the rate at which the other method hands it pages of doc_ids to delete. Separating the two helps, but not by much.
@app.route('/delete_documents/<collection_name>', methods=['POST'])
def delete_documents(collection_name):
    # Validate we got a body in the POST
    if flask.request.json:
        print('Request received to delete docs from: {}'.format(collection_name))
    else:
        message = 'No json found in request: {}'.format(flask.request)
        print(message)
        return message, 400
    # Validate that the payload includes a list of doc_ids
    doc_ids = flask.request.json.get('doc_ids', None)
    if doc_ids is None:
        return 'No doc_ids specified in payload: {}'.format(flask.request.json), 400
    print('Received request to delete docs: {}'.format(doc_ids))
    for doc_id in doc_ids:
        db.collection(collection_name).document(doc_id).delete()
    return 'Finished'


if __name__ == '__main__':
    # Set environment variables for running locally
    app.run(host='127.0.0.1', port=8080, debug=True)
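Each .delete() in the loop above is its own round trip to Firestore. A variation I've been considering but haven't deployed is to delete each page with batched writes, chunked to stay under Firestore's limit of 500 writes per batch:

BATCH_LIMIT = 500  # Firestore allows at most 500 writes in one batch


def delete_documents_batched(collection_name, doc_ids):
    # Hypothetical batched variant of the delete loop in delete_documents()
    for start in range(0, len(doc_ids), BATCH_LIMIT):
        batch = db.batch()
        for doc_id in doc_ids[start:start + BATCH_LIMIT]:
            batch.delete(db.collection(collection_name).document(doc_id))
        batch.commit()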
I've tried running multiple concurrent executions of delete_collection(), but I'm not certain that even helps, because I don't know whether each call to limit(batch_size).stream() returns a distinct set of documents or whether the calls overlap and return duplicates.
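If I paginated with a cursor instead of repeatedly taking the first batch_size documents, I think each page would be distinct even while earlier pages are still queued for deletion. A minimal sketch of that idea, reusing coll_ref and batch_size from delete_collection() above and a hypothetical enqueue_deletion_task() helper in place of the Cloud Tasks code:

query = coll_ref.order_by('__name__').limit(batch_size)  # order by document ID
last_doc = None
while True:
    page = query if last_doc is None else query.start_after(last_doc)
    docs = list(page.stream())
    if not docs:
        break
    last_doc = docs[-1]  # cursor: the next page starts after this snapshot
    enqueue_deletion_task([doc.id for doc in docs])  # hypothetical helper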
How can I make this run faster?
This is what I came up with. It's not especially fast (120-150 docs per second), but all of the other Python examples I found didn't work at all:
from datetime import datetime, timedelta

from google.cloud import firestore

db = firestore.Client()
# DOCUMENT_EXPIRATION_DAYS and collection_name are defined elsewhere in my app

now = datetime.now()
then = now - timedelta(days=DOCUMENT_EXPIRATION_DAYS)
doc_counter = 0
commit_counter = 0
limit = 5000

while True:
    print('Getting next doc handler')
    docs = [snapshot for snapshot in db.collection(collection_name)
            .where('id.time', '<=', then)
            .limit(limit)
            .order_by('id.time', direction=firestore.Query.ASCENDING)
            .stream()]
    batch = db.batch()
    for doc in docs:
        doc_counter = doc_counter + 1
        if doc_counter % 500 == 0:
            commit_counter += 1
            print('Committing batch {} from {}'.format(
                commit_counter, doc.to_dict()['id']['time']))
            batch.commit()
        batch.delete(doc.reference)
    batch.commit()
    if len(docs) == limit:
        continue
    break

print('Deleted {} documents in {}.'.format(doc_counter, datetime.now() - now))
As mentioned in the other comments, .stream() has a 60-second deadline. This iterative structure limits each query to 5,000 documents and then calls .stream() again, which keeps each iteration under the 60-second limit. If anybody knows how to speed this up further, let me know.
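One direction I've thought about but haven't benchmarked, for the case where the whole collection has to go, is to list references with list_documents() (which doesn't read document contents) and commit the delete batches from a thread pool instead of serially. A rough sketch, with the chunk size matching the 500-writes-per-batch limit and the worker count just a guess:

import itertools
from concurrent.futures import ThreadPoolExecutor

from google.cloud import firestore

db = firestore.Client()


def commit_deletes(refs):
    # Delete one chunk of document references in a single batched write
    batch = db.batch()
    for ref in refs:
        batch.delete(ref)
    batch.commit()


def parallel_delete(collection_name, chunk_size=500, workers=10):
    # list_documents() yields DocumentReferences without fetching their data
    doc_iter = db.collection(collection_name).list_documents(page_size=chunk_size)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        while True:
            chunk = list(itertools.islice(doc_iter, chunk_size))
            if not chunk:
                break
            executor.submit(commit_deletes, chunk)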