Tags: python, performance, pandas, numpy, logfile-analysis

Expanding timeseries events with Pandas


Problem

I'm looking for suggestions on how to make this more Pythonic and improve its efficiency.

I have a dataframe with events, each having at minimum a start and end timestamp. I am expanding the number of records so that the new table has one record for each hour the interval overlaps.

This is basically the same use case as the IntervalMatch function found in QlikView.

Example: An event from 18:00-20:00 expands to two distinct records, one for 18:00-19:00 and another for 19:00-20:00.

Current solution

I have a fully working solution, but I think it is rather ugly and it is quite slow on large datasets with >100k rows and 10-20 columns.

import pandas as pd
from datetime import timedelta

def interval_match(df):

    intervals = []

    # Yield hour-aligned timestamps from the floor of `start` up to
    # (but not including) `end`, stepping by `delta`.
    def perdelta(start, end, delta):
        curr = start.replace(minute=0, second=0)
        while curr < end:
            yield curr
            curr += delta

    def interval_split(x):

        # One record per hour boundary; the first and last slices are
        # clipped to the event's actual Start and End.
        for t in perdelta(x.Start, x.End, timedelta(hours=1)):
            _ = ([x.id,
                  x.Start,
                  x.End,
                  max(t, x.Start),
                  min((t+timedelta(hours=1), x.End))])

            intervals.append(_)

    df.apply(interval_split, axis=1)

    ndf = pd.DataFrame(intervals, 
                       columns=['id', 
                                'Start', 
                                'End', 
                                'intervalStart', 
                                'intervalEnd'])

    ndf['Duration'] = ndf.intervalEnd - ndf.intervalStart

    return ndf

With some example data, the function interval_match() can be used like this:

# Some example data
df = pd.DataFrame({'End': {0: pd.Timestamp('2016-01-01 09:24:20')},
                   'Start': {0: pd.Timestamp('2016-01-01 06:56:10')},
                   'id': {0: 1234562}})


# Running the function
interval_match(df).to_dict()


# Output
{'Duration': {0: Timedelta('0 days 00:03:50'),
              1: Timedelta('0 days 01:00:00'),
              2: Timedelta('0 days 01:00:00'),
              3: Timedelta('0 days 00:24:20')},
 'End': {0: Timestamp('2016-01-01 09:24:20'),
         1: Timestamp('2016-01-01 09:24:20'),
         2: Timestamp('2016-01-01 09:24:20'),
         3: Timestamp('2016-01-01 09:24:20')},
 'Start': {0: Timestamp('2016-01-01 06:56:10'),
           1: Timestamp('2016-01-01 06:56:10'),
           2: Timestamp('2016-01-01 06:56:10'),
           3: Timestamp('2016-01-01 06:56:10')},
 'intervalEnd': {0: Timestamp('2016-01-01 07:00:00'),
                 1: Timestamp('2016-01-01 08:00:00'),
                 2: Timestamp('2016-01-01 09:00:00'),
                 3: Timestamp('2016-01-01 09:24:20')},
 'intervalStart': {0: Timestamp('2016-01-01 06:56:10'),
                   1: Timestamp('2016-01-01 07:00:00'),
                   2: Timestamp('2016-01-01 08:00:00'),
                   3: Timestamp('2016-01-01 09:00:00')},
 'id': {0: 1234562,
        1: 1234562,
        2: 1234562,
        3: 1234562}}

My desire is to

  1. Make this more efficient, preferably using built-in Pandas functions or some numpy magic.
  2. Not have to deal with the columns explicitly as I do in the interval_split function today, but instead just operate on, and expand, the entire dataframe.

I'd appreciate any suggestions or help.


Solution

  • I made a variant (inspired by your code), and it initially ran very slowly: about 5 minutes to process 20k rows of data. After profiling, the culprit was the .append. The trick is to collect all the records into a dictionary and then use DataFrame's from_dict method. With from_dict, the same 20k rows completed in about 5 seconds, roughly 60x faster.
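
    In isolation, the pattern looks roughly like this (a minimal sketch with made-up records, not the production code; DataFrame.append has since been removed from pandas, so it appears only as the commented-out slow variant):

    import pandas as pd

    # Records accumulated during processing, keyed by target row index.
    records = {i: {'id': i, 'value': 2 * i} for i in range(20000)}

    # Slow: growing a DataFrame row by row copies the whole frame on
    # every call, so building it with repeated .append is quadratic.
    # result = pd.DataFrame()
    # for record in records.values():
    #     result = result.append(record, ignore_index=True)

    # Fast: build the DataFrame once from the accumulated dict.
    result = pd.DataFrame.from_dict(records, orient='index')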

    I've attached my code that was inspired by yours, and it is also generic with respect to the column inputs (my test use differs from my production use).

    import pandas as pd
    from collections import namedtuple
    from datetime import timedelta
    
    Interval = namedtuple('Interval', 'field_name start_time end_time delta')
    
    class IntervalMatch(object):
    
        def per_delta(self, interval: Interval, include_start: bool):
            # Yield timestamps every `delta` seconds from start_time up to end_time.
            current_interval = interval.start_time
            if not include_start:
                current_interval += pd.DateOffset(seconds=interval.delta)
    
            while current_interval < interval.end_time:
                yield current_interval
                current_interval += pd.DateOffset(seconds=interval.delta)
    
        def _copy(self, row, columns: pd.Index):
            # Wrap a single row (tuple or Series) in a fresh one-row DataFrame.
            values = pd.Series(row).values
            return pd.DataFrame([values], columns=columns.values).copy(True)
    
        def interval_split(self, interval: Interval, base_row: pd.Series, columns: pd.Index, include_start: bool):
            # Yield one copy of base_row per generated timestamp, with the
            # time field overwritten by that timestamp.
            for time in self.per_delta(interval, include_start):
                extended_row = self._copy(base_row, columns)
                extended_row.at[0, interval.field_name] = time
                yield extended_row
    
        def get_exploded_records(self, data_to_examine: pd.DataFrame, time_field_name: str):
            last_row = None
            delta = 1  # gap resolution, in seconds
    
            time_col_index = data_to_examine.columns.get_loc(time_field_name)
    
            # process each row.  It is possible there is a map/reduce/fluent way of doing this w/ Pandas
            intermediate_results = {}
            current_row = -1
            for row in data_to_examine.itertuples(index=False):
                current_row += 1
                if last_row is None:
                    last_row = row
                    intermediate_results[current_row] = row
                    continue
    
                total_seconds = (row[time_col_index] - last_row[time_col_index]).total_seconds()
                if 1 < total_seconds < 100:
                    # there is a gap, so we want to explode the gap into the data and fill it with last_row values.
                    interval = Interval(time_field_name, last_row[time_col_index], row[time_col_index], delta)
                    for intrvl in self.interval_split(interval, last_row, data_to_examine.columns, False):
                        # we must unroll the list of rows to just the first row (since there is only one)
                        intermediate_results[current_row] = intrvl.values[0]
                        current_row += 1
    
                # append the current row
                intermediate_results[current_row] = row
                last_row = row
    
            results = pd.DataFrame.from_dict(intermediate_results, orient='index') #, columns=data_to_examine.columns)
            return results
    
    def test():
        print("Preparing Data")
        timestamps = ['2016-01-01 09:24:20', '2016-01-01 09:24:21',
                      '2016-01-01 09:24:23', '2016-01-01 09:24:24', '2016-01-01 09:24:40']
        data_with_gaps = pd.DataFrame({'timestamp': [pd.Timestamp(timestamp) for timestamp in timestamps],
                                       'names': ['Torial', 'Torial', 'Knut', 'Knut', 'Torial'],
                                       'action': ['Add', 'Edit', 'Add', 'Edit', 'Delete']})

        interval = IntervalMatch()
        print("Getting Exploded Records")
        exploded = interval.get_exploded_records(data_with_gaps, 'timestamp')
        print(f"Data with Gaps: {data_with_gaps}")
        print(f"Exploded: {exploded}")
        exploded.to_csv("Exploded_test.csv")

    if __name__ == '__main__':
        test()
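
    As for item 1 in the original question (expanding every event into hourly slices using built-in pandas operations), a rough sketch could look like the following. It assumes pandas 1.4+ for DataFrame.explode and the inclusive keyword of pd.date_range, and interval_match_vectorized is a hypothetical name rather than an existing API:

    import numpy as np
    import pandas as pd

    def interval_match_vectorized(df):
        # Build one list of hour boundaries per event, then explode so
        # that each boundary becomes its own record.
        out = df.copy()
        out['intervalStart'] = [pd.date_range(s.floor('h'), e, freq='h', inclusive='left')
                                for s, e in zip(out['Start'], out['End'])]
        out = out.explode('intervalStart')
        out['intervalStart'] = pd.to_datetime(out['intervalStart'])
        out['intervalEnd'] = out['intervalStart'] + pd.Timedelta(hours=1)
        # Clip the first and last hourly slices to the event boundaries.
        out['intervalStart'] = np.maximum(out['intervalStart'], out['Start'])
        out['intervalEnd'] = np.minimum(out['intervalEnd'], out['End'])
        out['Duration'] = out['intervalEnd'] - out['intervalStart']
        return out.reset_index(drop=True)

    On the single-event example data from the question, this reproduces the same four hourly slices, and the only per-row Python work left is building the boundary lists.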