Tags: python, pandas, dataframe, time-series, python-datetime

Keep track of data revisions efficiently


I get a data feed like datetime,value.

The API gives me the entire history, about 10,000 observations, and it adds new observations about every hour.

I need to add the new data to a dataframe, run some analytics on it, and save the df. Usually I just get new observations when I run my code.

Sometimes some old observations get revised, i.e. the API might change some observation I have already downloaded and processed.

The feed does not flag such revisions, so I need to keep track of them myself. Suppose I get 2 data revisions (over different calls to the API), each changing the value for 2023-09-12 08:00.

I then need to have the following rows in my df

date, value, revision
2023-09-12 08:00, 1000, 0
2023-09-12 08:00, 2000, 1 # <- this is a revised observation. 
2023-09-12 08:00, 1000, 2 # <- this is a second revised observation. 
2023-09-13 06:00, 1500, 0

It is easy to identify and label revisions by looping over the df and seeing if something changed, but that is SLOW.

Just wondering if there is a fast and clever way to do that.


Solution

  • Let's first make sure we have a common understanding of the setup:

    Assuming 2 dataframes,

    • one being the repository where changes are being tracked
    • the other being the feed of new data, typically containing
      • old timestamps (history),
      • modified timestamps (revisions)
      • new timestamps

    The feed will then be filtered to retain only the rows that carry new information, which are numbered by revision and appended to the repository (a consolidated sketch of the whole pipeline is given at the end of this answer).


    Input data

    Repository:

    import pandas as pd

    repo = pd.DataFrame(columns = ['date', 'value', 'revision'],
                        data = [['2023-09-01 08:00', 500, 0],
                                ['2023-09-02 08:00', 1000, 0],
                                ['2023-09-02 08:00', 2000, 1]]
                       ).astype({'date':'datetime64[ns]'})
    
                     date  value  revision
    0 2023-09-01 08:00:00    500         0
    1 2023-09-02 08:00:00   1000         0
    2 2023-09-02 08:00:00   2000         1
    

    Feed:

    feed = pd.DataFrame(columns = ['date', 'value'],
                        data = [['2023-09-01 08:00', 500],   # unchanged
                                ['2023-09-02 08:00', 1000],  # revision: back to 1000
                                ['2023-09-03 08:00', 1500]]  # new timestamp
                       ).astype({'date':'datetime64[ns]'}) 
    
                     date  value
    0 2023-09-01 08:00:00    500
    1 2023-09-02 08:00:00   1000
    2 2023-09-03 08:00:00   1500
    

    Processing

    Step 1: Concatenate both repo and feed, but document each row's source in a temporary column

    Revision:

    revi = pd.concat([repo.assign(source='repo'), 
                      feed.assign(source='feed')]
                    ).sort_values(by='date').reset_index(drop=True)
    
                     date  value  revision source
    0 2023-09-01 08:00:00    500       0.0   repo
    1 2023-09-01 08:00:00    500       NaN   feed
    2 2023-09-02 08:00:00   1000       0.0   repo
    3 2023-09-02 08:00:00   2000       1.0   repo
    4 2023-09-02 08:00:00   1000       NaN   feed
    5 2023-09-03 08:00:00   1500       NaN   feed
    
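    As a side note (a variation of my own, not something the answer relies on), pd.concat can also record each row's origin through its keys argument, which creates an index level instead of a temporary column:

    # Hypothetical alternative to assign(source=...): tag the source through
    # concat keys, then move that label back into a regular column.
    revi_alt = (pd.concat([repo, feed], keys=['repo', 'feed'], names=['source', None])
                  .reset_index(level='source')
                  .sort_values(by='date')
                  .reset_index(drop=True))

    Either way works; the assign(source=...) version used above keeps a flat index throughout.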

    Step 2: Assign revision numbers

    This step is a little technical, but simply put: the size (len) of each group of rows sharing the same timestamp, minus one, gives the revision number assigned to that group's new feed row.

    # The size of each timestamp group, minus 1, is assigned as the revision
    # number of that group's feed row. This relies on each timestamp having
    # exactly one NaN (feed) row, which holds because the API returns the full history.
    aggf = len
    
    revi.loc[revi.revision.isna(), 'revision'] = revi.set_index('date')['revision'].groupby(level=0).agg(aggf).values - 1
    
                     date  value  revision source
    0 2023-09-01 08:00:00    500       0.0   repo
    1 2023-09-01 08:00:00    500       1.0   feed
    2 2023-09-02 08:00:00   1000       0.0   repo
    3 2023-09-02 08:00:00   2000       1.0   repo
    4 2023-09-02 08:00:00   1000       2.0   feed
    5 2023-09-03 08:00:00   1500       0.0   feed
    
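    For what it's worth, the same assignment can be written with transform('size') (a variation of my own, not the answer's line above). It broadcasts each group's size back to every row, so nothing needs to be aligned by hand, and it does not break if some timestamp happens to have no feed row. It still assumes the feed contributes at most one row per timestamp:

    # Variation on Step 2: transform('size') returns one value per row,
    # so the result lines up with the NaN mask by index.
    mask = revi['revision'].isna()
    revi.loc[mask, 'revision'] = revi.groupby('date')['value'].transform('size')[mask] - 1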

    Step 3: Remove feed rows that are not new information

    revi = revi.drop_duplicates(subset=['date','source'], keep='last'   # keep only the latest revision for each repo timestamp
              ).drop_duplicates(subset=['date','value'],  keep='first') # remove feed rows that only repeated the repo.
    
                     date  value  revision source
    0 2023-09-01 08:00:00    500       0.0   repo
    3 2023-09-02 08:00:00   2000       1.0   repo
    4 2023-09-02 08:00:00   1000       2.0   feed
    5 2023-09-03 08:00:00   1500       0.0   feed
    
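    If it helps intuition, the filter in Step 3 amounts to comparing each feed row against the latest repository value for its timestamp. A small sketch of that check (my own illustration, not part of the answer's pipeline):

    # Keep feed rows whose value differs from the latest repo value, or whose
    # timestamp is not in the repo at all (value_latest is NaN there, and
    # NaN != value evaluates to True).
    latest = repo.sort_values('revision').drop_duplicates('date', keep='last')
    check = feed.merge(latest[['date', 'value']], on='date', how='left',
                       suffixes=('', '_latest'))
    new_info = check[check['value'] != check['value_latest']][['date', 'value']]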

    Step 4: Finally, clean the revision dataframe by removing the repo rows, then the 'source' column

    revi = revi[revi.source=='feed'].drop(columns='source')
    
                     date  value  revision
    4 2023-09-02 08:00:00   1000       2.0
    5 2023-09-03 08:00:00   1500       0.0
    

    These are the 2 rows from the current feed that need to be committed to the repository:

    repo = pd.concat([repo, revi]
            ).sort_values(by=['date','revision']
            ).reset_index(drop=True)
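
    Since this will run repeatedly (the API adds data about every hour), it may be convenient to wrap the whole pipeline in one function. Here is a sketch along the lines of the steps above (the name update_repo is mine, and the stable sort plus transform('size') are the variations already mentioned; it assumes the feed has one row per timestamp):

    import pandas as pd

    def update_repo(repo: pd.DataFrame, feed: pd.DataFrame) -> pd.DataFrame:
        """Append revised and brand-new feed rows to the repository."""
        # Step 1: combine repo and feed, remembering each row's source.
        # A stable sort keeps repo rows (already in revision order) ahead of
        # feed rows within each timestamp.
        revi = (pd.concat([repo.assign(source='repo'),
                           feed.assign(source='feed')])
                  .sort_values(by='date', kind='mergesort')
                  .reset_index(drop=True))

        # Step 2: revision number = group size - 1 for each date's feed row.
        mask = revi['revision'].isna()
        revi.loc[mask, 'revision'] = revi.groupby('date')['value'].transform('size')[mask] - 1

        # Step 3: keep the latest repo revision per date, then drop feed rows
        # that merely repeat the repository's latest value.
        revi = (revi.drop_duplicates(subset=['date', 'source'], keep='last')
                    .drop_duplicates(subset=['date', 'value'], keep='first'))

        # Step 4: commit only the genuinely new feed rows.
        revi = revi[revi['source'] == 'feed'].drop(columns='source')
        return (pd.concat([repo, revi])
                  .sort_values(by=['date', 'revision'])
                  .reset_index(drop=True))

    # Usage, roughly once per hourly API call:
    # repo = update_repo(repo, feed)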