Tags: tensorflow, gpu, tensorflow2.0, ray, deepface

Parallelize DeepFace on multiple GPUs


I am trying to use the DeepFace Python library to do face recognition and analysis on long videos: https://github.com/serengil/deepface.

Using the library out of the box, I can get the desired results by selecting frames from a video and iterating over them in a for loop.

Single GPU

import decord
import tensorflow as tf
from deepface import DeepFace

video_path = 'myvideopath'
vr = decord.VideoReader(video_path)

# FRAME_STEP is a placeholder for how many frames to skip between samples
for i in range(0, 100, FRAME_STEP):
    # decord returns RGB frames; reversing the channel axis gives the BGR layout DeepFace expects
    image_bgr = vr[i].asnumpy()[:, :, ::-1]
    results = DeepFace.find(img_path=image_bgr, **other_parameters)

This works, but it is too slow for the amount of video and the number of frames I need to go through.

When running the model, I notice that it uses ~600 MB of GPU memory for prediction, so I should be able to run multiple instances on the same physical GPU. I am only using DeepFace for prediction; I am not training or fine-tuning any models.

gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    try:
        # Split each physical GPU into 12 logical devices of 630 MB each
        tf.config.set_logical_device_configuration(
            gpu, [tf.config.LogicalDeviceConfiguration(memory_limit=630)] * 12)
    except RuntimeError as e:
        # Virtual devices must be set before GPUs have been initialized
        print(e)

logical_gpus = tf.config.list_logical_devices('GPU')
print(len(gpus), "Physical GPU,", len(logical_gpus), "Logical GPUs")

2 Physical GPU, 24 Logical GPUs

I would like to be able to parallelize the DeepFace.find and DeepFace.analyze functions.

The first thing I tried was to keep a queue of free GPU devices and use concurrent.futures.ThreadPoolExecutor.

import concurrent.futures
import queue
import timeit

def multigpu_helper(index, device_name, image_bgr, fn, fn_dict, q):
    print(f'{index:5} {device_name}')
    start_timer = timeit.default_timer()
    # Pin this call to the assigned logical device
    with tf.device(device_name):
        results = fn(img_path=image_bgr, **fn_dict)
    # Hand the device back so the next frame can claim it
    q.put(device_name)
    end_timer = timeit.default_timer()
    print(f'MultiGPU Time: {end_timer - start_timer} sec.')
    return results


def multigpu_process(iterable, vr, fn, fn_dict):
    logical_devices = tf.config.list_logical_devices(device_type='GPU')
    print(logical_devices)

    # Queue of idle device names; a worker puts its device back when it finishes
    q = queue.Queue()
    for logical_device in logical_devices:
        q.put(logical_device.name)

    results_dict = dict()

    item_list = list(iterable)

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(logical_devices)) as pool:
        future_jobs = dict()

        while item_list:
            device_name = q.get()  # blocks until some device is free
            index = item_list.pop(0)
            image_bgr = vr[index].asnumpy()[:, :, ::-1]
            future_jobs[pool.submit(multigpu_helper, index, device_name, image_bgr, fn, fn_dict, q)] = index

        for future in concurrent.futures.as_completed(future_jobs):
            index = future_jobs.get(future)
            results_dict[index] = future.result()

    return results_dict

I am able to get the code to execute and output results, but it is no faster than doing it in a single for loop on a single GPU.

[LogicalDevice(name='/device:GPU:0', device_type='GPU'), LogicalDevice(name='/device:GPU:1', device_type='GPU'), LogicalDevice(name='/device:GPU:2', device_type='GPU'), LogicalDevice(name='/device:GPU:3', device_type='GPU'), LogicalDevice(name='/device:GPU:4', device_type='GPU'), LogicalDevice(name='/device:GPU:5', device_type='GPU'), LogicalDevice(name='/device:GPU:6', device_type='GPU'), LogicalDevice(name='/device:GPU:7', device_type='GPU'), LogicalDevice(name='/device:GPU:8', device_type='GPU'), LogicalDevice(name='/device:GPU:9', device_type='GPU'), LogicalDevice(name='/device:GPU:10', device_type='GPU'), LogicalDevice(name='/device:GPU:11', device_type='GPU'), LogicalDevice(name='/device:GPU:12', device_type='GPU'), LogicalDevice(name='/device:GPU:13', device_type='GPU'), LogicalDevice(name='/device:GPU:14', device_type='GPU'), LogicalDevice(name='/device:GPU:15', device_type='GPU'), LogicalDevice(name='/device:GPU:16', device_type='GPU'), LogicalDevice(name='/device:GPU:17', device_type='GPU'), LogicalDevice(name='/device:GPU:18', device_type='GPU'), LogicalDevice(name='/device:GPU:19', device_type='GPU'), LogicalDevice(name='/device:GPU:20', device_type='GPU'), LogicalDevice(name='/device:GPU:21', device_type='GPU'), LogicalDevice(name='/device:GPU:22', device_type='GPU'), LogicalDevice(name='/device:GPU:23', device_type='GPU')]
    0 /device:GPU:0
   30 /device:GPU:1
   60 /device:GPU:2
   90 /device:GPU:3
  120 /device:GPU:4
  150 /device:GPU:5
  180 /device:GPU:6
  210 /device:GPU:7
  240 /device:GPU:8
  270 /device:GPU:9
  300 /device:GPU:10
  330 /device:GPU:11
  360 /device:GPU:12
  390 /device:GPU:13
  420 /device:GPU:14
  450 /device:GPU:15
  480 /device:GPU:16
  510 /device:GPU:17
  540 /device:GPU:18
  570 /device:GPU:19
  600 /device:GPU:20
  630 /device:GPU:21
  660 /device:GPU:22
  690 /device:GPU:23
MultiGPU Time: 16.968208671023604 sec.
  720 /device:GPU:2
MultiGPU Time: 17.829027735977434 sec.
  750 /device:GPU:1
MultiGPU Time: 17.852755011990666 sec.
  780 /device:GPU:8
MultiGPU Time: 19.71368485200219 sec.MultiGPU Time: 19.543589979992248 sec.

MultiGPU Time: 19.8676836140221 sec.
  810 /device:GPU:4
MultiGPU Time: 19.85990399698494 sec.
  840 /device:GPU:11
  870 /device:GPU:0
MultiGPU Time: 20.076353634009138 sec.
  900 /device:GPU:6
  930 /device:GPU:3
MultiGPU Time: 20.145404886978213 sec.
MultiGPU Time: 20.27192261395976 sec.
  960 /device:GPU:9
  990 /device:GPU:7
MultiGPU Time: 20.459441539016552 sec.
MultiGPU Time: 20.418532160052564 sec.
MultiGPU Time: 20.581610807043035 sec.
MultiGPU Time: 20.545571406022646 sec.
MultiGPU Time: 20.832303048984613 sec.
MultiGPU Time: 20.97456920897821 sec.
MultiGPU Time: 20.994418176996987 sec.
MultiGPU Time: 21.35945221298607 sec.
MultiGPU Time: 21.50979186099721 sec.
MultiGPU Time: 21.405662977020256 sec.
MultiGPU Time: 21.542257393943146 sec.
MultiGPU Time: 22.063301149988547 sec.
MultiGPU Time: 21.665760322008282 sec.
MultiGPU Time: 22.105394209967926 sec.
MultiGPU Time: 6.661869053030387 sec.
MultiGPU Time: 9.814038792042993 sec.
MultiGPU Time: 7.658941667003091 sec.
MultiGPU Time: 8.546573753003031 sec.
MultiGPU Time: 10.831304075953085 sec.
MultiGPU Time: 9.250181486015208 sec.
MultiGPU Time: 8.87483947101282 sec.
MultiGPU Time: 12.432360459002666 sec.
MultiGPU Time: 9.511910478991922 sec.
MultiGPU Time: 9.66243519296404 sec.
Face Recognition MultiGPU Total Time: 29.63435428502271 sec.

In fact, a single iteration of the DeepFace.find function in a plain for loop on one GPU takes about 0.5 sec. With the thread pool, each call instead finishes at roughly the cumulative time of all the threads, which suggests the calls are being serialized (for example by the Python GIL) rather than running in parallel.
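For reference, a process-based pool sidesteps the GIL because every worker gets its own interpreter and TensorFlow runtime. A rough, untested sketch of that idea (one worker per physical GPU; fn_dict stands for the same placeholder keyword arguments as above) might look like this:

import concurrent.futures
import multiprocessing
import os

def process_worker(physical_gpu_id, indices, video_path, fn_dict):
    # Pin this process to one physical GPU before TensorFlow initializes
    os.environ['CUDA_VISIBLE_DEVICES'] = str(physical_gpu_id)
    import decord
    from deepface import DeepFace  # imported here so TF starts fresh per process

    vr = decord.VideoReader(video_path)
    results = {}
    for i in indices:
        image_bgr = vr[i].asnumpy()[:, :, ::-1]
        results[i] = DeepFace.find(img_path=image_bgr, **fn_dict)
    return results

def run_in_processes(index_lists, video_path, fn_dict):
    # 'spawn' gives each child a clean interpreter and CUDA context
    ctx = multiprocessing.get_context('spawn')
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=len(index_lists), mp_context=ctx) as pool:
        futures = [pool.submit(process_worker, gpu_id, indices, video_path, fn_dict)
                   for gpu_id, indices in enumerate(index_lists)]
        merged = {}
        for future in concurrent.futures.as_completed(futures):
            merged.update(future.result())
    return merged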

For a second attempt, I dropped the queue and instead split the input indices into separate lists, processing each list on its own device.

from itertools import cycle
from typing import Any, List

def cycle_baskets(items: List[Any], maxbaskets: int) -> List[List[Any]]:
    # Deal items round-robin into at most maxbaskets lists, e.g.
    # cycle_baskets(range(6), 4) -> [[0, 4], [1, 5], [2], [3]]
    baskets = [[] for _ in range(min(maxbaskets, len(items)))]
    for item, basket in zip(items, cycle(baskets)):
        basket.append(item)
    return baskets


def multigpu_helper_split(device_name, item_list, video_path, fn, fn_dict):
    print(device_name)
    start_timer = timeit.default_timer()

    results_dict = dict()

    # Each thread opens its own VideoReader so the decord handle is not shared
    vr = decord.VideoReader(str(video_path))

    with tf.device(device_name):
        for index in item_list:
            start_index_timer = timeit.default_timer()

            image_bgr = vr[index].asnumpy()[:, :, ::-1]
            results_dict[index] = fn(img_path=image_bgr, **fn_dict)

            end_index_timer = timeit.default_timer()
            print(f'Device {device_name} Index {index:5} {end_index_timer - start_index_timer} sec.')

    end_timer = timeit.default_timer()
    print(f'MultiGPU Time: {end_timer - start_timer} sec.')
    return results_dict


def multigpu_process_split(iterable, video_path, fn, fn_dict):
    logical_devices = [device.name for device in tf.config.list_logical_devices(device_type='GPU')]
    print(logical_devices)

    results_dict = dict()

    # One basket of frame indices per logical device
    item_lists = cycle_baskets(list(iterable), len(logical_devices))

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(logical_devices)) as pool:
        future_jobs = {
            pool.submit(multigpu_helper_split, device_name, item_list, video_path, fn, fn_dict)
            for device_name, item_list in zip(logical_devices, item_lists)
        }

        for future in concurrent.futures.as_completed(future_jobs):
            results_dict.update(future.result())

    return results_dict

This was also considerably slower, and it also caused the kernel to crash.

Device /device:GPU:18 Index   540 305.03293917299015 sec.
MultiGPU Time: 311.7356750360341 sec.
Device /device:GPU:22 Index   660 305.6161605300149 sec.
MultiGPU Time: 312.3281374910148 sec.
Device /device:GPU:5 Index   150 309.5672924729879 sec.
Device /device:GPU:13 Index   390 311.9252848789911 sec.
MultiGPU Time: 318.34215058299014 sec.
Device /device:GPU:0 Index     0 312.96517166896956 sec.
Device /device:GPU:3 Index    90 312.41818467900157 sec.
Device /device:GPU:4 Index   120 312.507540087041 sec.
Device /device:GPU:10 Index   300 312.49839297297876 sec.
MultiGPU Time: 319.4717267890228 sec.
Device /device:GPU:23 Index   690 313.53694368101424 sec.
MultiGPU Time: 320.6566755659878 sec.

I realize that the with tf.device(device_name): context wraps the entire DeepFace call. Looking at the DeepFace source code, there is a lot more going on than TensorFlow, and what I really want to parallelize is model.predict().

DeepFace.py

def represent():
    ...
    # represent
    if "keras" in str(type(model)):
        # new tf versions show progress bar and it is annoying
        embedding = model.predict(img, verbose=0)[0].tolist()
    else:
        # SFace and Dlib are not keras models and no verbose arguments
        embedding = model.predict(img)[0].tolist()
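
A sketch of what parallelizing only the forward pass could look like is below. It assumes DeepFace.build_model returns the underlying Keras model (true for the Keras-based backends in the version excerpted above), and it skips DeepFace's own detection/alignment preprocessing, so face_batch stands for already-preprocessed faces.

import tensorflow as tf
from deepface import DeepFace

# Build one model replica per logical device so its weights live on that device
replicas = {}
for device in tf.config.list_logical_devices('GPU'):
    with tf.device(device.name):
        replicas[device.name] = DeepFace.build_model('VGG-Face')

def embed(device_name, face_batch):
    # face_batch: preprocessed faces, shape (n, 224, 224, 3) for VGG-Face;
    # only the forward pass is pinned to a device here
    with tf.device(device_name):
        return replicas[device_name].predict(face_batch, verbose=0)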

How can I parallelize the DeepFace.find and DeepFace.analyze functions to run on the 24 logical GPUs that I have? Ideally I would get close to a 24x speedup when processing the selected frames.

It would be much preferred if I could wrap something around the DeepFace functions themselves, but if that is not possible, then I could try to parallelize the source code of the DeepFace library.


Solution

  • I was able to parallelize DeepFace by parallelizing some of its internal functions using Ray.
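
The answer does not include the full code, but here is a minimal sketch of the idea, under a few assumptions: each Ray actor loads its own copy of the model in its own process, Ray's fractional num_gpus packs about 12 actors (~630 MB each) onto each physical GPU, and FRAME_STEP and other_parameters are the same placeholders as in the question.

import decord
import ray
from deepface import DeepFace

ray.init()

@ray.remote(num_gpus=1 / 12)  # pack ~12 actors onto each physical GPU
class DeepFaceWorker:
    def __init__(self):
        import tensorflow as tf  # TensorFlow initializes inside the actor process
        for gpu in tf.config.list_physical_devices('GPU'):
            # let each actor allocate only the memory it needs
            tf.config.experimental.set_memory_growth(gpu, True)

    def find(self, image_bgr, fn_dict):
        # each actor keeps its own DeepFace model cache, so calls in
        # different actors run in separate processes, in parallel
        return DeepFace.find(img_path=image_bgr, **fn_dict)

video_path = 'myvideopath'
vr = decord.VideoReader(video_path)

workers = [DeepFaceWorker.remote() for _ in range(24)]
futures = {}
for n, i in enumerate(range(0, 100, FRAME_STEP)):
    image_bgr = vr[i].asnumpy()[:, :, ::-1]
    # round-robin frames across the actor pool
    futures[i] = workers[n % len(workers)].find.remote(image_bgr, other_parameters)

results_dict = {i: ray.get(future) for i, future in futures.items()}

Because each actor is a separate process, this avoids the GIL contention seen with the thread-based attempts above.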