As a part of ETL, table continuous_trips
has continuous flow of incoming records.
New records are aggregated and get inserted into temp. table called trips_agg
every 5 minutes.
CREATE TABLE IF NOT EXISTS trips_agg AS (
SELECT start_time, station_id, from_station, to_station, from_terminus, end_terminus, previous_station, next_station,
AVG(wait_span) AS wait_span,
AVG(walk_span) AS walk_span,
AVG(delay_span) AS delay_span,
SUM(passengers_requests) AS passengers_requests
FROM continuous_trips
GROUP BY start_time, station_id, from_station, to_station, from_terminus, end_terminus, previous_station, next_station
)
The table trips_agg
gets dropped after inserting all records into the table daily_trips
and recreated during next cycle.
Tables daily_trips
& trips_agg
have the same columns.
CREATE TABLE IF NOT EXISTS daily_trips (
start_time timestamp without time zone NOT NULL,
station_id text NOT NULL,
from_station text NOT NULL,
to_station text NOT NULL,
from_terminus text NOT NULL,
end_terminus text NOT NULL,
previous_station text,
next_station text,
wait_span interval NOT NULL,
walk_span interval NOT NULL,
delay_span interval NOT NULL,
passengers_requests numeric NOT NULL
)
Note: columns 'previous_station' and 'next_station' allows null.
composite unique key is added as follows:
ALTER TABLE daily_trips ADD CONSTRAINT daily_trips_unique_row UNIQUE
(start_time, station_id, from_station, to_station, from_terminus, end_terminus, previous_station, next_station);
In case unique key is violated upon insertion, the record should be updated. So used upsert strategy.
INSERT INTO daily_trips SELECT * FROM trips_agg
ON CONFLICT (start_time, station_id, from_station, to_station, from_terminus, end_terminus,
previous_station, next_station) DO UPDATE
set wait_span = (daily_trips.wait_span + EXCLUDED.wait_span)/2,
walk_span = (daily_trips.walk_span + EXCLUDED.walk_span)/2 ,
delay_span = (daily_trips.delay_span + EXCLUDED.delay_span)/2,
passengers_requests =(daily_trips.passengers_requests + EXCLUDED.passengers_requests);
When values for all columns are present this setup works perfectly but, it's not the case when any of nullable columns have a null value.
Since Postgres doesn't consider null values to invoke unique constraint, whenever any of nullable columns have null value, a new row is inserted, instead of update. This results into multiple rows for the unique key.
To overcome this, added an index on the table daily_trips
after referring this article.
create unique index daily_trips_unique_trip_idx ON daily_trips
(start_time, station_id, from_station, to_station, from_terminus, end_terminus,
(previous_station IS NULL), (next_station IS NULL)
where previous_station IS NULL or fnext_station IS NULL
However, only one row could be added with null value for any nullable column. For next row with null value for any nullable column, update is not happening and instead getting following error:
ERROR: duplicate key value violates unique constraint "daily_trips_unique_trip_idx"
What is needed?
The unique constraint should be respected and update should happen when there is null value in either of nullable columns 'previous_station' or 'next_station'.
Any help is appreciated.
The solution is to translate NULL to some other value, more specifically the 0-length string (''). The coalesce function does precisely that when used as coalesce (column_name, '')
. The problem being creating a unique constraint with that generates a syntax error. So you cannot create that constraint. However, there is a work around, although not a easy one. Postgres enforces unique constraints through a unique index, so just create the index directly.
create unique index daily_trips_unique_row on daily_trips
( start_time
, station_id
, from_station
, to_station
, from_terminus
, end_terminus
, coalesce(previous_station , '')
, coalesce(next_station, '')
);
However, while the above respects the null-ability of index columns it no longer recognizes INSERT ... ON CONFLICT
(See example here) . You will either need a function/procedure to handle the exception or use Select ... if exists then Update else Insert
logic.