I am trying to speed up the processing of large lists of texts by parallelising textacy. When I use Pool from multiprocessing, the resulting textacy corpus comes out empty. I am not sure whether the problem is in the way I use textacy or in the way I use the multiprocessing paradigm. Here is an example that illustrates my issue:
import spacy
import textacy
from multiprocessing import Pool
texts_dict = {
    "key1": "First text 1.",
    "key2": "Second text 2.",
    "key3": "Third text 3.",
    "key4": "Fourth text 4.",
}
model = spacy.load('en_core_web_lg')
# this works
corpus = textacy.corpus.Corpus(lang=model)
corpus.add((value, {'key': key}) for key, value in texts_dict.items())
print(corpus) # prints Corpus(4 docs, 8 tokens)
print([doc for doc in corpus])
# now the same thing with a worker pool returns empty corpus
corpus2 = textacy.corpus.Corpus(lang=model)
pool = Pool(processes=2)
pool.map(corpus2.add, ((value, {'key': key}) for key, value in texts_dict.items()))
print(corpus2) # prints Corpus(0 docs, 0 tokens)
print([doc for doc in corpus2])
# to make sure we get the right data into corpus.add
pool.map(print, ((value, {'key': key}) for key, value in texts_dict.items()))
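The prints look right, so the data does reach the workers. My current suspicion is that each pool worker mutates its own copy of corpus2, and those copies never make it back to the parent process. A minimal check of that hypothesis, with plain multiprocessing and no textacy involved (append_in_worker is just an illustrative name):
import os
from multiprocessing import Pool

lst = []

def append_in_worker(x):
    lst.append(x)        # mutates the worker process's copy of lst
    return os.getpid()   # report which process did the work

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        pids = pool.map(append_in_worker, range(4))
    print(lst)        # [] -- the workers' appends never reach the parent
    print(set(pids))  # up to two distinct worker pids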
textacy is built on spaCy. spaCy doesn't support multithreading, but it supposedly should be OK to run in multiple processes: https://github.com/explosion/spaCy/issues/2075
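For what it's worth, newer spaCy releases (2.2.2+, if I read the changelog right) can parallelise the parsing themselves via nlp.pipe(..., n_process=...), which would sidestep sharing the corpus between processes entirely. A rough sketch of that route, assuming textacy exposes document metadata through the doc._.meta extension and a Corpus.add_doc method:
import spacy
import textacy

if __name__ == '__main__':
    texts_dict = {"key1": "First text 1.", "key2": "Second text 2.",
                  "key3": "Third text 3.", "key4": "Fourth text 4."}
    model = spacy.load('en_core_web_lg')
    corpus = textacy.corpus.Corpus(lang=model)

    records = ((value, {'key': key}) for key, value in texts_dict.items())
    # parsing happens in 2 worker processes; the finished Docs are yielded
    # back into the parent process, which is where the corpus lives
    for doc, meta in model.pipe(records, as_tuples=True, n_process=2):
        doc._.meta = meta
        corpus.add_doc(doc)
    print(corpus)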
As per the great suggestion of @constt (https://stackoverflow.com/a/58317741/4634344), collecting the results into the corpus works for datasets as large as n_docs=10273, n_sentences=302510, n_tokens=2053129.
For a larger dataset (16000 docs, 3MM tokens) I get the following error:
result_corpus=corpus.get()
File "<string>", line 2, in get
File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Unserializable message: Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
send(msg)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
I will investigate, but if you have a direct solution - much appreciated!
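From the traceback, the failure looks to be in multiprocessing itself rather than in textacy: the manager pickles the whole corpus to send it back, and the connection layer packs the pickled message's length into a signed 32-bit int, so anything over 2 GiB cannot be framed. The limit is easy to reproduce in isolation:
import struct
struct.pack("!i", 2**31 - 1)  # ok: the largest length the header can carry
struct.pack("!i", 2**31)      # struct.error: 'i' format requires -2147483648 <= number <= 2147483647
If I recall correctly, Python 3.8 lifted this limit for multiprocessing connections, so upgrading from 3.6 may be enough; otherwise the corpus has to come back in pieces, or via disk (see the note under the answer below).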
Because Python processes run in separate memory spaces, you have to share your corpus object between the processes in the pool. To do this, wrap the corpus object in a shareable class and register it with a BaseManager class. Here is how you can refactor your code to make it work:
#!/usr/bin/python3
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import spacy
import textacy
texts = {
'key1': 'First text 1.',
'key2': 'Second text 2.',
'key3': 'Third text 3.',
'key4': 'Fourth text 4.',
}
class PoolCorpus(object):

    def __init__(self):
        model = spacy.load('en_core_web_sm')
        self.corpus = textacy.corpus.Corpus(lang=model)

    def add(self, data):
        self.corpus.add(data)

    def get(self):
        return self.corpus

BaseManager.register('PoolCorpus', PoolCorpus)

if __name__ == '__main__':
    with BaseManager() as manager:
        corpus = manager.PoolCorpus()

        with Pool(processes=2) as pool:
            pool.map(corpus.add, ((v, {'key': k}) for k, v in texts.items()))

        print(corpus.get())
Output:
Corpus(4 docs, 16 tokens)
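A follow-up note on the struct.error from the question: corpus.get() pickles the entire corpus and sends it back through the manager's pipe in a single message, which is exactly what overflows the 32-bit length header on very large corpora. One way around it, assuming textacy's Corpus.save()/Corpus.load() disk serialisation (available since around textacy 0.8, if I remember right), is to write the corpus to disk inside the manager process and load it in the parent, so only a file path ever crosses the pipe. A sketch, where save_to is my own addition to PoolCorpus rather than anything from textacy:
class PoolCorpus(object):

    def __init__(self):
        self.model = spacy.load('en_core_web_sm')
        self.corpus = textacy.corpus.Corpus(lang=self.model)

    def add(self, data):
        self.corpus.add(data)

    def save_to(self, path):
        # runs inside the manager's server process; only `path` is pickled
        self.corpus.save(path)
Then in the parent, after the pool has finished:
corpus.save_to('corpus.bin')
big_corpus = textacy.corpus.Corpus.load(spacy.load('en_core_web_sm'), 'corpus.bin')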