We've developed a data scraping program that regularly upserts data into a PostgreSQL database, using Python's psycopg2 library. The database is pretty large, containing tens of millions of records, and while the number of new or updated records from each scraping run for one website is usually small (dozens), there are occasional outliers with a few thousand changes. We scrape plan to scrape a million sites a day.
Our current challenge is efficiently marking records as inactive when they're not present in a new data scraping run. Each record has a "cms_id" column, which we use to match the scraped data. After a run, any record with a cms_id not present in the new data should be marked inactive (i.e., is_active = false
).
Here's our current method, which is working fine but we're looking to improve:
cms_ids = tuple(cms_ids)
update_query = """
UPDATE {table_name}
SET is_active = false
WHERE website_id = %s::uuid
AND is_active != false
AND cms_id NOT IN %s
""".format(table_name=table)
params = (store.site_id, cms_ids)
self.db_cursor.execute(update_query, params)
In this code, the cms_ids variable is supplied as a list of integers and then converted to a tuple.
We have a more complex situation when two columns form a unique identifier for a record - the "cms_id" and the "title". Our current approach for this case is running into syntax errors. Here's the problematic code:
# cms_ids is a list of tuples, each tuple containing a cms_product_id (int) and option_name (str)
update_query = """
UPDATE "public".products_options
SET is_active = false
FROM VALUES %s AS data(cms_product_id,option_name)
WHERE is_active != false
AND products_options.cms_product_id = data.cms_product_id
AND products_options.option_name = data.option_name
"""
self.db_cursor.execute(update_query, (cms_ids,))
This approach is returning syntax errors related to the ARRAY usage. This specific code returns the following error:
psycopg2.errors.SyntaxError: syntax error at or near "ARRAY"
LINE 4: FROM VALUES ARRAY[(680518, 'Size'),(68051...
Removing the tuple conversion, so self.db_cursor.execute(update_query, cms_ids)
, returns the following error:
TypeError: not all arguments converted during string formatting
Our primary goal is to optimize the execution time of the is_active update queries, to limit the load on the database server during these scraping runs.
Say we have a table of products with 50 million records, which we can reduce to 1,000 by filtering on the website_id. We have 1,000 cms_id values, and with the scraping run, we get 900. The mismatch of 100 means those 100 have to be set to inactive.
For the more complex scenario, we can do a join to filter to e.g. 1,000 values. We don't have a tuple of 1,000 integers, but a list containing tuples of (int, str). We may get 900 of those during the scraping run. The mismatch of 100 needs to be found.
Any suggestions or insights would be greatly appreciated! If you need further details about our system architecture or data, please let us know.
I've implemented the ANY(%S) clause instead of NOT IN %s in an effort to improve performance, but get the following error:
psycopg2.errors.WrongObjectType: op ANY/ALL (array) requires an array on the right side LINE 6: AND cms_id != ANY((80,523699,523...
Based on psycopg2 documentation, this use of the ANY clause seems to be correct, albeit with the execute method instead of execute_values. If I change it and use execute, it works. Here is the full code (will clean up table str formatting later):
update_query = """
UPDATE {table_name}
SET is_active = false
WHERE website_id = '{store_id}'::uuid
AND is_active != false
AND cms_id != ANY(%s)
""".format(table_name=table,
store_id=store.site_id)
execute_values(self.db_cursor, update_query, (cms_ids,))
# Changing the last line to execute makes it work
# self.db_cursor.execute(update_query, (cms_ids,))
Using both the execute and execute_values fast action helper, I was able to get the queries to work using the following code:
update_query = """
UPDATE {table_name}
SET is_active = false
WHERE website_id = '{site_id}'::uuid
AND is_active != false
AND id != ALL(%s)
""".format(table_name=table,
site_id=site_id)
self.db_cursor.execute(update_query, (cms_ids,))
update_query = f"""
UPDATE "public".products_options po
SET is_active = false
FROM "public".products p
WHERE p.id = po.product_id
AND p.website_id = '{site_id}'::uuid
AND po.is_active != false
AND NOT (po.product_id, po.option_name) IN %s
"""
# ids is a list of tuples, each tuple containing the product_id and option_name
self.execute_values(update_query, [ids])