
Python: Read a huge file line by line and send it to multiprocessing or a thread


I have been trying to get my code to work for days and I am getting desperate. I have scoured the internet, but I still can't find a working solution.

I have a 9 GB text file encoded in "latin-1" with 737,022,387 lines; each line contains a string.

I would like to read each line and send it in an HTTP PUT request that waits for a response, and return TRUE or FALSE depending on whether the response is 200 or 400. A PUT request takes about 1 to 3 seconds, so to speed up processing I would like to use either threads or multiprocessing.

To start, I simulate my PUT request with a sleep of 3 seconds, and even that I can't get to work.

This code splits my string into single characters, and I don't know why...

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon = open(r'D:\txtFile', encoding="latin-1")
    for line in peon:
        res = pool.map(process_line, line)
        print(res)

This next attempt gives the error: TypeError: process_line() takes 1 positional argument but 17 were given

import multiprocessing
from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    with open(r"d:\txtFile", encoding="latin-1") as file:
        res = pool.apply(process_line, file.readline())
        print(res)

And this one crashes the computer:

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon = open(r'D:\txtFile', encoding="latin-1")
    for line in peon:
        res = pool.map(process_line, peon)
        print(res)

Solution

  • The problem itself seems unrealistic, though: shooting 737,022,387 requests! At roughly 2 seconds per request that is about 1.5 billion seconds of work, i.e. close to 47 years sequentially, and still months even with a few hundred requests in flight, so calculate how long it would take from a single computer before going this route.

    Still, a better way to do this task is to read the file line by line in a separate thread that inserts the lines into a queue, and then process the queue with multiple processes.

    Solution 1:

    from multiprocessing import Queue, Process
    from threading import Thread
    from time import sleep

    max_process = 4


    def read_urls(urls_queue):
        with open('urls_file.txt', 'r') as f:
            for url in f:
                urls_queue.put(url.strip())
                print('put url: {}'.format(url.strip()))

        # put one DONE sentinel per worker so every send_request_processor exits
        for i in range(max_process):
            urls_queue.put("DONE")


    def send_request(url):
        print('send request: {}'.format(url))
        sleep(1)
        print('recv response: {}'.format(url))


    def send_request_processor(urls_queue):
        print('start send request processor')
        while True:
            url = urls_queue.get()
            if url == "DONE":
                break
            else:
                send_request(url)


    def main():
        # the queue is passed explicitly so this also works with the "spawn"
        # start method used on Windows, not only with "fork"
        urls_queue = Queue()

        file_reader_thread = Thread(target=read_urls, args=(urls_queue,))
        file_reader_thread.start()

        procs = []
        for i in range(max_process):
            p = Process(target=send_request_processor, args=(urls_queue,))
            procs.append(p)
            p.start()

        for p in procs:
            p.join()

        # the reader has finished once every worker has consumed its sentinel
        file_reader_thread.join()
        print('all done')


    if __name__ == '__main__':
        main()

    Demo: https://onlinegdb.com/Elfo5bGFz
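
    To go from the simulation to the real request, only send_request has to change. Below is a minimal sketch using the requests library; it assumes each line from the file is the body of the PUT and that API_ENDPOINT is a placeholder for your real target, returning True for a 200 response and False otherwise, as described in the question:

    import requests

    API_ENDPOINT = 'http://example.com/api'   # placeholder, not a real endpoint


    def send_request(url):
        # "url" here is one line read from the file; PUT it to the endpoint
        # and map the status code to True/False (200 -> True, else -> False)
        try:
            resp = requests.put(API_ENDPOINT, data=url.encode('latin-1'), timeout=10)
        except requests.RequestException:
            return False
        return resp.status_code == 200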

    Solution 2:

    You can use the Tornado asynchronous networking library:

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.queues import Queue
    
    q = Queue(maxsize=2)
    
    async def consumer():
        async for item in q:
            try:
                print('Doing work on %s' % item)
                await gen.sleep(0.01)
            finally:
                q.task_done()
    
    async def producer():
        with open('urls_file.txt', 'r') as f:
            for url in f:
                await q.put(url.strip())
                print('Put %s' % url.strip())
    
    async def main():
        # Start consumer without waiting (since it never finishes).
        IOLoop.current().spawn_callback(consumer)
        await producer()     # Wait for producer to put all tasks.
        await q.join()       # Wait for consumer to finish all tasks.
        print('Done')
        # producer and consumer can run in parallel
    
    IOLoop.current().run_sync(main)
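
    The gen.sleep call in the consumer only simulates the request here as well. With Tornado the real PUT can be made non-blocking through its own AsyncHTTPClient; the sketch below assumes, as above, that API_ENDPOINT is a placeholder for the real target and that each line is the request body:

    from tornado.httpclient import AsyncHTTPClient, HTTPRequest

    API_ENDPOINT = 'http://example.com/api'   # placeholder, not a real endpoint

    async def send_put(line):
        # issue a non-blocking PUT and map the status code to True/False
        client = AsyncHTTPClient()
        request = HTTPRequest(API_ENDPOINT, method='PUT', body=line)
        response = await client.fetch(request, raise_error=False)
        return response.code == 200

    The consumer would then call result = await send_put(item) in place of the gen.sleep(0.01) line.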