I have a Celery worker that periodically fetches data from an API that I need to stay in sync with. I create or update a particular model instance depending on whether I already have the external id stored.
However, as the API I'm syncing with has grown, this task has become increasingly expensive.
Here's the relevant code:
for d in api.fetch_items():
    attempted_count += 1
    try:
        existing_item = MyModel.objects.get(item_id=d["item"])
        serializer = MySerializer(
            existing_item,
            data=d,
            partial=True,
            context={"company": company},
        )
    except MyModel.DoesNotExist:
        serializer = MySerializer(data=d, context={"company": company})
    if serializer.is_valid():
        serializer.save()
        imported_count += 1
    else:
        logger.error(f"Invalid data when deserializing item: {serializer.errors}\nBased on: {d}")
I suspect that because the response is so large (often 50,000+ items), those per-item queries are what make the task eat up so much time. I'm hoping to get some suggestions on making this flow more efficient.
Some things I've considered:
Comparing the instances of MyModel (fetched via iterator()) to the response the task receives, to determine which internal items need to be updated or created (I don't even know if this would be faster)
Bulk updating, though I'm not sure how to do bulk updates when the values that need to be updated differ for each item (rough sketch of what I mean below)
Maybe an atomic transaction?
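Here's roughly what I had in mind for the first two ideas. It's an untested sketch that assumes item_id is unique on MyModel, uses name as a placeholder for whatever fields actually change, and skips the serializer validation entirely:

from django.db import transaction

incoming = list(api.fetch_items())

# One query to fetch all existing rows, keyed by the external id
# (in_bulk requires item_id to be a unique field).
existing = MyModel.objects.in_bulk(
    [d["item"] for d in incoming], field_name="item_id"
)

to_create, to_update = [], []
for d in incoming:
    obj = existing.get(d["item"])
    if obj is None:
        to_create.append(MyModel(item_id=d["item"], name=d["name"]))  # placeholder fields
    else:
        obj.name = d["name"]  # placeholder assignment
        to_update.append(obj)

with transaction.atomic():
    MyModel.objects.bulk_create(to_create, batch_size=1000)
    MyModel.objects.bulk_update(to_update, ["name"], batch_size=1000)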
I'm not even really sure if the things I've thought of are good approaches. Hopefully someone can share their thoughts!
I've had to deal with a similar issue - which frankly stems from having a rubbish API that doesn't provide a sensible way to push changes to data, and forces you to iterate over a whole dataset periodically to identify changes yourself.
Whichever way you do it, if you update all the objects in your database every time, it's going to be slow and inefficient and scale poorly. The approach I took was as follows:
When iterating over the incoming data, generate a hash of each entry.
Store this hash in the cache, with the object ID as the key.
When iterating over the data again, check the hash, and if it matches what you have in the cache, skip the update because you know nothing has changed since the last time you saved it.
In code, this would look something like this:
import hashlib
import json

from django.core.cache import cache

for d in api.fetch_items():
    # Serialise the data to a string so you can hash it; sort_keys keeps the
    # hash stable even if the API returns the keys in a different order.
    data_str = json.dumps(d, sort_keys=True)
    data_hash = hashlib.md5(data_str.encode('utf-8')).hexdigest()  # Generate md5 hash, which is quick

    # Key the cache entry on the object's ID and store the hash as the value.
    cache_key = 'some_prefix_' + str(d['item'])
    if cache.get(cache_key) != data_hash:
        # Perform your logic here for updating the item in the database,
        # then remember the hash you just processed.
        cache.set(cache_key, data_hash, timeout=None)  # timeout=None so it survives between runs
    else:
        # The data hasn't changed since the last time you checked - skip it
        continue
There is still some inherent inefficiency in iterating over everything - but at least this approach eliminates unnecessary database writes.
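If the cache round trips themselves start to add up over 50,000+ items, the same idea can be done in batches. This is only a sketch, assuming the external ID lives in d['item'], using Django's cache.get_many / cache.set_many to cut the number of cache calls:

import hashlib
import json

from django.core.cache import cache

items = list(api.fetch_items())

# Compute all the hashes up front, keyed by the external ID.
hashes = {
    'some_prefix_' + str(d['item']): hashlib.md5(
        json.dumps(d, sort_keys=True).encode('utf-8')
    ).hexdigest()
    for d in items
}

# One round trip to fetch every cached hash.
cached = cache.get_many(hashes.keys())

changed = {}
for d in items:
    key = 'some_prefix_' + str(d['item'])
    if cached.get(key) != hashes[key]:
        # Perform your create/update logic for this item here
        changed[key] = hashes[key]

# One round trip to remember everything you just processed.
cache.set_many(changed, timeout=None)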