Search code examples
postgresqlpsql

Copying data from multiple table to single table in realtime


I have 10 tables, each with 3 columns shown below.

create table table_1
(
    signal_id text not null,
    signal_timestamp text not null,
    value numeric not null 
);

And I have a source application inserting 50 rows per second into each table, continuously. I aim to collect them all in a single table

create table table_all_signals 
(
    source_id int not null,
    signal_id text not null,
    signal_timestamp text not null,
    value numeric not null 
);

Above source_id should be unique for each table, e.g. source_id is 1 for table_1, 2 for table_2 and so on. Every new row inserted into any table should be copied to table_all_signals with respective source_id. I can't use a view because I want to broadcast a NOTIFY from a trigger on this table to LISTEN in my python client for new rows. I think we cannot create triggers on views.

begin;
alter table table_1 rename to table_1_legacy;
create view table_1 as 
  select signal_id as "signalID", 
         signal_timestamp as "timestamp",
         value as "value"
  from table_all_signals
  where source_id=1;

create or replace rule table_1_singal_id_i as on insert to table_1
  do instead 
  insert into table_all_signals select 1::int,new.* 
  returning signal_id as "signalID", 
            signal_timestamp as "timestamp",
            value as "value";

create or replace rule table_1_signal_id_u as on update to table_1
  do instead 
  update table_all_signals
  set signal_id=new."signalID",signal_timestamp=new."timestamp",value=new."value"
  where source_id=1 and signal_id=old."signalID" and signal_timestamp=old."timestamp" and value=old."value"
  returning signal_id as "signalID", 
            signal_timestamp as "timestamp",
            value as "value";

create rule table_1_signal_id_d as on delete to table_1
  do instead 
  delete from table_all_signals
  where source_id=1 and signal_id=old."signalID" and signal_timestamp=old."timestamp" and value=old."value"
  returning signal_id as "signalID", 
            signal_timestamp as "timestamp",
            value as "value";
commit;

I did the above for the other 9 tables as well

insert into table_all_signals select 1,* from table_1_legacy;
insert into table_all_signals select 2,* from table_2_legacy;
--...
insert into table_all_signals select 10,* from table_10_legacy;

I have executed above query first, Then executed below query

SELECT * 
FROM table_all_signals
ORDER BY TIMESTAMP DESC
LIMIT 10000

I am able to see rows from all tables with respective source_id. It is okay so far.

Now again I have executed below query

SELECT * 
FROM table_all_signals
ORDER BY TIMESTAMP DESC
LIMIT 10000

Now I am also getting same rows I got from previous query. My expectation is I should get newly inserted rows. I have queried all 10 tables, they are getting inserted with new rows regularly. But those rows are not getting copied to table_all_signals automatically.

To update table_all_signals with latest rows, I have to execute below query every time.

insert into table_all_signals select 1,* from table_1_legacy;
insert into table_all_signals select 2,* from table_2_legacy;
---...
insert into table_all_signals select 10,* from table_10_legacy;

Is there any solution to make table_all_signals get updated automatically (without running above query) when any of the 10 source tables gets new rows?


Solution

  • You're likely feeding your rows into the _legacy tables. Source devices should still be configured to write into table_1, table_2,...table_N like they did before, and now the rules should redirect them to the common table_all_signals.

    If that's a problem, you can skip hiding the table_N behind views and set up the rules directly on them: demo at db<>fiddle

    begin;
    create rule table_1_singal_id_i as on insert to table_1
      do instead 
      insert into table_all_signals select 1::int,new.* 
      returning signal_id as "signalID", 
                signal_timestamp as "timestamp",
                value as "value";
    create rule table_1_singal_id_u as on update to table_1
      do instead 
      update table_all_signals
      set signal_id=new."signalID", 
          signal_timestamp=new."timestamp",
          value=new."value"
      where source_id=1
      and signal_id=old."signalID"
      and signal_timestamp=old."timestamp"
      and value=old."value"
      returning signal_id as "signalID", 
                signal_timestamp as "timestamp",
                value as "value";
    create rule table_1_singal_id_d as on delete to table_1
      do instead 
      delete from table_all_signals
      where source_id=1
      and signal_id=old."signalID"
      and signal_timestamp=old."timestamp"
      and value=old."value"
      returning signal_id as "signalID", 
                signal_timestamp as "timestamp",
                value as "value";
    commit;
    insert into table_all_signals select 1,* from table_1;
    

    Instead of inserting into table_1 this will now get redirected to table_all_signals:

    insert into table_1 
      select gen_random_uuid(),
             '2023-11-21 12:46:49.734534'::timestamp-'100 microseconds'::interval*random(),
             round((random()*1e3)::numeric,6) 
      returning *;
    
    signalID timestamp value
    2a8a9cc5-23cb-423f-9a06-215bc90d3e16 2023-11-21 12:46:49.734502 123.257321

    The row didn't go to table_1:

    select * from table_1;
    
    signalID timestamp value
    SELECT 0
    

    The row went to table_all_signals instead

    select * from table_all_signals;
    
    source_id signal_id signal_timestamp value
    1 2a8a9cc5-23cb-423f-9a06-215bc90d3e16 2023-11-21 12:46:49.734502 123.257321