I would like to set up a spaCy pipeline that does some parsing and annotation. The source of my documents is a remote storage, so obtaining the documents is relatively expensive.
Since I want to annotate a lot of documents, I would like to process them in a streaming fashion, asynchronously.
When I set up my pipeline as usual, the code first fetches all documents and only then parses them in the pipeline.
How can I stream the documents into the pipeline?
import spacy

def get_docs_from_remote(size):
    # obtain `size` documents from the remote storage
    for document in result:
        yield document['text']

nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])

docs = nlp.pipe(
    get_docs_from_remote(
        size=number_of_documents,
    )
)

for doc in docs:
    # do stuff
There are no errors in this code — nlp.pipe accepts any iterable, including generators, so the documents are already streamed into the pipeline. For example:
import spacy
import time

def get_docs_from_remote(size):
    for i in range(size):
        time.sleep(2)
        yield str(i)

nlp = spacy.load("en_core_web_sm", disable=["ner", "lemmatizer"])

docs = nlp.pipe(
    get_docs_from_remote(
        size=10,
    ),
    batch_size=2,
)

for doc in docs:
    print(doc.text)
The main thing to be aware of with generators and nlp.pipe is that it waits for a full batch before processing the docs. The batch_size=2 above is just for demo purposes. Normally the default of 256 is fine, and it could be higher if you have more RAM. On the other hand, if your docs load more slowly than nlp can process them and you'd rather have the results sooner, you can even switch to batch_size=1, but the overall time spent processing will be much higher.
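To see why the batch size delays the first result, here is a minimal pure-Python sketch of the buffering behavior (the helper name `batched` is hypothetical, not part of the spaCy API): nothing is yielded until a full batch has been pulled from the slow source.

```python
def batched(source, batch_size):
    """Group items from a generator into lists of up to batch_size.

    This mirrors the buffering nlp.pipe does internally: the first
    result only appears after batch_size items have been collected.
    """
    batch = []
    for item in source:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # flush the final, possibly partial, batch
        yield batch

# With batch_size=2, the first list only appears after two items
# have been pulled from the (slow) source:
print(list(batched(iter(["a", "b", "c"]), 2)))  # [['a', 'b'], ['c']]
```

With batch_size=1 the buffer holds a single item, so each document is processed as soon as it arrives, at the cost of losing the efficiency of batched processing.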