Tags: python, python-polars

Is Polars Dataframe thread-safe?


The Polars Python library has many promising features, such as parallelism and fast computation speed, as the docs mention in several places.

However, what's not so obvious is whether a Polars DataFrame is thread-safe, especially when it comes to modifying values.

I've heard that Pandas DataFrames are not thread-safe, which means you should NOT modify a Pandas DataFrame from multiple threads.

Say we had code like this:

import threading
import polars as pl

df = pl.DataFrame({'a': range(5), 'b': range(5)})


def work_parallelly():
    for _ in range(100000):
        df[2, 'b'] += 1


thread = threading.Thread(target=work_parallelly)
thread.start()
for _ in range(100000):
    df[2, 'a'] += 1

thread.join()

print(df)

Would this be considered safe? The result looks like it's working properly.

shape: (5, 2)
┌────────┬────────┐
│ a      ┆ b      │
│ ---    ┆ ---    │
│ i64    ┆ i64    │
╞════════╪════════╡
│ 0      ┆ 0      │
│ 1      ┆ 1      │
│ 100002 ┆ 100002 │
│ 3      ┆ 3      │
│ 4      ┆ 4      │
└────────┴────────┘

Also, is the .collect() method considered thread-safe?


Solution

  • If your code is using multiple Python threads to change values in a given DataFrame, you are subject to data corruption in the DataFrame.

    That, however, should be easy to work around in most cases by using a simple thread lock (the threading.Lock class) to ensure that only one Python thread attempts to modify a given DataFrame at a time.

    If the multi-threaded nature of your Python code means that several threads learn about changes that have to be made to the same DataFrame, another simple fix is, instead of eagerly applying each modification as you gather information about the operations to be performed, to serialize a description of those operations into a thread-safe queue (queue.Queue) and keep a single worker thread that reads the queue and executes the Python code for the modifications in series. (Polars will still automatically parallelize each individual operation internally, but the operations will be applied one after the other, since they come from a single Python thread.)

    This code is much slower for your example, because each single "+= 1" op has to be packed into the queue and then executed by the worker thread. (On the other hand, the "+=" operation inside a Python loop, as you put it in your example, is safe on the Python side, because the GIL ensures that only one pure-Python "+=" operation runs at once. For broadcasting and other transforms inside the DataFrame's data, this safety does not apply):

    import threading
    import polars as pl
    from queue import Queue
    from operator import iadd
    
    df = pl.DataFrame({'a': range(5), 'b': range(5)})
    q = Queue()
    _SENTINEL = None
    
    def worker():
        while True:
            op, args = q.get()
    
            if op is _SENTINEL:
                break
            elif callable(op):
                op(*args)
            elif isinstance(op, str):
                # in-place augmented assignment on an element
                # of a DataFrame can't be passed as a function,
                # so we create a mini protocol where the
                # operation name comes as a string:
                match op:
                    case "iadd":
                        args[0][args[1], args[2]] += args[3]
                    case _:
                        pass
    
    
    
    def work_parallelly(col):
        for _ in range(100_000):
            q.put(("iadd", (df, 2, col, 1)))
    
    
    worker_thread = threading.Thread(target=worker)
    thread1 = threading.Thread(target=work_parallelly, args=("a",))
    thread2 = threading.Thread(target=work_parallelly, args=("b",))
    worker_thread.start()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    q.put((None, None))
    worker_thread.join()
    
    
    print(df)