From 633170653f788dfd5ae4e293058fa0c93b8f5b85 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 12 Jan 2026 20:21:03 +0530 Subject: [PATCH v21 2/3] Implement the conflict insertion infrastructure for the conflict log table This patch introduces the core logic to populate the conflict log table whenever a logical replication conflict is detected. It captures the remote transaction details along with the corresponding local state at the time of the conflict. Handling Multi-row Conflicts: A single remote tuple may conflict with multiple local tuples (e.g., in the case of multiple_unique_conflicts). To handle this, the infrastructure creates a single row in the conflict log table for each remote tuple. The details of all conflicting local rows are aggregated into a single JSON array in the local_conflicts column. The JSON array uses the following structured format: [ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1", "key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ] Example of querying the structured conflict data: SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM myschema.conflict_log_history2; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- src/backend/replication/logical/conflict.c | 560 +++++++++++++++++++-- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 31 +- src/include/replication/conflict.h | 3 + src/include/replication/worker_internal.h | 7 + src/test/subscription/t/035_conflicts.pl | 60 ++- 6 files changed, 617 insertions(+), 45 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 93222ee3b88..d05af2fbd6f 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,13 +15,20 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/jsonb.h" static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -34,6 +41,18 @@ static const char *const ConflictTypeNames[] = { [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; +/* Schema for the elements within the 'local_conflicts' JSON array */ +static const ConflictLogColumnDef LocalConflictSchema[] = +{ + { .attname = "xid", .atttypid = XIDOID }, + { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "origin", .atttypid = TEXTOID }, + { .attname = "key", .atttypid = JSONOID }, + { .attname = "tuple", .atttypid = JSONOID } +}; + +#define MAX_LOCAL_CONFLICT_INFO_ATTRS lengthof(LocalConflictSchema) + static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, @@ -50,8 +69,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -105,30 +143,90 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples) { - Relation localrel = relinfo->ri_RelationDesc; - StringInfoData err_detail; + Relation localrel = relinfo->ri_RelationDesc; + ConflictLogDest dest; + Relation conflictlogrel; + bool log_dest_clt; + bool log_dest_logfile; - initStringInfo(&err_detail); + pgstat_report_subscription_conflict(MySubscription->oid, type); - /* Form errdetail message by combining conflicting tuples information. */ - foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) - errdetail_apply_conflict(estate, relinfo, type, searchslot, - conflicttuple->slot, remoteslot, - conflicttuple->indexoid, - conflicttuple->xmin, - conflicttuple->origin, - conflicttuple->ts, - &err_detail); + /* + * Get both the conflict log destination and the opened conflict log + * relation for insertion. + */ + conflictlogrel = GetConflictLogDestAndTable(&dest); - pgstat_report_subscription_conflict(MySubscription->oid, type); + log_dest_clt = ((dest & CONFLICT_LOG_DEST_TABLE) != 0); + log_dest_logfile = ((dest & CONFLICT_LOG_DEST_LOG) != 0); + + /* Insert to table if requested. */ + if (log_dest_clt) + { + Assert(conflictlogrel != NULL); + + /* + * Prepare the conflict log tuple. If the error level is below ERROR, + * insert it immediately. Otherwise, defer the insertion to a new + * transaction after the current one aborts, ensuring the insertion of + * the log tuple is not rolled back. + */ + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + if (elevel < ERROR) + InsertConflictLogTuple(conflictlogrel); + + table_close(conflictlogrel, RowExclusiveLock); + + if (!log_dest_logfile) + { + /* + * Not logging conflict details to the server log; Report the error + * msg but omit raw tuple data from server logs since it's already + * captured in the internal table. + */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail("Conflict details logged to internal table with OID %u.", + MySubscription->conflictlogrelid)); + } + } - ereport(elevel, - errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), - ConflictTypeNames[type]), - errdetail_internal("%s", err_detail.data)); + /* Log into the server log if requested. */ + if (log_dest_logfile) + { + StringInfoData err_detail; + + initStringInfo(&err_detail); + + /* Form errdetail message by combining conflicting tuples information. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, + &err_detail); + + /* Standard reporting with full internal details. */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s.%s\": conflict=%s", + get_namespace_name(RelationGetNamespace(localrel)), + RelationGetRelationName(localrel), + ConflictTypeNames[type]), + errdetail_internal("%s", err_detail.data)); + } } /* @@ -162,6 +260,64 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogDestAndTable + * + * Fetches conflict logging metadata from the cached MySubscription pointer. + * Sets the destination enum in *log_dest and, if applicable, opens and + * returns the relation handle for the internal log table. + */ +Relation +GetConflictLogDestAndTable(ConflictLogDest *log_dest) +{ + Oid conflictlogrelid; + Relation conflictlogrel = NULL; + + /* + * Convert the text log destination to the internal enum. MySubscription + * already contains the data from pg_subscription. + */ + *log_dest = GetLogDestination(MySubscription->conflictlogdest); + + /* Quick exit if a conflict log table was not requested. */ + if ((*log_dest & CONFLICT_LOG_DEST_TABLE) == 0) + return NULL; + + conflictlogrelid = MySubscription->conflictlogrelid; + + Assert(OidIsValid(conflictlogrelid)); + + conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock); + if (conflictlogrel == NULL) + elog(ERROR, "could not open conflict log table (OID %u)", + conflictlogrelid); + + return conflictlogrel; +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple + * inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + int options = HEAP_INSERT_NO_LOGICAL; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), options, NULL); + + /* Free conflict log tuple. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -472,6 +628,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, return tuple_value.data; } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -487,41 +677,325 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to Jsonb. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * jsonb datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = RelationGetDescr(indexDesc); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + index_close(indexDesc, NoLock); + heap_freetuple(tuple); + + /* Convert to a JSONB datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * build_conflict_tupledesc + * + * Build and bless a tuple descriptor for the internal conflict log table + * based on the predefined LocalConflictSchema. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + TupleDesc tupdesc; + + tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + + for (int i = 0; i < MAX_LOCAL_CONFLICT_INFO_ATTRS; i++) + TupleDescInitEntry(tupdesc, (AttrNumber) (i + 1), + LocalConflictSchema[i].attname, + LocalConflictSchema[i].atttypid, + -1, 0); + + BlessTupleDesc(tupdesc); + + return tupdesc; +} + +/* + * Builds the local conflicts JSONB array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; /* List to hold the row_to_json results (type json) */ + Datum *json_datum_array; + bool *json_null_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; + + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); + + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS] = {0}; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; + + if (conflicttuple->origin != InvalidRepOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + num_conflicts = list_length(json_datums); - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); + json_datum_array = palloc_array(Datum, num_conflicts); + json_null_array = palloc0_array(bool, num_conflicts); - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } - index_close(indexDesc, NoLock); + /* Construct the json[] array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + pfree(json_null_array); + + return json_array_datum; +} - return index_value; +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[MAX_CONFLICT_ATTR_NUM] = {0}; + bool nulls[MAX_CONFLICT_ATTR_NUM] = {0}; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_session_origin != InvalidRepOriginId) + replorigin_by_oid(replorigin_session_origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * json datum from key value. Otherwise, construct it from the complete + * tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3ed86480be2..2dda5a44218 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -477,6 +477,7 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ad281e7069b..d4be1122603 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,7 +482,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -5609,6 +5617,27 @@ start_apply(XLogRecPtr origin_startpos) pgstat_report_subscription_error(MySubscription->oid, MyLogicalRepWorker->type); + /* + * Insert any pending conflict log tuple under a new transaction. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + Relation conflictlogrel; + ConflictLogDest dest; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogDestAndTable(&dest); + Assert((dest & CONFLICT_LOG_DEST_TABLE) != 0); + InsertConflictLogTuple(conflictlogrel); + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_RE_THROW(); } } diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index ecdfacd4f00..009104b0503 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -144,4 +144,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest); +extern void InsertConflictLogTuple(Relation conflictlogrel); +extern bool ValidateConflictLogTable(Relation rel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index c1285fdd1bc..5bedfc5450f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -101,6 +101,9 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index ddc75e23fb0..4b2b36edba5 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -50,7 +50,7 @@ $node_subscriber->safe_psql( 'postgres', "CREATE SUBSCRIPTION sub_tab CONNECTION '$publisher_connstr application_name=$appname' - PUBLICATION pub_tab;"); + PUBLICATION pub_tab WITH (conflict_log_destination=all)"); # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); @@ -86,10 +86,41 @@ $node_subscriber->wait_for_log( .*Key \(c\)=\(4\); existing local row \(4, 4, 4\); remote row \(2, 3, 4\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the clt contains the expected +# type and specific key data. +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); +my $clt = "pg_conflict.pg_conflict_$subid"; + +# Wait for the conflict to be logged in the CLT +my $log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +my $conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 1, 'Verified multiple_unique_conflicts logged into internal table'); + +my $json_query = qq[ + SELECT string_agg((unnested.j::json)->'key'->>'a', ',') + FROM ( + SELECT unnest(local_conflicts) AS j + FROM $clt + ) AS unnested; +]; + +my $all_keys = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '2' is present in the resulting string +like($all_keys, qr/\b2\b/, 'Verified that key 2 exists in the local_conflicts log'); + pass('multiple_unique_conflicts detected during insert'); # Truncate table to get rid of the error $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); +$node_subscriber->safe_psql('postgres', "DELETE FROM $clt"); ################################################## # Test multiple_unique_conflicts due to UPDATE @@ -118,6 +149,33 @@ $node_subscriber->wait_for_log( .*Key \(c\)=\(8\); existing local row \(8, 8, 8\); remote row \(6, 7, 8\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the CLT contains the expected +# type and specific key data. + +# Wait for the conflict to be logged in the CLT +$log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +$conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 1, 'Verified multiple_unique_conflicts logged into internal table'); + +$json_query = qq[ + SELECT string_agg((unnested.j::json)->'key'->>'a', ',') + FROM ( + SELECT unnest(local_conflicts) AS j + FROM $clt + ) AS unnested; +]; + +$all_keys = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '6' is present in the resulting string +like($all_keys, qr/\b6\b/, 'Verified that key 6 exists in the local_conflicts log'); + pass('multiple_unique_conflicts detected during update'); # Truncate table to get rid of the error -- 2.49.0