I get a data feed of rows like
datetime,value
The API gives me the entire history, about 10,000 observations, and it adds a new observation roughly every hour.
I need to append the new data to a dataframe, run some analytics on it, and save the df. Usually I just get a new observation when I run my code.
Sometimes, though, old observations get revised, i.e. the API might change an observation I have already downloaded and processed.
The feed does not flag such revisions, so I need to keep track of them myself. Suppose I get 2 data revisions (over different calls to the API), each changing the value for 2023-09-12 08:00.
I then need to have the following rows in my df
date, value, revision
2023-09-12 08:00, 1000, 0
2023-09-12 08:00, 2000, 1 # <- this is a revised observation.
2023-09-12 08:00, 1000, 2 # <- this is a second revised observation.
2023-09-13 06:00, 1500, 0
It is easy to identify and label revisions by looping over the df and checking whether anything changed, but that is SLOW.
I'm just wondering if there is a fast and clever way to do that.
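For reference, the row-by-row approach I mean looks roughly like this (a hypothetical sketch assuming the `date`/`value`/`revision` layout shown above; `label_revisions_slow` is just an illustrative name):

```python
import pandas as pd

def label_revisions_slow(repo: pd.DataFrame, feed: pd.DataFrame) -> pd.DataFrame:
    """Row-by-row baseline: compare each feed row with the latest stored
    revision for its timestamp and append a new row only if it changed."""
    new_rows = []
    for _, row in feed.iterrows():
        match = repo[repo['date'] == row['date']]
        if match.empty:
            # brand-new timestamp: first revision is 0
            new_rows.append({'date': row['date'], 'value': row['value'],
                             'revision': 0})
        else:
            latest = match.loc[match['revision'].idxmax()]
            if latest['value'] != row['value']:
                # value changed since the last stored revision
                new_rows.append({'date': row['date'], 'value': row['value'],
                                 'revision': latest['revision'] + 1})
    out = (pd.concat([repo, pd.DataFrame(new_rows)], ignore_index=True)
           if new_rows else repo.copy())
    return out.sort_values(['date', 'revision'], ignore_index=True)
```

This scans the whole repo once per feed row, which is what makes it slow on 10,000+ observations.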
Let's make sure we have a shared understanding.
Assuming two dataframes, repo (the stored history) and feed (the latest API pull), the feed will be filtered to retain only the non-redundant timestamps, which are numbered by revision and appended to the repository.
Input data
Repository:
repo = pd.DataFrame(columns = ['date', 'value', 'revision'],
data = [['2023-09-01 08:00', 500, 0],
['2023-09-02 08:00', 1000, 0],
['2023-09-02 08:00', 2000, 1]]
).astype({'date':'datetime64[ns]'})
date value revision
0 2023-09-01 08:00:00 500 0
1 2023-09-02 08:00:00 1000 0
2 2023-09-02 08:00:00 2000 1
Feed:
feed = pd.DataFrame(columns = ['date', 'value'],
data = [['2023-09-01 08:00', 500], # unchanged
['2023-09-02 08:00', 1000], # revision: back to 1000
['2023-09-03 08:00', 1500]] # new timestamp
).astype({'date':'datetime64[ns]'})
date value
0 2023-09-01 08:00:00 500
1 2023-09-02 08:00:00 1000
2 2023-09-03 08:00:00 1500
Processing
Step 1: Concatenate both repo and feed, but document each row's source in a temporary column
Revision:
revi = pd.concat([repo.assign(source='repo'),
                  feed.assign(source='feed')]
                 ).sort_values(by='date', kind='stable'  # stable sort keeps repo rows ahead of feed rows within a timestamp
                 ).reset_index(drop=True)
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
1 2023-09-01 08:00:00 500 NaN feed
2 2023-09-02 08:00:00 1000 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 NaN feed
5 2023-09-03 08:00:00 1500 NaN feed
Step 2: Assign revision numbers
A little technical, that one, but simply put: the size of each group of rows with an identical timestamp is used to calculate the revision number. A feed row's revision is its group's size minus one, since the repo already holds revisions 0 through size - 2 for that timestamp.
mask = revi['revision'].isna()
revi.loc[mask, 'revision'] = revi.groupby('date')['revision'].transform('size')[mask] - 1
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
1 2023-09-01 08:00:00 500 1.0 feed
2 2023-09-02 08:00:00 1000 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 2.0 feed
5 2023-09-03 08:00:00 1500 0.0 feed
Step 3: Remove feed rows that add no new information
revi = (revi
        # keep only the latest revision for each repo timestamp
        .drop_duplicates(subset=['date', 'source'], keep='last')
        # drop feed rows that merely repeat the repo's latest value
        .drop_duplicates(subset=['date', 'value'], keep='first'))
date value revision source
0 2023-09-01 08:00:00 500 0.0 repo
3 2023-09-02 08:00:00 2000 1.0 repo
4 2023-09-02 08:00:00 1000 2.0 feed
5 2023-09-03 08:00:00 1500 0.0 feed
Step 4: Finally, clean the revision dataframe by removing the repo rows and then the 'source' column
revi = revi[revi.source=='feed'].drop(columns='source')
date value revision
4 2023-09-02 08:00:00 1000 2.0
5 2023-09-03 08:00:00 1500 0.0
Those are the 2 rows from the current feed that need to be committed to the repository:
repo = pd.concat([repo, revi]
).sort_values(by=['date','revision']
).reset_index(drop=True)
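Putting the four steps together, the whole update can be wrapped in one function. This is a sketch under the same assumptions as above: `repo` carries `date`/`value`/`revision`, `feed` carries `date`/`value` with at most one row per timestamp, and `update_repo` is just an illustrative name. `kind='stable'` guarantees repo rows stay ahead of the feed row within each timestamp (which the `drop_duplicates` passes rely on), and `transform('size')` gives each row its group's size directly, avoiding the positional-alignment assumption of `agg(len).values`:

```python
import pandas as pd

def update_repo(repo: pd.DataFrame, feed: pd.DataFrame) -> pd.DataFrame:
    """Append the feed's new and revised observations to repo, numbered by revision."""
    # Step 1: stack repo and feed, tagging each row's source.
    revi = pd.concat([repo.assign(source='repo'),
                      feed.assign(source='feed')]
                     ).sort_values(by='date', kind='stable').reset_index(drop=True)

    # Step 2: a feed row's candidate revision number is its group's size - 1
    # (the repo already holds revisions 0 .. size - 2 for that timestamp).
    mask = revi['revision'].isna()
    revi.loc[mask, 'revision'] = revi.groupby('date')['revision'].transform('size')[mask] - 1

    # Step 3: keep the latest repo revision per timestamp, then drop feed rows
    # whose value merely repeats it.
    revi = (revi.drop_duplicates(subset=['date', 'source'], keep='last')
                .drop_duplicates(subset=['date', 'value'], keep='first'))

    # Step 4: only the surviving feed rows carry new information.
    new_rows = revi[revi['source'] == 'feed'].drop(columns='source')

    return (pd.concat([repo, new_rows])
              .sort_values(by=['date', 'revision'])
              .reset_index(drop=True)
              .astype({'revision': 'int64'}))
```

A nice property of this construction is that it is idempotent: feeding the same data twice adds nothing, since the repeated values are dropped in step 3.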