Tags: python, performance, numpy, zero-copy

Can you create a np.complex128 1D array from two np.float64 1D arrays without a copy?


The Setup:

I have two arrays, reals and imags, in shared memory:

#!/usr/bin/env python2

reals = multiprocessing.RawArray('d', 10000000)
imags = multiprocessing.RawArray('d', 10000000)

Then I wrap them in NumPy arrays, named reals2 and imags2, without any copy:

import numpy as np

reals2 = np.frombuffer(reals)
imags2 = np.frombuffer(imags)

# check if the objects did a copy
assert reals2.flags['OWNDATA'] is False
assert imags2.flags['OWNDATA'] is False

I would then like to make a np.complex128 1D array, data, again without copying the data, but I don't know how.

The Questions:

Can you make a np.complex128 1D array from a pair of float arrays without copying the data: yes or no?

If yes, how?


Solution

  • Short answer: no. But if you control the sender, there is a solution that does not require copying.

    Longer answer:

    • From my research, I do not believe there is a way to create a NumPy complex array from two separately allocated arrays without copying the data.
    • This is because NumPy's compiled C code assumes interleaved real/imag data: each complex128 element is stored as adjacent float64 real and imaginary values, so both source buffers would have to live interleaved in a single allocation, as the sketch below shows.
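
    A minimal sketch of that layout (the values here are purely illustrative): viewing a complex128 array as float64 exposes the [real, imag, real, imag, ...] ordering that two separately allocated buffers cannot provide.

    import numpy as np

    # a complex128 array reinterpreted as float64 shows the
    # interleaved [real, imag, real, imag, ...] memory layout
    c = np.array([1.0 + 2.0j, 3.0 + 4.0j], dtype=np.complex128)
    f = c.view(np.float64)
    assert np.all(f == np.array([1.0, 2.0, 3.0, 4.0]))
    # two separately allocated float buffers cannot satisfy this
    # adjacency requirement, so no zero-copy complex view exists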

    If you control the sender, you can get your data without any copy operations. Here's how:

    #!/usr/bin/env python2
    import multiprocessing
    import numpy as np
    
    # parent process creates some data that needs to be shared with the child processes
    data = np.random.randn(10) + 1.0j * np.random.randn(10)
    assert data.dtype == np.complex128
    # copy the data from the parent process to shared memory
    shared_data = multiprocessing.RawArray('d', 2 * data.size)
    shared_data[0::2] = data.real
    shared_data[1::2] = data.imag
    # simulate the child process getting only the shared_data
    data2 = np.frombuffer(shared_data)
    assert data2.flags['OWNDATA'] is False
    assert data2.dtype == np.float64
    assert data2.size == 2 * data.size
    # convert reals to complex
    data3 = data2.view(np.complex128)
    assert data3.flags['OWNDATA'] is False
    assert data3.dtype == np.complex128
    assert data3.size == data.size
    assert np.all(data3 == data)
    # done - if no AssertionError then success
    print 'success'
    

    Hat tip to https://stackoverflow.com/a/32877245/52074 as a great starting point.
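
    A related note (my own sketch, not part of the linked answer): the sender does not have to fill the interleaved buffer with separate real/imag slice assignments. Slicing a NumPy array returns a view, so the sender can keep working with distinct reals/imags arrays that are strided views of the single shared buffer:

    #!/usr/bin/env python2
    import multiprocessing
    import numpy as np

    n = 10
    shared_data = multiprocessing.RawArray('d', 2 * n)
    buf = np.frombuffer(shared_data)
    # strided views into the interleaved buffer -- no copies made
    reals_view = buf[0::2]
    imags_view = buf[1::2]
    assert reals_view.flags['OWNDATA'] is False
    assert imags_view.flags['OWNDATA'] is False
    # the sender writes through the views...
    reals_view[:] = np.random.randn(n)
    imags_view[:] = np.random.randn(n)
    # ...and any reader reinterprets the same buffer as complex128
    data = buf.view(np.complex128)
    assert np.all(data.real == reals_view)
    assert np.all(data.imag == imags_view)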

    Here's how to do the same processing, but with multiple processes being started, the results being collected from each process, and the returned data being verified:

    #!/usr/bin/env python2
    import multiprocessing
    import os
    # third-party
    import numpy as np
    
    # constants
    # =========
    N_POINTS = 3
    N_PROCESSES = 4
    
    # functions
    # =========
    def func(index, shared_data, results_dict):
        # simulate the child process getting only the shared_data
        data2 = np.frombuffer(shared_data)
        assert data2.flags['OWNDATA'] is False
        assert data2.dtype == np.float64
        # convert reals to complex
        data3 = data2.view(np.complex128)
        assert data3.flags['OWNDATA'] is False
        assert data3.dtype == np.complex128
        print '[child.pid=%s,type=%s]: %s'%(os.getpid(), type(shared_data), data3)
        # return the results in a SLOW but relatively easy way
        results_dict[os.getpid()] = np.copy(data3) * index
    
    # the script
    # ==========
    if __name__ == '__main__':
        # parent process creates some data that needs to be shared with the child processes
        data = np.random.randn(N_POINTS) + 1.0j * np.random.randn(N_POINTS)
        assert data.dtype == np.complex128
    
        # copy the data from the parent process to shared memory
        shared_data = multiprocessing.RawArray('d', 2 * data.size)
        shared_data[0::2] = data.real
        shared_data[1::2] = data.imag
        print '[parent]: ', type(shared_data), data
    
        # do multiprocessing
        manager = multiprocessing.Manager()
        results_dict = manager.dict()
        processes = []
        for index in xrange(N_PROCESSES):
            process = multiprocessing.Process(target=func, args=(index, shared_data, results_dict))
            processes.append(process)
        for process in processes:
            process.start()
        for process in processes:
            process.join()
    
        # get the results back from the processes
        results = [results_dict[process.pid] for process in processes]
        # verify the values from the processes
        for index in xrange(N_PROCESSES):
            result = results[index]
            assert np.all(result == data * index)
        del processes
    
        # done
        print 'success'
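
    As the comment in func notes, routing results through a Manager dict is slow because every result is pickled on its way back. One alternative (my own sketch under the same setup; func2, shared_in, and shared_out are names I introduce) is to give each child its own slice of a second shared RawArray and have it write results in place:

    #!/usr/bin/env python2
    import multiprocessing
    import numpy as np

    N_POINTS = 3
    N_PROCESSES = 4

    def func2(index, shared_in, shared_out):
        data = np.frombuffer(shared_in).view(np.complex128)
        out = np.frombuffer(shared_out).view(np.complex128)
        # write into this process's own slice -- no pickling needed
        out[index * N_POINTS:(index + 1) * N_POINTS] = data * index

    if __name__ == '__main__':
        data = np.random.randn(N_POINTS) + 1.0j * np.random.randn(N_POINTS)
        shared_in = multiprocessing.RawArray('d', 2 * N_POINTS)
        shared_out = multiprocessing.RawArray('d', 2 * N_POINTS * N_PROCESSES)
        np.frombuffer(shared_in).view(np.complex128)[:] = data
        processes = [multiprocessing.Process(target=func2, args=(index, shared_in, shared_out))
                     for index in xrange(N_PROCESSES)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()
        # every child wrote straight into shared memory, so the parent
        # just reinterprets the output buffer and verifies each slice
        results = np.frombuffer(shared_out).view(np.complex128)
        for index in xrange(N_PROCESSES):
            assert np.all(results[index * N_POINTS:(index + 1) * N_POINTS] == data * index)
        print 'success'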