Convert big pandas DataFrame efficiently

Tags: python, pandas, cython, opentsdb


I have several metrics arriving as DataFrames of 2000 rows x 100 columns per second (possibly more), and I want to store them in OpenTSDB. To do so, I need to format the values in a way the database understands, either telnet style or JSON style.
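
For reference, this is roughly what the two target formats look like, as far as I understand the OpenTSDB docs (metric name, timestamp, value and tag below are made up). Telnet style is one line per data point:

put t1.0 1411160600 42 p1=p1

and JSON style is a list of objects for the /api/put endpoint:

{"metric": "t1.0", "timestamp": 1411160600, "value": 42, "tags": {"p1": "p1"}}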

The problem is that with a naive Python function, I can't process them fast enough. Here is my first approach:

import json

def ndarray_to_opentsdb_series_comprehension(frame, table, **tags):
    # One OpenTSDB data point per cell: the metric is '<table>.<column>', the
    # timestamp comes from the row index, and the tags are shared by all points.
    series = [{
        "metric": '{table}.{col}'.format(
            table=table, col=frame.columns[col].item()
        ),
        "timestamp": frame.index[idx].item(),
        "value": val.item(),
        "tags": tags
    } for col, serie in frame.iteritems() for idx, val in serie.iteritems()]
    return json.dumps(series)
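
(For the timings below, f is a DataFrame shaped like my metrics; this is only a sketch with random values, and I give it integer row and column labels so the positional lookups and .item() calls above resolve as written. In the real data the index holds the timestamps.)

import numpy as np
import pandas as pd

# 2000 rows x 100 columns of integer samples with integer row/column labels
f = pd.DataFrame(np.random.randint(0, 100, size=(2000, 100)),
                 index=np.arange(2000), columns=np.arange(100))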

Using timeit on a 2000x100 DataFrame, I get:

In [1]: %timeit utilities.ndarray_to_opentsdb_series_comprehension(f, 't1', p1='p1')
1 loops, best of 3: 3.9 s per loop

I then tried using the DataFrame.apply function to iterate over my data more efficiently, but I have to apply it several times to gather all the information I need:

def ndarray_to_opentsdb_series_tmp_df(frame, table, **tags):
    tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
    # First pass (axis=1): each row's name is its timestamp.
    df = frame.apply(lambda s: s.apply(lambda e: '{ts} {v} {tags}'.format(ts=s.name, v=e, tags=tags_str)), axis=1)
    # Second pass (axis=0): each column's name becomes part of the metric name.
    df = df.apply(lambda s: s.apply(lambda e: '{table}.{col} {v}'.format(table=table, col=s.name, v=e)), axis=0)
    flat = [e for l in df.values.tolist() for e in l]
    return '\n'.join(flat)

(I tried other implementations that did not create multiple DataFrames, but they were roughly as fast as this one.)

Here, timeit says:

In [1]: %timeit utilities.ndarray_to_opentsdb_series_tmp_df(f, 't1', p1='p1')
1 loops, best of 3: 2.59 s per loop

I've gained more than a second, but that's still not enough: I need to process that much data in under a second. During my tests I realized that the most time-consuming part is retrieving the index-column pair for a given value in my DataFrame, but I need both to build the OpenTSDB request.
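
A rough way to see where the time goes (a sketch, absolute numbers vary by machine) is to profile one call and then time the per-cell lookup in isolation:

%prun ndarray_to_opentsdb_series_comprehension(f, 't1', p1='p1')

idx = f.index.tolist()
%timeit f.index[1000].item()   # per-cell Index lookup plus scalar boxing
%timeit idx[1000]              # plain list indexing after a single tolist()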

Is there a way to process big DataFrames fast enough using only Python, or should I try implementing this logic in Cython? I know I can gain huge improvements that way, but I want to make sure I have optimal Python code before trying to optimize with a lower-level language.


Solution

  • Alright, I managed to process my 2000 rows x 100 columns DataFrame in ~0.5 seconds. Using %prun in IPython, I saw that accessing frame.columns and frame.index was expensive, as was using str.format.

    I chose to first convert both the columns and the index of my DataFrame to Python lists using their tolist() method, and then index into those lists. I also stopped using str.format and switched to the % operator (this alone reduced my function's run time by a second! A quick isolated comparison is sketched at the end of this answer).

    Also, I used the raw argument of DataFrame.apply so that my lambda receives a numpy.ndarray instead of a pandas.Series, and I then iterate over it with a list comprehension.

    Here is my modified function:

    def ndarray_to_opentsdb_series(frame, table, **tags):
        tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
        # Pull the labels out once as plain Python lists instead of hitting the Index per cell.
        indices = frame.index.tolist()
        columns = frame.columns.tolist()
        # raw=True hands each column (axis=0) or row (axis=1) to the lambda as a numpy.ndarray.
        df = frame.apply(lambda s: ['%d %d %s' % (indices[i], e, tags_str) for i, e in enumerate(s)], axis=0, raw=True)
        df = df.apply(lambda s: ['%s.%d %s' % (table, columns[i], e) for i, e in enumerate(s)], axis=1, raw=True)
        flat = [e for l in df.values.tolist() for e in l]
        return '\n'.join(flat)
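
    A quick check of what the function produces (each line has the shape '<table>.<column> <timestamp> <value> <tags>', ready to be prefixed with 'put ' for the telnet interface):

    lines = ndarray_to_opentsdb_series(f, 't1', p1='p1')
    print(lines.splitlines()[0])   # e.g. 't1.0 <first timestamp> <first value> p1=p1'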
    

    Simply compiling it as Cython code reduces the run time by another 100 ms; I will now try to optimize it further in Cython.
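
    As for the str.format vs % point above, the gap is easy to reproduce in isolation (a rough sketch with made-up values; timings are machine-dependent):

    # Roughly the formatting work the function does for every cell:
    %timeit '{ts} {v} {tags}'.format(ts=1411160600, v=42, tags='p1=p1')
    %timeit '%d %d %s' % (1411160600, 42, 'p1=p1')

    Multiplied over every cell of a 2000x100 frame, that difference adds up to roughly the second mentioned above.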