Tags: python, performance, multiprocessing

Why is decoding a large base64 string faster in a single thread than with multiprocessing?


I have a number of large base64 strings to decode, ranging from a few hundred MB up to ~5 GB each.

The obvious solution is a single call to base64.b64decode ("reference implementation").

I'm trying to speed up the process by using multiprocessing but, surprisingly, it is much slower than the reference implementation.

On my machine I get:

reference_implementation
decoding time = 7.37

implmementation1
Verify result Ok
decoding time = 7.59

threaded_impl
Verify result Ok
decoding time = 13.24

mutiproc_impl
Verify result Ok
decoding time = 11.82

What am I doing wrong?

(Warning: memory hungry code!)

import base64

from time import perf_counter
from binascii import a2b_base64
import concurrent.futures as fut
from time import sleep
from gc import collect
from multiprocessing import cpu_count

def reference_implementation(encoded):
    """This is the implementation that gives the desired result"""
    return base64.b64decode(encoded)


def implmementation1(encoded):
    """Try to call the directly the underlying library"""
    return a2b_base64(encoded)


def threaded_impl(encoded, N):
    """Try multi threading calling the underlying library"""
    # split the string into pieces
    d = len(encoded) // N           # approximate size of each of the N pieces
    lbatch = (d // 4) * 4           # length of the first N-1 batches (multiple of 4); the last is len(encoded) - lbatch*(N-1)
    batches = []
    for i in range(N-1):
        start = i * lbatch
        end = (i + 1) * lbatch
        # print(i, start, end)
        batches.append(encoded[start:end])
    batches.append(encoded[end:])
    # Decode
    ret = bytes()
    with fut.ThreadPoolExecutor(max_workers=N) as executor:
        # Submit tasks for execution and put pieces together
        for result in executor.map(a2b_base64, batches):
            ret = ret + result
    return ret


def mutiproc_impl(encoded, N):
    """Try multi processing calling the underlying library"""
    # split the string into pieces
    d = len(encoded) // N           # approximate size of each of the N pieces
    lbatch = (d // 4) * 4           # length of the first N-1 batches (multiple of 4); the last is len(encoded) - lbatch*(N-1)
    batches = []
    for i in range(N-1):
        start = i * lbatch
        end = (i + 1) * lbatch
        # print(i, start, end)
        batches.append(encoded[start:end])
    batches.append(encoded[end:])
    # Decode
    ret = bytes()
    with fut.ProcessPoolExecutor(max_workers=N) as executor:
        # Submit tasks for execution and put pieces together
        for result in executor.map(a2b_base64, batches):
            ret = ret + result
    return ret

if __name__ == "__main__":
    CPU_NUM = cpu_count()

    # Prepare a 4.6 GB byte string (with less than 32 GB of RAM you may experience swapping to virtual memory)
    repeat = 60000000
    large_b64_string = b'VGhpcyBzdHJpbmcgaXMgZm9ybWF0dGVkIHRvIGJlIGVuY29kZWQgd2l0aG91dCBwYWRkaW5nIGJ5dGVz' * repeat

    # Compare implementations
    print("\nreference_implementation")
    t_start = perf_counter()
    dec1 = reference_implementation(large_b64_string)
    t_end = perf_counter()
    print('decoding time =', (t_end - t_start))

    sleep(1)

    print("\nimplmementation1")
    t_start = perf_counter()
    dec2 = implmementation1(large_b64_string)
    t_end = perf_counter()
    print("Verify result", "Ok" if dec2==dec1 else "FAIL")
    print('decoding time =', (t_end - t_start))
    del dec2; collect()     # force freeing memory to avoid swapping on virtual mem

    sleep(1)

    print("\nthreaded_impl")
    t_start = perf_counter()
    dec3 = threaded_impl(large_b64_string, CPU_NUM)
    t_end = perf_counter()
    print("Verify result", "Ok" if dec3==dec1 else "FAIL")
    print('decoding time =', (t_end - t_start))
    del dec3; collect()

    sleep(1)

    print("\nmutiproc_impl")
    t_start = perf_counter()
    dec4 = mutiproc_impl(large_b64_string, CPU_NUM)
    t_end = perf_counter()
    print("Verify result", "Ok" if dec4==dec1 else "FAIL")
    print('decoding time =', (t_end - t_start))
    del dec4; collect()

Solution

  • TL;DR: Python parallelism sucks here because of the global interpreter lock and inter-process communication. Data copies add further overhead, making your parallel implementations even slower, especially since the operation tends to be memory-bound. A native CPython module can be written to get around CPython's limits and strongly speed up the computation.


    First things first, multi-threading in CPython is limited by the global interpreter lock (GIL), which prevents such a computation from being faster than a sequential one (as it does in nearly all cases except, mostly, I/O-bound ones). Barmar pointed this out in the comments.

    Moreover, multi-processing is limited by the inter-process communication (IPC) between workers, which is generally slow. This is especially true here since the computation is rather memory-intensive and the IPC is done internally through relatively slow pickling. Not to mention that this IPC is mostly done sequentially, further hurting the scalability of the operation if it is not already completely memory-bound on the target platform.
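
    To get a feeling for the IPC cost alone, here is a minimal sketch (my addition, not part of the original benchmark) that only measures the pickling step ProcessPoolExecutor performs for every chunk it sends to a worker; the pipe transfer and the unpickling on the worker side come on top of that:

    import pickle
    from time import perf_counter

    # A 256 MiB chunk, roughly the size of one worker's share of a multi-GB input
    chunk = b'A' * (256 * 1024 * 1024)

    t_start = perf_counter()
    blob = pickle.dumps(chunk)      # serialization only; the actual transfer is extra
    print('pickling time =', perf_counter() - t_start, 'for', len(chunk) >> 20, 'MiB')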

    On top of that, operations like encoded[start:end] create a new bytes object which is a (partial) copy of encoded. This further increases the memory-bandwidth pressure, which is likely already an issue (it clearly is on my laptop). The same is true for ret = ret + result, which creates a new, growing copy for every result, leading to quadratic running time.
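
    For illustration, here is a sketch (my rewrite, not in the original code) of the threaded variant with those extra copies removed: memoryview slices share the input buffer instead of copying it, and b"".join replaces the quadratic ret = ret + result pattern. It is still GIL-bound, so do not expect it to beat the sequential version, but the copy overhead disappears:

    import concurrent.futures as fut
    from binascii import a2b_base64

    def threaded_impl_no_copies(encoded, N):
        """Same splitting logic as threaded_impl, but without input/output copies."""
        view = memoryview(encoded)               # zero-copy view of the input
        lbatch = (len(encoded) // N // 4) * 4    # batch length, multiple of 4
        batches = [view[i * lbatch:(i + 1) * lbatch] for i in range(N - 1)]
        batches.append(view[(N - 1) * lbatch:])  # the last batch gets the remainder
        with fut.ThreadPoolExecutor(max_workers=N) as executor:
            # a2b_base64 accepts any bytes-like object, so the views work directly
            return b"".join(executor.map(a2b_base64, batches))

    Note that this trick does not transfer to the multiprocessing variant: memoryview objects cannot be pickled, so each chunk would have to be copied anyway to cross the process boundary.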

    With so many copies in a rather memory-bound operation, it is not surprising that it ends up slower than the sequential version. The thing is, you can hardly do better in Python! Without convoluted tricks, there is no other way to parallelize the operation in pure Python: every module has to create either CPython threads (GIL-bound) or CPython processes (IPC-bound). The GIL cannot be released as long as you work on CPython objects in multiple threads. The only solution is to use native threads operating on the bytes' internal buffer (which does not require the GIL to be held).

    This can be done either with a native language (e.g. C/C++/Rust), Numba or Cython. However, there is another big issue impacting performance: bytes copies. AFAIK, neither Numba nor Cython lets you avoid them. The best you can do is extract the input memory buffer, write the output in parallel into a native array (not limited by the GIL), and finally create a bytes object from it. The thing is, creating that last object takes >60% of the time on my machine, and there is no way to make it faster because bytes objects are immutable.
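
    To make this concrete, here is a rough Numba sketch (mine, not part of the original benchmark, input assumed well-formed) following exactly that recipe: view the input buffer as a Numpy array, fill a native output array from parallel threads, and only at the end pay for the conversion to an immutable bytes object:

    import numpy as np
    import numba as nb

    # Decode table built once; invalid characters map to -1
    _B64_CHARS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
    _TABLE = np.full(256, -1, dtype=np.int64)
    for _i, _c in enumerate(_B64_CHARS):
        _TABLE[_c] = _i
    _TABLE[ord("=")] = 0

    @nb.njit(parallel=True, cache=True)
    def _decode_into(src, table, out):
        # Each group of 4 input characters yields up to 3 output bytes
        for k in nb.prange(src.size // 4):
            a = table[src[4 * k]]
            b = table[src[4 * k + 1]]
            c = table[src[4 * k + 2]]
            d = table[src[4 * k + 3]]
            merged = (a << 18) | (b << 12) | (c << 6) | d
            j = 3 * k
            if j < out.size:
                out[j] = (merged >> 16) & 0xFF
            if j + 1 < out.size:
                out[j + 1] = (merged >> 8) & 0xFF
            if j + 2 < out.size:
                out[j + 2] = merged & 0xFF

    def numba_b64decode(encoded):
        src = np.frombuffer(encoded, dtype=np.uint8)   # zero-copy view of the input
        padding = (encoded[-1:] == b"=") + (encoded[-2:-1] == b"=")
        out = np.empty((src.size // 4) * 3 - padding, dtype=np.uint8)
        _decode_into(src, _TABLE, out)
        return out.tobytes()    # this final, unavoidable copy is the expensive part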

    A CPython extension module written in a native language can get around this limit. Technically, this is also possible in Cython by calling the C API functions directly and managing objects yourself, but this is pretty low-level and, in the end, it looks more like C code than Python (with additional Cython annotations). Indeed, the CPython API provides a PyBytes_FromStringAndSize function to create a bytes object, and it lets developers write into the bytes' buffer after creation only when the object is created without an associated source buffer (i.e. the first parameter must be NULL). This is the only way to avoid an expensive copy.

    That being said, note that a new bytes object still needs to be created, and filling its internal buffer results in a lot of page faults, slowing things down a bit. AFAIK, there is no way to avoid that in CPython besides not creating huge buffers. In fact, the computation would be faster if you could process the whole string chunk by chunk (if possible), so as to benefit from CPU caches and memory recycling by the standard allocator. Indeed, this can strongly reduce the DRAM pressure and avoid page faults. The bad news is that if you do the chunk-by-chunk computation in CPython, then I think you cannot benefit from multiple threads anymore: the chunks will be too small for multiple threads to really be worth it, especially on large-scale servers (where multiple threads are also required to saturate the RAM and the L3 cache). CPython's parallelism simply sucks (especially because of the GIL).
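
    If the decoded data can be consumed incrementally, a tiny pure-Python sketch of the chunk-by-chunk idea (my addition, sequential by design) could look like this; each decoded piece stays small enough to be cached and recycled by the allocator instead of producing one multi-GB buffer:

    from binascii import a2b_base64

    def iter_decoded_chunks(encoded, chunk_size=4 * 1024 * 1024):
        """Yield decoded pieces one at a time instead of building one huge bytes object."""
        assert chunk_size % 4 == 0              # keep every chunk independently decodable
        view = memoryview(encoded)              # avoid copying the input when slicing
        for start in range(0, len(view), chunk_size):
            yield a2b_base64(view[start:start + chunk_size])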

    I also found out that base64.b64decode is surprisingly not that efficient. I wrote a faster (but less safe) implementation. There are ways to write a fast and safe implementation (typically thanks to SIMD), but this is complicated and not the purpose of this post. Besides, using multiple threads is enough to make the computation memory-bound on most machines (including mine), so it is not worth optimizing the resulting (rather naive) sequential implementation any further.

    Note I used OpenMP to parallelize the C loop with only one line (for large inputs).

    Here is the base64.c main file of the fast parallel CPython native module (assuming the input is correctly formatted):

    #define PY_SSIZE_T_CLEAN // Required for large bytes objects on 64-bit machines
    #include <Python.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <assert.h>
    #include <omp.h>
    
    int base64_table[256];
    
    // Generate a conversion table for the sake of performance
    static inline void init_table()
    {
        static const unsigned char base64_chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    
        // Mark every byte value as invalid first
        for (int i = 0; i < 256; ++i)
            base64_table[i] = -1;
    
        for (int i = 0; i < 64; ++i)
            base64_table[base64_chars[i]] = i;
    
        base64_table['='] = 0;
    }
    
    static inline int decode_char(unsigned char c)
    {
        return base64_table[c];
    }
    
    // Assume the input is correctly formatted
    static PyObject* decode(PyObject* self, PyObject* args)
    {
        PyObject* input_obj;
    
        // Extract the input parameter and check its type
        if(!PyArg_ParseTuple(args, "O!", &PyBytes_Type, &input_obj))
            return NULL;
    
        char* input = PyBytes_AS_STRING(input_obj);
        Py_ssize_t input_length = PyBytes_GET_SIZE(input_obj);
        assert(input_length % 4 == 0);
    
        int padding = 0;
        padding += input_length >= 1 && input[input_length - 1] == '=';
        padding += input_length >= 2 && input[input_length - 2] == '=';
    
        // Assume there is enough memory
        Py_ssize_t output_length = (input_length / 4) * 3 - padding;
        PyObject* output_obj = PyBytes_FromStringAndSize(NULL, output_length);
        assert(output_obj != NULL);
        char* output = PyBytes_AS_STRING(output_obj);
        assert(output != NULL);
    
        #pragma omp parallel for schedule(guided) if(input_length >= 8*1024*1024)
        for(Py_ssize_t k = 0; k < input_length / 4; ++k)
        {
            const Py_ssize_t i = k * 4;
            const Py_ssize_t j = k * 3;
    
            const int a = decode_char(input[i]);
            const int b = decode_char(input[i + 1]);
            const int c = decode_char(input[i + 2]);
            const int d = decode_char(input[i + 3]);
            assert(a >= 0 && b >= 0 && c >= 0 && d >= 0);
    
            const int merged = (a << 18) + (b << 12) + (c << 6) + d;
    
            if(j < output_length)     output[j]     = (merged >> 16) & 0xFF;
            if(j + 1 < output_length) output[j + 1] = (merged >> 8) & 0xFF;
            if(j + 2 < output_length) output[j + 2] = merged & 0xFF;
        }
    
        return output_obj;
    }
    
    static PyMethodDef MyMethods[] = 
    {
        {"decode", decode, METH_VARARGS, "Parallel base64 decoding function."},
        {NULL, NULL, 0, NULL}
    };
    
    static struct PyModuleDef parallel_base64 = 
    {
        PyModuleDef_HEAD_INIT,
        "parallel_base64",
        NULL,
        -1,
        MyMethods
    };
    
    PyMODINIT_FUNC PyInit_parallel_base64(void) 
    {
        init_table();
        return PyModule_Create(&parallel_base64);
    }
    

    and here is the setup.py file to build it:

    from setuptools import setup, Extension
    
    module = Extension(
        'parallel_base64', 
        sources=['base64.c'],
        extra_compile_args=['-fopenmp'],
        extra_link_args=['-fopenmp']
    )
    
    setup(
        name='parallel_base64',
        version='1.0',
        description='A parallel base64 module written in C',
        ext_modules=[module],
    )
    

    You can build it with python setup.py build_ext --inplace. Then you can import it with import parallel_base64 and simply call parallel_base64.decode(large_b64_string).
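
    For example, a quick sanity check against the standard library (with a small, made-up payload) could be:

    import base64
    import parallel_base64

    data = b"Some payload to round-trip through the native decoder"
    encoded = base64.b64encode(data)

    # The native module should produce exactly the same bytes as the stdlib
    assert parallel_base64.decode(encoded) == base64.b64decode(encoded)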


    Performance results

    Using repeat = 30000000, on my Linux laptop with an AMD Ryzen 7 5700U (configured in performance mode) and Python 3.12.3, I get the following results:

    reference_implementation
    decoding time = 3.6366550829989137
    
    implmementation1
    Verify result Ok
    decoding time = 3.5178445390010893
    
    threaded_impl
    Verify result Ok
    decoding time = 9.623698087001685
    
    mutiproc_impl
    Verify result Ok
    decoding time = 13.102449985999556
    
    cython_impl
    Verify result Ok
    decoding time = 0.29033970499949646
    

    We can see that the native parallel implementation is 12.5 times faster using 8 cores. This is because it not only uses multiple cores but also benefits from a more efficient computation. The DRAM throughput reaches 23 GiB/s, which is pretty good. It should be more than enough for data read from a high-end SSD.

    Note that if you want to read data from an SSD efficiently, then reading it (all at once) from Python into a bytes object is inefficient (because of copies and page faults). Memory mapping is generally faster, especially on a high-end SSD. This can be done with mmap on Linux. Note that Numpy provides such a feature too (though munmap is missing), and Numpy arrays have the benefit of being mutable, so they can be reused many times, which might help avoid page-fault performance issues and enable further optimizations.
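
    As a rough sketch (the file name is just a placeholder), mapping an encoded file and decoding it straight from the mapping could look like this; note that the native module above only accepts bytes because of its O! argument check, so the stdlib decoder is used here:

    import mmap
    from binascii import a2b_base64

    # "payload.b64" is a placeholder; the file is mapped instead of being read
    # into a bytes object, so the OS pages it in lazily and no copy is made upfront.
    with open("payload.b64", "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            decoded = a2b_base64(mm)    # a2b_base64 accepts any bytes-like object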

    In the end, Python is maybe simply not the right tool to get good performance for this kind of computation, though native modules can help a lot to speed up some specific parts (otherwise, it is not really Python code anymore).