Tags: python, ipc, pyarrow, apache-arrow

How to use Apache Arrow IPC from multiple processes (possibly from different languages)?


I'm not sure where to begin, so I'm looking for some guidance. I want to create some arrays/tables in one process and have them accessible (read-only) from another.

So I create a pyarrow.Table like this:

import pyarrow as pa

a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])

a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
#   0,
#   1,
#   2
# ]

a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
#   "foo",
#   "bar",
#   "baz"
# ]

tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])

tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]

Now how do I read this from a different process? I thought I would use multiprocessing.shared_memory.SharedMemory, but that didn't quite work:

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out:
    for batch in tbl.to_batches():
        out.write(batch)

# TypeError: Unable to read from object of type: <class 'memoryview'>

Do I need to wrap the shm.buf with something?

Even if I get this to work, it seems very fiddly. How would I do this in a robust manner? Do I need something like zmq?

I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?

In my real use case, I also want to talk to Julia, but maybe that should be a separate question when I come to it.

PS: I have gone through the docs, but they didn't clarify this part for me.


Solution

  • Do I need to wrap the shm.buf with something?

    Yes, you can use pa.py_buffer() to wrap it:

    # Compute the IPC stream size first (see calculate_ipc_size below), then
    # allocate a shared memory block of exactly that size.
    size = calculate_ipc_size(table)
    shm = shared_memory.SharedMemory(create=True, name=name, size=size)

    # Wrap the shared memory's buffer so pyarrow can write the stream into it.
    stream = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
    with pa.RecordBatchStreamWriter(stream, table.schema) as writer:
        writer.write_table(table)
    

    Also, for size you need the size of the IPC output, which may be a bit larger than Table.nbytes. You can calculate it with this function:

    def calculate_ipc_size(table: pa.Table) -> int:
        sink = pa.MockOutputStream()
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table)
        return sink.size()
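
    As a quick check with the tbl from the question (illustrative only; the exact numbers depend on the pyarrow version), the stream also carries the schema and per-batch framing, which is why it comes out a bit larger than Table.nbytes:

    print(tbl.nbytes)                  # raw column buffer bytes only
    print(calculate_ipc_size(tbl))     # full IPC stream size; use this for the SharedMemory block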
    

  • How would I do this in a robust manner?

    I'm not sure about this part yet. In my experience the original process needs to stay alive while the other processes are using the buffers, but there might be a way to get around that; this is likely connected to this bug in CPython: https://bugs.python.org/issue38119
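
    As a rough sketch (assuming the creating process outlives every reader, and reusing the 'pa_test' name and the tbl from the question plus the calculate_ipc_size helper above), you can at least make the block's lifetime explicit on the writer side:

    import atexit
    from multiprocessing import shared_memory

    shm = shared_memory.SharedMemory(create=True, name='pa_test',
                                     size=calculate_ipc_size(tbl))

    def cleanup():
        shm.close()     # release this process's mapping
        shm.unlink()    # remove the block; only safe once all readers are done

    atexit.register(cleanup)   # defer removal until the writer exits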

  • I'm not clear how this is zero copy though. When I write the record batches, isn't that serialisation? What am I missing?

    You are correct that writing the Arrow data into an IPC buffer does involve copies. The zero-copy part is when other processes read the data from shared memory: the columns of the Arrow table they reconstruct reference the relevant segments of the IPC buffer rather than copies of them.
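
    For illustration, here is what the reader side might look like (a sketch assuming the writer filled a block named 'pa_test' as above and keeps it alive):

    import pyarrow as pa
    from multiprocessing import shared_memory

    # Attach to the existing block (create=False is the default).
    shm = shared_memory.SharedMemory(name='pa_test')

    # Wrap the shared memoryview without copying and map the IPC stream.
    buf = pa.py_buffer(shm.buf)
    reader = pa.ipc.open_stream(buf)
    table = reader.read_all()   # columns point into shm; the data is not copied

    # Keep shm open (and not unlinked) for as long as table is in use.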