Tags: python, scrapy, twisted

Thread-safety question for Python collections in Scrapy Item Pipelines (using Twisted for concurrency)


Scrapy has a notion of Item Pipelines that concurrently process (via Twisted) items returned from a Scrapy Spider. The documentation provides the following code example for filtering duplicate items (copied below). How is it that the set can be used safely by concurrent calls to process_item? It seems that Scrapy invokes item pipelines here.

from scrapy.exceptions import DropItem

class DuplicatesPipeline:

    def __init__(self):
        # One pipeline instance is created per spider run, so this set
        # is shared by every call to process_item.
        self.ids_seen = set()

    def process_item(self, item, spider):
        if item['id'] in self.ids_seen:
            raise DropItem("Duplicate item found: %s" % item)
        else:
            self.ids_seen.add(item['id'])
            return item

Solution

  • Twisted and Scrapy are largely single-threaded. Instead of preemptive multi-threading, they provide concurrency by way of cooperative multitasking. In a cooperative multitasking system there is no preemption: while a function like process_item above is running, nothing else runs. It is therefore perfectly safe for process_item to assume that self.ids_seen will not change between the membership check on its first line and the add() call near its end. No other work can happen until process_item cooperatively gives up control, which it does by raising an exception or returning a value. When that happens, control returns to its caller (or to the nearest enclosing except handler). That code then runs until it decides to give up control, and so on, until control is returned all the way to the Twisted reactor, which picks another event to service by calling some application method. Then the process repeats. A minimal demonstration follows below.
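
To see this outside Scrapy, here is a minimal sketch using plain Twisted (safe_process_item and racy_process_item are illustrative names, not Scrapy API). It also shows the one caveat worth knowing: Scrapy permits process_item to return a Deferred, and any function that yields control back to the reactor mid-body gives other callbacks a chance to run in between, which reopens the check-then-add race even without threads.

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import deferLater

ids_seen = set()

def safe_process_item(item_id):
    # Runs start to finish before the reactor services anything else,
    # so nothing can mutate ids_seen between the check and the add.
    if item_id in ids_seen:
        print("dropped duplicate %s" % item_id)
        return
    ids_seen.add(item_id)
    print("accepted %s" % item_id)

@inlineCallbacks
def racy_process_item(item_id):
    # Same logic, but with a yield between the check and the add. While
    # the deferLater below is pending, the reactor runs other callbacks,
    # so two calls with the same id can both pass the membership test.
    if item_id in ids_seen:
        print("dropped duplicate %s" % item_id)
        return
    yield deferLater(reactor, 0.01, lambda: None)  # stand-in for async I/O
    ids_seen.add(item_id)
    print("accepted %s (racy)" % item_id)

reactor.callWhenRunning(safe_process_item, 0)
reactor.callWhenRunning(safe_process_item, 0)
reactor.callWhenRunning(racy_process_item, 1)
reactor.callWhenRunning(racy_process_item, 1)
reactor.callLater(0.1, reactor.stop)
reactor.run()

Running this prints "accepted 0" and "dropped duplicate 0" for the synchronous pair, which behaves exactly like DuplicatesPipeline, but "accepted 1 (racy)" twice for the yielding pair: both calls pass the membership test before either records the id. As long as process_item does its check-then-add work without returning control to the reactor, the plain set is safe.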