
How to feed spaCy pipeline asynchronously?


I would like to set up a spaCy pipeline that does some parsing and annotation. The source of my documents is remote storage, so obtaining the documents is relatively expensive.

Since I want to annotate a lot of documents, I would like to process them asynchronously, in a streaming fashion.

When I set up my pipeline as usual, the code first obtains all documents and only then parses them in the pipeline.
How can I stream the documents into the pipeline?

import spacy

def get_docs_from_remote(size):
    # obtain `size` documents from remote storage
    for document in result:
        yield document['text']

nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])
docs = nlp.pipe(
    get_docs_from_remote(
        size=number_of_documents,
    )
)

for doc in docs:
    # do stuff

Solution

  • Are there any errors with this code? It looks fine, e.g.:

    import spacy
    import time
    
    
    def get_docs_from_remote(size):
        # stand-in for the remote fetch: each document takes 2 seconds to arrive
        for i in range(size):
            time.sleep(2)
            yield str(i)
    
    
    nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])
    docs = nlp.pipe(
        get_docs_from_remote(
            size=10,
        ),
        batch_size=2,
    )
    
    for doc in docs:
        print(doc.text)
    

    The main thing to be aware of with generators and nlp.pipe is that nlp.pipe waits for a full batch before processing the docs, so results come back in batch-sized chunks rather than one at a time.

    batch_size=2 above is just for demo purposes. Normally the default of 256 is fine, and it could be higher if you have more RAM. On the other hand, if your docs load more slowly than nlp can process them and you'd rather have the results sooner, you can even switch to batch_size=1, but the overall time spent processing will be much higher.
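
    If the slow part is fetching the documents themselves and you want that to overlap with parsing, nlp.pipe won't do that for you, but you can run the fetch in a background thread and feed nlp.pipe from a bounded queue. The sketch below assumes that setup; fetch_docs and stream_docs are hypothetical helpers, and the time.sleep(2) stands in for your remote storage call:

    import queue
    import threading
    import time
    
    import spacy
    
    
    def fetch_docs(q, size):
        # producer: pull documents from remote storage and hand them off
        for i in range(size):
            time.sleep(2)  # stand-in for the expensive remote fetch
            q.put(str(i))
        q.put(None)  # sentinel: no more documents
    
    
    def stream_docs(q):
        # generator consumed by nlp.pipe; yields texts as soon as they arrive
        while True:
            text = q.get()
            if text is None:
                break
            yield text
    
    
    q = queue.Queue(maxsize=8)  # bounded, so the fetcher can't run far ahead of nlp
    threading.Thread(target=fetch_docs, args=(q, 10), daemon=True).start()
    
    nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])
    for doc in nlp.pipe(stream_docs(q), batch_size=2):
        print(doc.text)

    This doesn't change how nlp.pipe batches, but it lets the next documents download while the current batch is being parsed.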