python, r, numpy, large-files, graphing

Reading and graphing data read from huge files


We have fairly large files, on the order of 1-1.5 GB combined (mostly log files), containing raw data that is easily parsed into a CSV, which is then graphed to produce a set of graph images.

Currently, we use bash scripts to turn the raw data into a CSV file containing just the numbers that need to be graphed, and then feed it into a gnuplot script. But this process is extremely slow. I tried to speed up the bash scripts by replacing some piped cut, tr, etc. calls with a single awk command. This improved the speed, but the whole thing is still very slow.

So I am starting to believe there are better tools for this process. I am currently looking to rewrite it in Python + NumPy or R. A friend of mine suggested using the JVM, and if I do that I will use Clojure, but I am not sure how the JVM will perform.

I don't have much experience in dealing with this kind of problem, so any advice on how to proceed would be great. Thanks.

Edit: Also, I want to store the generated intermediate data (i.e., the CSV) on disk, so I don't have to regenerate it should I decide I want a different-looking graph.

Edit 2: The raw data files have one record per line, with fields separated by a delimiter (|). Not all fields are numbers. Each field I need in the output CSV is obtained by applying a certain formula to the input record, which may use multiple fields from the input data. The output CSV will have 3-4 fields per line, and I need graphs that plot fields 1-2, 1-3 and 1-4, maybe as bar charts. I hope that gives a better picture.
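
For illustration, the per-record transformation described in Edit 2 could be sketched roughly as below. The field positions and the formulas are made up, since the real ones are not given here; only the `|` delimiter and the 3-4 column CSV output come from the description. The names `record_to_row` and `convert` are hypothetical.

```python
import csv
import io

def record_to_row(line):
    """Turn one '|'-delimited record into a row of output values.

    Hypothetical layout: field 0 is a text label, fields 1-3 are numeric.
    The formulas below are placeholders for the real ones.
    """
    fields = line.rstrip("\n").split("|")
    a, b, c = float(fields[1]), float(fields[2]), float(fields[3])
    return [fields[0], a + b, a * c, (a + b) / c]

def convert(lines, out):
    """Write one CSV row per non-blank input record to the file-like `out`."""
    writer = csv.writer(out)
    for line in lines:
        if line.strip():  # skip blank lines
            writer.writerow(record_to_row(line))

# Tiny in-memory demo; a real run would pass open file objects instead.
sample = ["req1|2|3|4\n", "req2|1.5|0.5|2\n"]
buf = io.StringIO()
convert(sample, buf)
print(buf.getvalue())
```

Writing the CSV to disk once, as in Edit 1, would then only require passing a file opened with `open(path, "w", newline="")` as `out`.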

Edit 3: I have modified @adirau's script a little and it seems to be working pretty well. I have come far enough that I am reading the data, sending it to a pool of processor threads (pseudo-processing for now: each thread appends its name to the data), and aggregating the results into an output file through another collector thread.

PS: I am not sure about the tagging of this question, feel free to correct it.


Solution

  • Python sounds like a good choice because it has a good threading API (the implementation is questionable, though), matplotlib and pylab. I am missing some specs from your end, but this could be a good starting point for you: matplotlib: async plotting with threads. I would use a single thread for the bulk disk I/O reads, feeding a synchronized queue that a pool of threads consumes for data processing (if you have fixed record lengths, things may get faster by precomputing the read offsets and passing just the offsets to the thread pool). In the disk I/O thread I would mmap the data source files and read a predefined number of bytes, plus one more read to grab the remaining bytes up to the end of the current input line; the number of bytes should be chosen somewhere near your average input line length. Next comes feeding the pool via the queue, and then the data processing / plotting that takes place in the thread pool. I don't have a good picture of what exactly you are plotting, but I hope this helps.

    EDIT: there's file.readlines([sizehint]) to grab multiple lines at once; it may not be that fast, though, because the docs say it uses readline() internally.

    EDIT: some quick skeleton code

    import mmap
    import sys
    import threading
    from collections import deque
    from threading import Thread
    
    
    class processor(Thread):
        """
            processor gets one batch of data at a time from the disk I/O thread
        """
        def __init__(self,q):
            Thread.__init__(self,name="plotter")
            self._queue = q
        def run(self):
            while True:
                # block until the disk I/O thread hands over a batch of lines
                dataloop = self.feed(self._queue.get())
                # feed() is a generator, so a plain for loop drains it
                for data in dataloop:
                    self.plot(data)
                # sanitizer exceptions following, maybe
    
        def parseline(self,line):
            """ return a data struct ready for plotting """
            raise NotImplementedError
    
        def feed(self,databuf):
            # yield one ready-to-plot data struct at a time
            for line in databuf:
                yield self.parseline(line)
    
        def plot(self,data):
            """integrate
            https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
            maybe
            """

    class sharedq(object):
        """i don't recall where i got this implementation from;
        you may write a better one (the standard library's bounded
        queue.Queue offers the same blocking put/get)"""
        def __init__(self,maxsize=8192):
            self.queue = deque()
            # both conditions share one lock, so put() and get() exclude each other
            self.barrier = threading.RLock()
            self.read_c = threading.Condition(self.barrier)
            self.write_c = threading.Condition(self.barrier)
            self.msz = maxsize
        def put(self,item):
            # "with" releases the lock even if an exception is raised
            with self.barrier:
                while len(self.queue) >= self.msz:
                    self.write_c.wait()
                self.queue.append(item)
                self.read_c.notify()
        def get(self):
            with self.barrier:
                while not self.queue:
                    self.read_c.wait()
                item = self.queue.popleft()
                self.write_c.notify()
                return item
    
    
    
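    q = sharedq()
    # rough size in bytes of each batch of lines; you may want a better value
    numbytes = 1024
    for i in range(8):
        p = processor(q)
        p.start()
    for fn in sys.argv[1:]:
        with open(fn, "rb") as f:
            # note: mmap raises ValueError on an empty file
            mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            # mmap objects have readline() but no readlines(), so batch by hand
            batch, size = [], 0
            for line in iter(mm.readline, b""):
                batch.append(line)
                size += len(line)
                if size >= numbytes:
                    q.put(batch)
                    batch, size = [], 0
            if batch:
                q.put(batch)
            mm.close()

    # some cleanup code may be desirable: the workers loop forever, so the
    # program will not exit until they are told to stop (e.g. via a sentinel)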
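
The reader / worker pool / collector layout described in this answer, including a shutdown step, might look like the following. This is only a sketch under a few assumptions: it swaps the hand-written sharedq for the standard library's `queue.Queue`, it takes the parsing function as a parameter, and the names `STOP`, `read_batches`, `worker`, `collect` and `run_pipeline` are made up for illustration. Plotting is left out.

```python
import queue
import threading

STOP = object()  # sentinel telling a thread there is no more work

def read_batches(lines, batch_size, work_q, n_workers):
    """Reader: group lines into batches and feed the worker pool."""
    batch = []
    for line in lines:
        batch.append(line)
        if len(batch) >= batch_size:
            work_q.put(batch)
            batch = []
    if batch:
        work_q.put(batch)
    for _ in range(n_workers):  # one sentinel per worker
        work_q.put(STOP)

def worker(work_q, result_q, parse):
    """Worker: parse each batch and pass the rows on to the collector."""
    while True:
        batch = work_q.get()
        if batch is STOP:
            result_q.put(STOP)  # tell the collector this worker is done
            return
        result_q.put([parse(line) for line in batch])

def collect(result_q, n_workers, out):
    """Collector: gather rows until every worker has reported STOP."""
    finished = 0
    while finished < n_workers:
        rows = result_q.get()
        if rows is STOP:
            finished += 1
        else:
            out.extend(rows)

def run_pipeline(lines, parse, n_workers=4, batch_size=2):
    work_q = queue.Queue(maxsize=64)  # bounded, so the reader cannot run far ahead
    result_q = queue.Queue()
    out = []
    threads = [threading.Thread(target=worker, args=(work_q, result_q, parse))
               for _ in range(n_workers)]
    threads.append(threading.Thread(target=collect,
                                    args=(result_q, n_workers, out)))
    for t in threads:
        t.start()
    read_batches(lines, batch_size, work_q, n_workers)  # reader runs in this thread
    for t in threads:
        t.join()
    return out

rows = run_pipeline(["a|1|2", "b|3|4", "c|5|6"], lambda s: s.strip().split("|"))
print(sorted(rows))  # batches may finish in any order, hence the sort
```

Because every worker receives its own sentinel and forwards it to the collector, all threads return and the program exits cleanly. In CPython the global interpreter lock keeps threads from running Python bytecode in parallel, so if parsing turns out to be CPU-bound, the same layout built on `multiprocessing` may be worth trying.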