Tags: python, concurrency, producer-consumer

Producer-consumer problem - trying to save into a csv file


This seemingly simple problem is doing my head in.

I have a dataset (datas) that I process to produce multiple rows, which are then stored in a CSV file. The processing itself isn't the issue, though it takes time owing to the size of the dataset. However, it is very taxing to produce a row, save it to the CSV file, produce the next row, save it, and so on.

So I'm trying to implement producer and consumer threads: the producers will produce the rows of data (to speed up the process) and put them in a queue, and a single consumer will then append them to my CSV file.

My attempt below sometimes succeeds (the data is saved correctly), but at other times the data is "cut off" (either an entire row or part of one is missing).

What am I doing wrong?


from threading import Thread
from queue import Queue
import csv

q = Queue()

def producer():
    datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
    for data in datas:
        q.put(data)


def consumer():
    while True:
        local = q.get()
        file = open('dataset.csv','a')
        with file as fd:
            writer = csv.writer(fd)
            writer.writerow(local)
        file.close()

        q.task_done()


for i in range(10):
    t = Thread(target=consumer)
    t.daemon = True
    t.start()

producer()

q.join()


Solution

  • I think this does something similar to what you're trying to do. For testing purposes, it prefixes each row of data in the CSV file produced with a "producer id" so the source of the data can be seen in the results.

    As the resulting CSV file shows, every row that gets produced ends up in it.

    import csv
    import random
    from queue import Queue
    from threading import Thread
    import time
    
    SENTINEL = object()
    
    def producer(q, producer_id):
        data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
                ("bye", "hat"))
        for datum in data:
            q.put((producer_id,) + datum)  # Prefix producer ID to datum for testing.
            time.sleep(random.random())  # Vary thread speed for testing.
    
    
    class Consumer(Thread):
        def __init__(self, q):
            super().__init__()
            self.q = q
    
        def run(self):
            with open('dataset.csv', 'w', newline='') as file:
                writer = csv.writer(file, delimiter=',')
                while True:
                    datum = self.q.get()
                    if datum is SENTINEL:
                        break
                    writer.writerow(datum)
    
    def main():
        NUM_PRODUCERS = 10
        queue = Queue()
    
        # Create producer threads.
        threads = []
        for n in range(NUM_PRODUCERS):
            t = Thread(target=producer, args=(queue, n + 1))
            t.start()
            threads.append(t)
    
        # Create Consumer thread.
        consumer = Consumer(queue)
        consumer.start()
    
        # Wait for all producer threads to finish (join() blocks without spinning).
        for thread in threads:
            thread.join()
    
        queue.put(SENTINEL)  # Indicate to consumer thread no more data.
        consumer.join()
        print('Done')
    
    if __name__ == '__main__':
        main()
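
A variation closer to the original code can be sketched as follows. It assumes the cut-off rows come from the ten consumer threads each opening dataset.csv and appending to it at the same time, so here only one thread ever touches the file, and it opens it once. The names `process`, `run` and `num_producers` are hypothetical stand-ins for illustration, not part of the question or the code above.

```python
import csv
from queue import Queue
from threading import Thread

SENTINEL = None  # Marker telling the consumer to stop (real rows are never None).


def process(item):
    # Hypothetical stand-in for the real, slow per-row processing.
    return [str(item), str(item * item)]


def producer(q, items):
    # Each producer turns its share of the inputs into rows and queues them.
    for item in items:
        q.put(process(item))


def consumer(q, path):
    # The only thread that writes to the file; it opens the file once.
    with open(path, 'a', newline='') as fd:
        writer = csv.writer(fd)
        while True:
            row = q.get()
            if row is SENTINEL:
                break
            writer.writerow(row)


def run(items, path, num_producers=4):
    q = Queue()
    writer_thread = Thread(target=consumer, args=(q, path))
    writer_thread.start()

    # Give each producer every num_producers-th item.
    producers = [Thread(target=producer, args=(q, items[i::num_producers]))
                 for i in range(num_producers)]
    for t in producers:
        t.start()
    for t in producers:
        t.join()  # Blocks until the producer is done, without busy-waiting.

    q.put(SENTINEL)       # All rows are queued; tell the consumer to finish.
    writer_thread.join()  # The file has been closed once this returns.
```

Calling `run(list(range(20)), 'dataset.csv')` should append 20 rows to the file. Their order depends on thread scheduling, so it can differ between runs.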