Tags: python-3.x, nlp, mpi, spacy, gensim

Does spaCy support multiple GPUs?


I was wondering if spaCy supports multi-GPU processing via mpi4py?

I am currently using spaCy's nlp.pipe for Named Entity Recognition on a high-performance computing cluster that supports the MPI protocol and has many GPUs. It says here that I would need to specify the GPU to use with cupy, but with mpi4py I am not sure whether the following will work (should I import spacy only after selecting the cupy device?):


from mpi4py import MPI
import cupy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."*100]
else:
    data = None

unit = comm.scatter(data, root=0)

with cupy.cuda.Device(rank):
    import spacy
    from thinc.api import set_gpu_allocator, require_gpu
    set_gpu_allocator("pytorch")
    require_gpu(rank)
    nlp = spacy.load('en_core_web_lg')
    nlp.add_pipe("merge_entities")
    tmp_list = []
    for doc in nlp.pipe(unit):
        res = " ".join([t.text if not t.ent_type_ else t.ent_type_ for t in doc])
        tmp_list.append(res)

result = comm.gather(tmp_list, root=0)

if comm.rank == 0:
    print (result)
else:
    result = None
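
(For what it is worth, my understanding is that comm.scatter expects a sequence with exactly one item per rank, so I would probably have to pre-split the texts myself before the spaCy part runs. The chunking below is only my guess at how that pre-split would look:)

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

texts = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."] * 100

if rank == 0:
    # one sub-list per rank, so scatter hands each process its own batch of texts
    data = [texts[i::size] for i in range(size)]
else:
    data = None

unit = comm.scatter(data, root=0)  # unit is now a list of strings on every rank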

Or, if I have 4 GPUs on the same machine and I do not want to use MPI, can I do the following:

from joblib import Parallel, delayed
import cupy

rank = 0

def chunker(iterable, total_length, chunksize):
    return (iterable[pos: pos + chunksize] for pos in range(0, total_length, chunksize))

def flatten(list_of_lists):
    "Flatten a list of lists to a combined list"
    return [item for sublist in list_of_lists for item in sublist]

def process_chunk(texts):
    with cupy.cuda.Device(rank):
        import spacy
        from thinc.api import set_gpu_allocator, require_gpu
        set_gpu_allocator("pytorch")
        require_gpu(rank)
        preproc_pipe = []
        for doc in nlp.pipe(texts, batch_size=20):
            preproc_pipe.append(lemmatize_pipe(doc))
        rank+=1
        return preproc_pipe

def preprocess_parallel(texts, chunksize=100):
    executor = Parallel(n_jobs=4, backend='multiprocessing', prefer="processes")
    do = delayed(process_chunk)
    tasks = (do(chunk) for chunk in chunker(texts, len(texts), chunksize=chunksize))
    result = executor(tasks)
    return flatten(result)

preprocess_parallel(texts = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."*100], chunksize=1000)
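
(lemmatize_pipe above is a placeholder for my own post-processing helper, which I have not shown; a minimal version of what I have in mind would look roughly like this:)

def lemmatize_pipe(doc):
    # keep lemmas of alphabetic, non-stopword tokens
    return [t.lemma_ for t in doc if t.is_alpha and not t.is_stop]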

Solution

  • I think I have figured out how to do this:

    The key is to instruct cupy to use a different GPU in each worker process.

    import multiprocessing as mp
    mp.set_start_method('spawn', force=True)  # CUDA cannot be used in forked child processes
    from joblib import Parallel, delayed
    from itertools import cycle
    import cupy
    import spacy
    from thinc.api import set_gpu_allocator, require_gpu
    
    
    def chunker(iterable, total_length, chunksize):
        return (iterable[pos: pos + chunksize] for pos in range(0, total_length, chunksize))
    
    def flatten(list_of_lists):
        "Flatten a list of lists to a combined list"
        return [item for sublist in list_of_lists for item in sublist]
    
    def process_entity(doc):
        super_word_ls = []
        for s in doc.sents:
            word_ls = []
            for t in s:
                if not t.ent_type_:
                    if (t.text.strip()!=""):
                        word_ls.append(t.text)
                else:
                    word_ls.append(t.ent_type_)
            if len(word_ls)>0:
                super_word_ls.append(" ".join(word_ls))
        return " ".join(super_word_ls)
    
    def process_chunk(texts, rank):
        print(rank)  # which GPU this chunk will run on
        with cupy.cuda.Device(rank):
            set_gpu_allocator("pytorch")
            require_gpu(rank)
            nlp = spacy.load("en_core_web_trf")
            preproc_pipe = []
            for doc in nlp.pipe(texts, batch_size=20):
                preproc_pipe.append(process_entity(doc))
            return preproc_pipe
    
    
    def preprocess_parallel(texts, chunksize=100):
        executor = Parallel(n_jobs=2, backend='multiprocessing', prefer="processes")
        do = delayed(process_chunk)
        tasks = []
        gpus = list(range(0, cupy.cuda.runtime.getDeviceCount()))  # ids of all visible GPUs
        rank = 0
        for chunk in chunker(texts, len(texts), chunksize=chunksize):
            tasks.append(do(chunk, rank))
            rank = (rank+1)%len(gpus)  # round-robin the chunks across GPUs
        result = executor(tasks)
        return flatten(result)
    
    if __name__ == '__main__':
        print(preprocess_parallel(texts = ["His friend Nicolas J. Smith is here with Bart Simpon and Fred."]*100, chunksize=50))
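
    One caveat, which is my assumption rather than something the snippet above enforces: n_jobs should match the number of visible GPUs, otherwise two workers can be assigned the same device. For example:

    n_gpus = cupy.cuda.runtime.getDeviceCount()
    executor = Parallel(n_jobs=n_gpus, backend='multiprocessing', prefer="processes")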