Handling Duplicate Records in Input Data Streams

Posted April 17, 2018 by Soniya Shah, Information Developer

This blog post was authored by Ravi Gupta.

We have often found that the source or operational systems that provide data for further analysis contain duplicate records, and that these records are sent on to a downstream application or EDW for processing. This post shows a few scenarios for handling these duplicate records with various SQL options, how Vertica handles them via primary keys, and how to ignore duplicate records. The running example is an Apache Kafka stream that delivers some duplicate records.

Let's first create a staging table that the Kafka streams will push data into. Then we will create another table from which users, reporting applications, or analysts will read data. The stream data will ultimately be inserted as records into these tables.

vsql=> create table stage_table ( id int not null constraint stg_pk primary key enabled, val varchar(10), ts timestamp);
vsql=> create table report_table ( id int not null constraint rep_pk primary key enabled, val varchar(10), ts timestamp);

vsql=> insert into stage_table values ( 0, 'zero' , '2018-01-01 01:00:00'::timestamp);
vsql=> insert into stage_table values ( 1, 'one' , current_timestamp - 2 );
vsql=> insert into stage_table values ( 2, 'two' , current_timestamp - 2 );
vsql=> insert into stage_table values ( 1, 'one' , current_timestamp - 1 );
ERROR 6745:  Duplicate key values: 'id=1' -- violates constraint 'public.stage_table.stg_pk'

The last insert failed because the primary key is enabled and the value 1 was loaded twice. Now, try to load data from a source file as well:

vsql=> \! cat /tmp/t.dat
2|two|2018-02-02
3|three|2018-02-20

vsql=> copy stage_table from '/tmp/t.dat' rejected data '/tmp/rej' exceptions '/tmp/exc' ;
ERROR 6745:  Duplicate key values: 'id=2' -- violates constraint 'public.stage_table.stg_pk'

Because the primary key is created and enabled, Vertica rolls back the transaction as soon as it finds a duplicate record. The whole batch is rolled back.

vsql=> SELECT * FROM stage_table;
 id | val  |             ts
----+------+----------------------------
  0 | zero | 2018-01-01 01:00:00
  1 | one  | 2018-03-24 22:06:13.090875
  2 | two  | 2018-03-24 22:06:13.090875
(3 rows)

We ended up with only unique records in the staging table, but the COPY job failed with errors. Let's see what happens if we disable the primary key on the staging table:

vsql=> ALTER TABLE stage_table ALTER CONSTRAINT stg_pk DISABLED;
vsql=> COPY stage_table FROM '/tmp/t.dat' rejected data '/tmp/rej' exceptions '/tmp/exc' ;
 Rows Loaded
-------------
           2

Now both records are loaded without issues. In fact, if we load one more duplicate record manually, it also loads without errors.

vsql=> insert into stage_table values ( 3, 'three-2' , current_timestamp );
 OUTPUT
--------
      1
vsql=> commit;

vsql=> SELECT * FROM stage_table;
 id |   val   |             ts
----+---------+----------------------------
  0 | zero    | 2018-01-01 01:00:00
  1 | one     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-02-02 00:00:00
  3 | three   | 2018-02-20 00:00:00
  3 | three-2 | 2018-03-26 22:17:04.70945
(6 rows)
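As a side note, disabling stg_pk only turns off enforcement; the constraint definition stays on the table. Once the duplicates have been cleaned up, you would typically turn enforcement back on. A minimal sketch, using the same ALTER CONSTRAINT syntax as above (the statement is expected to fail for as long as duplicate id values remain in the table):

vsql=> ALTER TABLE stage_table ALTER CONSTRAINT stg_pk ENABLED;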
If we want to check the per-id row number for all the records, we can run the following:

vsql=> SELECT id, val, ts, ROW_NUMBER() OVER(PARTITION BY id) FROM stage_table;
 id |   val   |             ts             | ?column?
----+---------+----------------------------+----------
  0 | zero    | 2018-01-01 01:00:00        |        1
  1 | one     | 2018-03-24 22:06:13.090875 |        1
  2 | two     | 2018-02-02 00:00:00        |        1
  2 | two     | 2018-03-24 22:06:13.090875 |        2
  3 | three-2 | 2018-03-26 22:17:04.70945  |        1
  3 | three   | 2018-02-20 00:00:00        |        2
(6 rows)

Let's try to fetch unique records based on the latest value of the timestamp column:

vsql=> SELECT * FROM ( SELECT id, val, ts, ROW_NUMBER() OVER(PARTITION BY id ORDER BY ts DESC) RN FROM stage_table ) AS tb WHERE tb.RN = 1 ;
 id |   val   |             ts             | RN
----+---------+----------------------------+----
  0 | zero    | 2018-01-01 01:00:00        |  1
  1 | one     | 2018-03-24 22:06:13.090875 |  1
  2 | two     | 2018-03-24 22:06:13.090875 |  1
  3 | three-2 | 2018-03-26 22:17:04.70945  |  1

How can we check which primary keys have constraint violations on a particular table? We can use the ANALYZE_CONSTRAINTS function:

vsql=> SELECT ANALYZE_CONSTRAINTS ('public.stage_table', 'id');
 Schema Name | Table Name  | Column Names | Constraint Name | Constraint Type | Column Values
-------------+-------------+--------------+-----------------+-----------------+---------------
 public      | stage_table | id           | stg_pk          | PRIMARY         | ('3')
 public      | stage_table | id           | stg_pk          | PRIMARY         | ('2')

We can also check which keys have duplicate records by running the following query:

vsql=> SELECT id, count(*) cnt FROM stage_table group by id having count(*) > 1;
 id | cnt
----+-----
  2 |   2
  3 |   2
(2 rows)

Now let's try to load this data from the staging table into the report table. The report table's primary key constraint does not allow duplicates:

vsql=> INSERT INTO report_table SELECT * FROM stage_table ;
ERROR 6745:  Duplicate key values: 'id=2' -- violates constraint 'public.report_table.rep_pk'
DETAIL:  Additional violations:
Constraint 'public.report_table.rep_pk': duplicate key values: 'id=3'

The query failed. Let's try using the MERGE statement to merge data from the staging table into the report table. Again, the report table's primary key does not allow duplicates:

vsql=> MERGE INTO report_table R USING stage_table S ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val=S.val, ts=S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
ERROR 6745:  Duplicate key values: 'id=2' -- violates constraint 'public.report_table.rep_pk'
DETAIL:  Additional violations:
Constraint 'public.report_table.rep_pk': duplicate key values: 'id=3'

However, if we pick only the unique records from the staging table and load them into the report table, it works:

vsql=> INSERT INTO report_table ( SELECT id, val, ts FROM ( SELECT id, val, ts, row_number() over(partition by id order by ts desc) RN FROM stage_table ) as tb WHERE tb.RN = 1 ) ;
 OUTPUT
--------
      4

vsql=> SELECT * FROM report_table;
 id |   val   |             ts
----+---------+----------------------------
  0 | zero    | 2018-01-01 01:00:00
  1 | one     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-03-24 22:06:13.090875
  3 | three-2 | 2018-03-26 22:17:04.70945
(4 rows)
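As a side note, the deduplicated INSERT above could equally be written with Vertica's LIMIT ... OVER (Top-K) clause instead of ROW_NUMBER; a minimal sketch against the same tables (this is the same syntax that some of the approaches below rely on):

vsql=> INSERT INTO report_table SELECT id, val, ts FROM stage_table LIMIT 1 OVER (PARTITION BY id ORDER BY ts DESC);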
We assume the staging table keeps receiving data from the source systems and load streams. If we try to re-merge the staging table's data into the report table, the merge may fail:

vsql=> MERGE INTO report_table R USING stage_table S ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val=S.val, ts=S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
ERROR 3147:  Duplicate MERGE key detected in join [(public.report_table x public.stage_table) using report_table_super and subquery (PATH ID: 1)]; value [2]

Let's drop the primary key constraint from the staging table to check whether the error is caused by stg_pk:

vsql=> ALTER TABLE stage_table DROP CONSTRAINT stg_pk CASCADE;
ALTER TABLE
vsql=> MERGE INTO report_table R USING stage_table S ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val=S.val, ts=S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
ERROR 3147:  Duplicate MERGE key detected in join [(public.report_table x public.stage_table) using report_table_super and subquery (PATH ID: 1)]; value [2]

The error is not caused by stg_pk. The staging table contains several rows with the same id, so more than one source row matches a single row in the report table, which MERGE does not allow. Because matching data was already loaded into the report table, re-merging the duplicated staging data fails. This is the current state of the data:

vsql=> SELECT * FROM report_table;
 id |   val   |             ts
----+---------+----------------------------
  0 | zero    | 2018-01-01 01:00:00
  1 | one     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-03-24 22:06:13.090875
  3 | three-2 | 2018-03-26 22:17:04.70945
(4 rows)

vsql=> SELECT * FROM stage_table;
 id |   val   |             ts
----+---------+----------------------------
  0 | zero    | 2018-01-01 01:00:00
  1 | one     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-02-02 00:00:00
  3 | three   | 2018-02-20 00:00:00
  3 | three-2 | 2018-03-26 22:17:04.70945
(6 rows)

vsql=> SELECT * FROM ( SELECT id, val, ts, row_number() over(partition by id order by ts desc) RN FROM stage_table ) as tb WHERE tb.RN = 1 ;
 id |   val   |             ts             | RN
----+---------+----------------------------+----
  0 | zero    | 2018-01-01 01:00:00        |  1
  1 | one     | 2018-03-24 22:06:13.090875 |  1
  2 | two     | 2018-03-24 22:06:13.090875 |  1
  3 | three-2 | 2018-03-26 22:17:04.70945  |  1
(4 rows)

Now let's re-merge only the unique records from the source staging table into the target report table. This merge works:

vsql=> MERGE INTO report_table R
       USING (SELECT * FROM ( SELECT id, val, ts, row_number() over(partition by id order by ts desc) RN FROM stage_table ) as tb WHERE tb.RN = 1 ) S
       ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val = S.val, ts = S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
 OUTPUT
--------
      4
(1 row)

However, if we keep running this MERGE, it will keep updating the same rows and create lots of delete vectors in the report table. We should always remove the duplicates from the staging table first, and only then MERGE or load into the report table.
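If repeated MERGE updates do happen, the delete vectors they leave behind can be checked and cleaned up. A minimal sketch, assuming your user can query the v_monitor.delete_vectors system table and run the built-in PURGE_TABLE function:

vsql=> SELECT projection_name, SUM(deleted_row_count) AS deleted_rows FROM v_monitor.delete_vectors WHERE schema_name = 'public' GROUP BY projection_name;
vsql=> SELECT PURGE_TABLE('public.report_table');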
Below are a few approaches to handle this scenario.

First approach: delete from the staging table the data that is already present in the report table.

vsql=> DELETE FROM stage_table WHERE id IN ( SELECT id FROM report_table R WHERE id = R.id );
 OUTPUT
--------
      6
(1 row)

vsql=> select * from report_table;
 id |   val   |             ts
----+---------+----------------------------
  0 | zero    | 2018-01-01 01:00:00
  1 | one     | 2018-03-24 22:06:13.090875
  2 | two     | 2018-03-24 22:06:13.090875
  3 | three-2 | 2018-03-26 22:17:04.70945
(4 rows)

vsql=> select * from stage_table;
 id | val | ts
----+-----+----
(0 rows)

If we now try to select and merge into the report table again, nothing is merged, because the query condition finds no data left in the staging table:

vsql=> MERGE INTO report_table R
       USING (SELECT * FROM ( SELECT id, val, ts, row_number() over(partition by id order by ts desc) RN FROM stage_table ) as tb WHERE tb.RN = 1) S
       ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val=S.val, ts=S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
 OUTPUT
--------
      0
(1 row)

Second approach: pick the latest record per id from the staging table, load it into a temporary table, and then load that into report_table. This is a better approach because it does not hold data in the staging and report tables for long while it joins, and concurrent sessions can do the same work partitioned by location or by a date predicate. If the staging and report tables are large, running deletes on them can be costly. (The output below assumes the staging table has been loaded with data again; here, every deduplicated staging row already exists in the report table.)

vsql=> CREATE TEMPORARY TABLE stg_temp AS SELECT id, val, ts FROM stage_table LIMIT 1 OVER (PARTITION BY id ORDER BY ts DESC);
vsql=> DELETE FROM stg_temp WHERE EXISTS ( SELECT id FROM report_table R WHERE R.id = id) ;
 OUTPUT
--------
      4

vsql=> SELECT * FROM stg_temp;
 id | val | ts
----+-----+----
(0 rows)

vsql=> insert into report_table select * from stg_temp;
 OUTPUT
--------
      0
(1 row)

Third approach: create a live aggregate projection (here, a Top-K projection) that always exposes the latest record per id from the staging table.

vsql=> CREATE PROJECTION stg_table_topk ( id, ts, val) AS SELECT id, ts, val FROM stage_table LIMIT 1 OVER (PARTITION BY id ORDER BY ts DESC);

In the following output, one more record (id 4) has since been loaded into the staging table. We can read the latest unique records directly from the stg_table_topk projection:

vsql=> SELECT * FROM stg_table_topk;
 id |             ts             |   val
----+----------------------------+---------
  0 | 2018-01-01 01:00:00        | zero
  1 | 2018-03-25 16:45:59.928004 | one
  2 | 2018-03-25 16:45:59.928004 | two
  3 | 2018-03-27 16:47:15.270538 | three-2
  4 | 2018-03-24 17:15:39.66671  | four-4
(5 rows)

vsql=> INSERT INTO report_table ( SELECT id, val, ts FROM stg_table_topk WHERE id NOT IN ( SELECT id FROM report_table R WHERE R.id = id) );
 OUTPUT
--------
      1
(1 row)

The idea behind these approaches is that you can populate unique records into the target tables efficiently, so that users do not have to deal with duplicate-record errors during ETL. You can choose any of these approaches based on the data volumes and the scenarios in your environment.
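To close, here is a minimal sketch of how the second approach could be combined with the deduplicating MERGE shown earlier into a single repeatable load step. It reuses only the tables from this post; note that, by default, a temporary table created this way drops its rows on commit, so the steps are assumed to run inside one transaction:

vsql=> CREATE TEMPORARY TABLE stg_temp AS SELECT id, val, ts FROM stage_table LIMIT 1 OVER (PARTITION BY id ORDER BY ts DESC);
vsql=> MERGE INTO report_table R USING stg_temp S ON R.id=S.id
       WHEN MATCHED THEN UPDATE SET val=S.val, ts=S.ts
       WHEN NOT MATCHED THEN INSERT (id,val,ts) VALUES ( S.id, S.val, S.ts);
vsql=> COMMIT;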