I have several metrics (2000 rows x 100 columns dataframes per second, could be bigger) and I want to store them in OpenTSDB. To do so, I need to format the values in a way understood by the database, either telnet style or json style.
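For reference, the two wire formats look roughly like this (a minimal sketch with made-up metric, timestamp, and tag values):

```python
import json

# Telnet-style: one "put" line per data point.
telnet_line = "put %s %d %s %s" % ("t1.42", 1420070400, "3.14", "p1=p1")

# JSON-style: a list of data-point objects for the HTTP /api/put endpoint.
json_body = json.dumps([{
    "metric": "t1.42",
    "timestamp": 1420070400,
    "value": 3.14,
    "tags": {"p1": "p1"},
}])
```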
The problem is that with a naive python function, I can't process them fast enough. Here is my first approach:
import json

def ndarray_to_opentsdb_series_comprehension(frame, table, **tags):
    series = [{
        "metric": '{table}.{col}'.format(
            table=table, col=frame.columns[col].item()
        ),
        "timestamp": frame.index[idx].item(),
        "value": val.item(),
        "tags": tags
    } for col, serie in frame.iteritems() for idx, val in serie.iteritems()]
    return json.dumps(series)
Using timeit on a 2000x100 dataframe, I get:
In [1]: %timeit utilities.ndarray_to_opentsdb_series_comprehension(f, 't1', p1='p1')
1 loops, best of 3: 3.9 s per loop
I then tried using the DataFrame.apply function to iterate over my data more efficiently, but I have to apply it several times to gather all the info I need:
def ndarray_to_opentsdb_series_tmp_df(frame, table, **tags):
    tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
    df = frame.apply(lambda s: s.apply(lambda e: '{ts} {v} {tags}'.format(ts=s.name, v=e, tags=tags_str)), axis=1)
    df = df.apply(lambda s: s.apply(lambda e: '{table}.{col} {v}'.format(table=table, col=s.name, v=e)), axis=0)
    flat = [e for l in df.values.tolist() for e in l]
    return '\n'.join(flat)
(I tried other implementations that did not create multiple dataframes but it was roughly as fast as this one).
Here, timeit says:
In[1]: %timeit utilities.ndarray_to_opentsdb_series_tmp_df(f, 't1', p1='p1')
1 loops, best of 3: 2.59 s per loop
I've gained more than a second, but that's still not enough: I need to process this much data in under a second. During my tests I realized that the most time-consuming part is retrieving the index-column pair for a given value in my DataFrame, but I need those labels to build my OpenTSDB request.
Is there a way to process big DataFrames this quickly using only Python, or should I try implementing this logic in Cython? I know I could gain huge improvements that way, but I want to make sure my Python code is optimal before reaching for a lower-level language.
Alright, I managed to process my 2000 rows x 100 columns DataFrame in ~0.5 seconds. Using %prun in IPython, I saw that accessing frame.columns and frame.index was expensive, as was using str.format.
I chose to first convert both the columns and the index of my DataFrame to Python lists with their tolist() method, and then index into those lists. I also stopped using str.format and used the % operator instead (this alone shaved a second off my function's run time!).
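The two formatting styles are easy to compare on your own machine; the outputs are identical, only the speed differs (a quick sanity check, timings will vary):

```python
import timeit

# Format the same (timestamp, value) pair both ways, many times.
fmt = timeit.timeit(lambda: '{ts} {v}'.format(ts=1000, v=42.0), number=100000)
pct = timeit.timeit(lambda: '%d %s' % (1000, 42.0), number=100000)
print('str.format: %.3fs, %% operator: %.3fs' % (fmt, pct))
```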
Also, I used the raw=True argument of the DataFrame.apply method so that my lambda receives a numpy.ndarray instead of a pandas.Series, and I iterate over it with a list comprehension.
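The effect of raw=True is easy to see directly: with it, apply hands the function plain numpy arrays instead of Series (a minimal illustration, not from my actual code):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})

# Record the type apply passes to the function in each mode.
seen = []
df.apply(lambda s: seen.append(type(s).__name__) or s.sum(), raw=True)
seen_raw = set(seen)

seen = []
df.apply(lambda s: seen.append(type(s).__name__) or s.sum())
seen_series = set(seen)
# raw=True -> 'ndarray'; default -> 'Series'
```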
Here is my modified function:
def ndarray_to_opentsdb_series(frame, table, **tags):
    tags_str = ' '.join('{k}={v}'.format(k=k, v=v) for k, v in tags.items())
    indices = frame.index.tolist()
    columns = frame.columns.tolist()
    df = frame.apply(lambda s: ['%d %d %s' % (indices[i], e, tags_str) for i, e in enumerate(s)], axis=0, raw=True)
    df = df.apply(lambda s: ['%s.%d %s' % (table, columns[i], e) for i, e in enumerate(s)], axis=1, raw=True)
    flat = [e for l in df.values.tolist() for e in l]
    return '\n'.join(flat)
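For comparison, the same precomputation idea can also be written without apply at all, as a single comprehension over the raw values array (a sketch I have not benchmarked against the version above; the helper name is made up):

```python
import pandas as pd

def frame_to_telnet_lines(frame, table, **tags):
    # Pull everything out of pandas once; the hot loop then only touches
    # plain Python lists and the underlying 2-D numpy array.
    tags_str = ' '.join('%s=%s' % (k, v) for k, v in tags.items())
    indices = frame.index.tolist()
    columns = frame.columns.tolist()
    values = frame.values
    return '\n'.join(
        '%s.%s %s %s %s' % (table, columns[j], indices[i], values[i, j], tags_str)
        for i in range(len(indices))
        for j in range(len(columns))
    )
```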
Simply compiling it as Cython code shaves another 100ms off the run time; I will now try to optimize it further in Cython.