From 49c411e8a792c58d77f74542987417aa98ecf2d6 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 26 Jun 2025 10:45:13 +0800 Subject: [PATCH v45 4/5] Support the conflict detection for update_deleted This patch supports detecting update_deleted conflicts during update operations. If the target row cannot be found when applying update operations, we perform an additional scan of the table using SnapshotAny. This scan aims to locate the most recently deleted row that matches the old column values from the remote update operation and has not yet been removed by VACUUM. If any such tuples are found, we report the update_deleted conflict along with the origin and transaction information that deleted the tuple. --- doc/src/sgml/catalogs.sgml | 3 +- doc/src/sgml/logical-replication.sgml | 15 +++ doc/src/sgml/monitoring.sgml | 11 ++ doc/src/sgml/ref/create_subscription.sgml | 5 +- src/backend/catalog/system_views.sql | 1 + src/backend/executor/execReplication.c | 138 ++++++++++++++++++++- src/backend/replication/logical/conflict.c | 22 ++++ src/backend/replication/logical/worker.c | 66 ++++++---- src/backend/utils/adt/pgstatfuncs.c | 18 +-- src/include/catalog/pg_proc.dat | 6 +- src/include/executor/executor.h | 7 +- src/include/replication/conflict.h | 3 + src/include/replication/worker_internal.h | 5 +- src/test/regress/expected/rules.out | 3 +- src/test/subscription/t/035_conflicts.pl | 31 ++++- 15 files changed, 288 insertions(+), 46 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index c948263521e..16702bbc26b 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8087,7 +8087,8 @@ SCRAM-SHA-256$<iteration count>:&l subretainconflictinfo bool - If true, the information (e.g., dead tuples, commit timestamps, and + If true, the detection of update_deleted is + enabled and the information (e.g., dead tuples, commit timestamps, and origins) on the subscriber that is useful for conflict detection is retained.
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 2394f282253..34133395864 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1743,6 +1743,21 @@ Publications: + + update_deleted + + + The tuple to be updated was deleted by another origin. The update will + simply be skipped in this scenario. + Note that this conflict can only be detected when + track_commit_timestamp + and retain_conflict_info + are enabled. Note that if a tuple cannot be found due to the table being + truncated, only an update_missing conflict will + arise. + + + update_origin_differs diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 84e5a48181c..dbe8ee6db16 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2212,6 +2212,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + confl_update_deleted bigint + + + Number of times the tuple to be updated was deleted by another origin + during the application of changes. See update_deleted + for details about this conflict. + + + confl_update_origin_differs bigint diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ed835032d27..cd4aa528577 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -445,8 +445,9 @@ CREATE SUBSCRIPTION subscription_namefalse. - If set to true, a physical replication slot named - pg_conflict_detection will be + If set to true, the detection of update_deleted + is enabled, and a physical + replication slot named pg_conflict_detection will be created on the subscriber to prevent the conflict information from being removed.
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5221454ef67..cac8d2f79d0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1390,6 +1390,7 @@ CREATE VIEW pg_stat_subscription_stats AS ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 53ddd25c42d..ce8eec5ca20 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -14,12 +14,14 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/genam.h" #include "access/gist.h" #include "access/relscan.h" #include "access/tableam.h" #include "access/transam.h" #include "access/xact.h" +#include "access/heapam.h" #include "catalog/pg_am_d.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -27,6 +29,7 @@ #include "replication/conflict.h" #include "replication/logicalrelation.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,7 +39,7 @@ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq); + TypeCacheEntry **eq, Bitmapset *columns); /* * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that @@ -221,7 +224,7 @@ retry: if (eq == NULL) eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts); - if (!tuples_equal(outslot, searchslot, eq)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; } @@ -277,10 +280,13 @@ retry: /* * Compare the tuples in the slots by checking if they have equal values. + * + * If 'columns' is not null, only the columns specified within it will be + * considered for the equality check, ignoring all other columns. 
*/ static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, - TypeCacheEntry **eq) + TypeCacheEntry **eq, Bitmapset *columns) { int attrnum; @@ -305,6 +311,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, if (att->attisdropped || att->attgenerated) continue; + /* + * Ignore columns that are not listed for checking. + */ + if (columns && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + columns)) + continue; + /* * If one value is NULL and other is not, then they are certainly not * equal @@ -380,7 +394,7 @@ retry: /* Try to find the tuple */ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) { - if (!tuples_equal(scanslot, searchslot, eq)) + if (!tuples_equal(scanslot, searchslot, eq, NULL)) continue; found = true; @@ -455,6 +469,122 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex) } } +/* + * Searches the relation 'rel' for the most recently deleted tuple that matches + * the values in 'searchslot' and is not yet removable by VACUUM. The function + * returns the transaction ID, origin, and commit timestamp of the transaction + * that deleted this tuple. + * + * The commit timestamp of the transaction that deleted the tuple is used to + * determine whether the tuple is the most recently deleted one. + * + * This function performs a full table scan instead of using indexes because + * index scans could miss deleted tuples if an index has been re-indexed or + * re-created during change applications. While this approach may be slow on + * large tables, it is considered acceptable because it is only used in rare + * conflict cases where the target row for an update cannot be found. 
+ */ +bool +FindMostRecentlyDeletedTupleInfo(Relation rel, TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time) +{ + TupleTableSlot *scanslot; + TableScanDesc scan; + TypeCacheEntry **eq; + TransactionId oldestXmin; + BufferHeapTupleTableSlot *hslot; + HeapTuple tuple; + Buffer buf; + Bitmapset *indexbitmap; + TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel); + + Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor)); + + *delete_xid = InvalidTransactionId; + *delete_origin = InvalidRepOriginId; + *delete_time = 0; + + /* Exit early if the commit timestamp data is not available */ + if (!track_commit_timestamp) + return false; + + /* Get the cutoff xmin for HeapTupleSatisfiesVacuum */ + oldestXmin = GetOldestNonRemovableTransactionId(rel); + + /* Get the index column bitmap for tuples_equal */ + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* fallback to PK if no replica identity */ + if (!indexbitmap) + indexbitmap = RelationGetIndexAttrBitmap(rel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + + eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts); + + /* + * Start a heap scan using SnapshotAny to identify dead tuples that are + * not visible under a standard MVCC snapshot. 
+ */ + scan = table_beginscan(rel, SnapshotAny, 0, NULL); + scanslot = table_slot_create(rel, NULL); + hslot = (BufferHeapTupleTableSlot *) scanslot; + + table_rescan(scan, NULL); + + /* Try to find the tuple */ + while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + { + bool dead = false; + TransactionId xmax; + TimestampTz localts; + RepOriginId localorigin; + + if (!tuples_equal(scanslot, searchslot, eq, indexbitmap)) + continue; + + tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL); + buf = hslot->buffer; + + LockBuffer(buf, BUFFER_LOCK_SHARE); + + /* + * We do not consider HEAPTUPLE_DEAD status because it indicates + * either tuples whose inserting transaction was aborted, meaning + * there is no commit timestamp or origin, or tuples deleted by a + * transaction older than oldestXmin, making it safe to ignore them + * during conflict detection (See comments atop + * maybe_advance_nonremovable_xid() for details). + */ + if (HeapTupleSatisfiesVacuum(tuple, oldestXmin, buf) == HEAPTUPLE_RECENTLY_DEAD) + dead = true; + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!dead) + continue; + + xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); + + /* Select the dead tuple with the most recent commit timestamp */ + if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) && + (TimestampDifferenceExceeds(*delete_time, localts, 0) || + *delete_time == 0)) + { + *delete_xid = xmax; + *delete_time = localts; + *delete_origin = localorigin; + } + } + + table_endscan(scan); + ExecDropSingleTupleTableSlot(scanslot); + + return *delete_time != 0; +} + /* * Find the tuple that violates the passed unique index (conflictindex). 
* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 97c4e26b586..24d0b4ada4d 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -25,6 +25,7 @@ static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", + [CT_UPDATE_DELETED] = "update_deleted", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", [CT_UPDATE_EXISTS] = "update_exists", [CT_UPDATE_MISSING] = "update_missing", @@ -173,6 +174,7 @@ errcode_apply_conflict(ConflictType type) case CT_UPDATE_EXISTS: case CT_MULTIPLE_UNIQUE_CONFLICTS: return errcode(ERRCODE_UNIQUE_VIOLATION); + case CT_UPDATE_DELETED: case CT_UPDATE_ORIGIN_DIFFERS: case CT_UPDATE_MISSING: case CT_DELETE_ORIGIN_DIFFERS: @@ -246,6 +248,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, break; + case CT_UPDATE_DELETED: + if (localts) + { + if (localorigin == InvalidRepOriginId) + appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + else if (replorigin_by_oid(localorigin, true, &origin_name)) + appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."), + origin_name, localxmin, timestamptz_to_str(localts)); + + /* The origin that modified this row has been removed. 
*/ + else + appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."), + localxmin, timestamptz_to_str(localts)); + } + else + appendStringInfo(&err_detail, _("The row to be updated was deleted.")); + + break; + case CT_UPDATE_ORIGIN_DIFFERS: if (localorigin == InvalidRepOriginId) appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5fff9f64a13..ca60a668b1d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2818,17 +2818,28 @@ apply_handle_update_internal(ApplyExecutionData *edata, } else { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(localrel, remoteslot, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); /* - * The tuple to be updated could not be found. Do nothing except for - * emitting a log message. + * The tuple to be updated could not be found or was deleted. Do + * nothing except for emitting a log message. */ - ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, newslot, list_make1(&conflicttuple)); + ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot, + list_make1(&conflicttuple)); } /* Cleanup. 
*/ @@ -3166,18 +3177,29 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part, &localslot); if (!found) { + ConflictType type; TupleTableSlot *newslot = localslot; + if (MySubscription->retainconflictinfo && + FindMostRecentlyDeletedTupleInfo(partrel, remoteslot_part, + &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + type = CT_UPDATE_DELETED; + else + type = CT_UPDATE_MISSING; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); /* - * The tuple to be updated could not be found. Do nothing - * except for emitting a log message. + * The tuple to be updated could not be found or was + * deleted. Do nothing except for emitting a log message. */ ReportApplyConflict(estate, partrelinfo, LOG, - CT_UPDATE_MISSING, remoteslot_part, - newslot, list_make1(&conflicttuple)); + type, remoteslot_part, newslot, + list_make1(&conflicttuple)); return; } @@ -4060,10 +4082,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) * * The oldest_nonremovable_xid is maintained in shared memory to prevent dead * rows from being removed prematurely when the apply worker still needs them - * to detect conflicts reliably. This helps to retain the required commit_ts - * module information, which further helps to detect update_origin_differs and - * delete_origin_differs conflicts reliably, as otherwise, vacuum freeze could - * remove the required information. + * to detect update_deleted conflicts. Additionally, this helps to retain + * the required commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. 
* * The non-removable transaction ID is advanced to the oldest running * transaction ID once all concurrent transactions on the publisher have been @@ -4102,10 +4124,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) * transactions that occurred concurrently with the tuple DELETE, any * subsequent UPDATE from a remote node should have a later timestamp. In such * cases, it is acceptable to detect an update_missing scenario and convert the - * UPDATE to an INSERT when applying it. But, detecting concurrent remote - * transactions with earlier timestamps than the DELETE is necessary, as the - * UPDATEs in remote transactions should be ignored if their timestamp is - * earlier than that of the dead tuples. + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. * * Note that advancing the non-removable transaction ID is not supported if the * publisher is also a physical standby. This is because the logical walsender @@ -4140,8 +4162,8 @@ can_advance_nonremovable_xid(RetainConflictInfoData *rci_data) { /* * It is sufficient to manage non-removable transaction ID for a - * subscription by the main apply worker to detect conflicts reliably even - * for table sync or parallel apply workers. + * subscription by the main apply worker to detect update_deleted reliably + * even for table sync or parallel apply workers. */ if (!am_leader_apply_worker()) return false; @@ -4346,10 +4368,10 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) * We expect the publisher and subscriber clocks to be in sync using time * sync service like NTP. Otherwise, we will advance this worker's * oldest_nonremovable_xid prematurely, leading to the removal of rows - * required to detect conflicts reliably. 
This check primarily addresses - * scenarios where the publisher's clock falls behind; if the publisher's - * clock is ahead, subsequent transactions will naturally bear later - * commit timestamps, conforming to the design outlined atop of + * required to detect update_deleted reliably. This check primarily + * addresses scenarios where the publisher's clock falls behind; if the + * publisher's clock is ahead, subsequent transactions will naturally bear + * later commit timestamps, conforming to the design outlined atop of * maybe_advance_nonremovable_xid(). * * XXX Consider waiting for the publisher's clock to catch up with the diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 1c12ddbae49..2a084d3f3f0 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; @@ -2193,19 +2193,21 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_deleted", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists", + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing", INT8OID, -1, 0); -
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3ed0b1be9a4..220eaa4d20c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5688,9 +5688,9 @@ { oid => '6231', descr => 'statistics: information about subscription stats', proname => 'pg_stat_get_subscription_stats', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_deleted,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => 
'10', proisstrict => 'f', diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 104b059544d..f8a0aadd95b 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -14,6 +14,7 @@ #ifndef EXECUTOR_H #define EXECUTOR_H +#include "datatype/timestamp.h" #include "executor/execdesc.h" #include "fmgr.h" #include "nodes/lockoptions.h" @@ -759,7 +760,11 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, TupleTableSlot *outslot); extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot); - +extern bool FindMostRecentlyDeletedTupleInfo(Relation rel, + TupleTableSlot *searchslot, + TransactionId *delete_xid, + RepOriginId *delete_origin, + TimestampTz *delete_time); extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot); extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 6c59125f256..cbd9656a60a 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -26,6 +26,9 @@ typedef enum /* The row to be inserted violates unique constraint */ CT_INSERT_EXISTS, + /* The row to be updated was deleted by a different origin */ + CT_UPDATE_DELETED, + /* The row to be updated was modified by a different origin */ CT_UPDATE_ORIGIN_DIFFERS, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 436fe445d64..243164aada8 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -87,8 +87,9 @@ typedef struct LogicalRepWorker bool parallel_apply; /* - * The changes made by this and later transactions must be retained to - * ensure reliable conflict detection during the apply phase. + * The changes made by this and later transactions shouldn't be removed. 
+ * This allows the detection of update_deleted conflicts when applying + * changes in this logical replication worker. * * The logical replication launcher manages an internal replication slot * named "pg_conflict_detection". It asynchronously collects this ID to diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 79ed5233edb..d0428ac47fb 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2174,6 +2174,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.apply_error_count, ss.sync_error_count, ss.confl_insert_exists, + ss.confl_update_deleted, ss.confl_update_origin_differs, ss.confl_update_exists, ss.confl_update_missing, @@ -2182,7 +2183,7 @@ pg_stat_subscription_stats| SELECT ss.subid, ss.confl_multiple_unique_conflicts, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_deleted, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset); pg_stat_sys_indexes| SELECT relid, indexrelid, schemaname, diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 890bcdaac57..21407fb325b 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -150,7 +150,9 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert'); # Setup a bidirectional logical replication between node_A & node_B ############################################################################### -# Initialize 
nodes. +# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect +# the conflict when attempting to update a row that was previously modified by +# a different origin. # node_A. Increase the log_min_messages setting to DEBUG2 to debug test # failures. Disable autovacuum to avoid generating xid that could affect the @@ -158,7 +160,8 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert'); my $node_A = $node_publisher; $node_A->append_conf( 'postgresql.conf', - qq{autovacuum = off + qq{track_commit_timestamp = on + autovacuum = off log_min_messages = 'debug2'}); $node_A->restart; @@ -257,6 +260,8 @@ is($result, qq(t), 'worker on node A retains conflict information'); ############################################################################### # Check that dead tuples on node A cannot be cleaned by VACUUM until the # concurrent transactions on Node B have been applied and flushed on Node A. +# And check that an update_deleted conflict is detected when updating a row +# that was deleted by a different origin. 
############################################################################### # Insert a record @@ -270,6 +275,8 @@ is($result, qq(1|1 # Disable the logical replication from node B to node A $node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); +my $log_location = -s $node_B->logfile; + $node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); @@ -281,10 +288,30 @@ ok( $stderr =~ qr/1 are dead but not yet removable/, 'the deleted column is non-removable'); +# Ensure the DELETE is replayed on Node B +$node_A->wait_for_catchup($subname_BA); + +# Check the conflict detected on Node B +my $logfile = slurp_file($node_B->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.* +.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .* +.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'delete target row was modified in tab'); + +$log_location = -s $node_A->logfile; + $node_A->safe_psql( 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); $node_B->wait_for_catchup($subname_AB); +$logfile = slurp_file($node_A->logfile(), $log_location); +ok( $logfile =~ + qr/conflict detected on relation "public.tab": conflict=update_deleted.* +.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .* +.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/, + 'update target row was deleted in tab'); + # Remember the next transaction ID to be assigned my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); -- 2.31.1