From 691bd39b685677e2f316087c679eb442132d1a15 Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Tue, 16 Jul 2024 13:54:23 +0530 Subject: [PATCH v6 4/5] Manage Clock skew and implement last_update_wins This patch attempts to manage clock skew between nodes by introducing two new GUCs: a) max_logical_rep_clock_skew b) max_logical_rep_clock_skew_action c) max_logical_rep_clock_skew_wait If the timestamp of the currently replayed transaction is in the future compared to the current time on the subscriber and the difference is larger than 'max_logical_rep_clock_skew', then the action configured in 'max_logical_rep_clock_skew_action' is performed by the apply worker. If user configures 'wait' in 'max_logical_rep_clock_skew_action' and actual clock skew is large while 'max_logical_rep_clock_skew' is small, the apply worker may have to wait for a longer period to manage the clock skew. To control this maximum wait time, a new GUC, 'max_logical_rep_clock_skew_wait', is provided. This allows the user to set a cap on how long the apply worker should wait. If the computed wait time exceeds this value, the apply worker will error out without waiting. This patch also implements last_update_wins resolver. Since conflict resolution for two phase commit transactions using prepare-timestamp can result in data divergence, this patch also restricts enabling two_phase and detect_conflict together for a subscription --- src/backend/commands/subscriptioncmds.c | 45 +++++ src/backend/executor/execReplication.c | 2 +- .../replication/logical/applyparallelworker.c | 26 ++- src/backend/replication/logical/conflict.c | 142 ++++++++++++---- src/backend/replication/logical/origin.c | 1 + src/backend/replication/logical/worker.c | 155 ++++++++++++++++-- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 40 +++++ src/backend/utils/misc/postgresql.conf.sample | 9 +- src/include/catalog/pg_conflict.dat | 6 +- src/include/replication/conflict.h | 6 +- src/include/replication/logicalworker.h | 18 ++ src/include/replication/origin.h | 1 + src/include/replication/worker_internal.h | 2 +- src/include/utils/timestamp.h | 1 + .../regress/expected/conflict_resolver.out | 16 +- src/test/regress/expected/subscription.out | 2 + src/test/subscription/t/029_on_error.pl | 52 ++++-- .../subscription/t/034_conflict_resolver.pl | 107 +++++++++++- src/tools/pgindent/typedefs.list | 1 + 20 files changed, 556 insertions(+), 77 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 512b4273ae..7e3c7af160 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/xact.h" @@ -459,6 +460,22 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * Time based conflict resolution for two phase transactions can result in + * data divergence, so disallow enabling both together. + */ + if (opts->detectconflict && + IsSet(opts->specified_opts, SUBOPT_DETECT_CONFLICT)) + { + if (opts->twophase && + IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + /*- translator: both %s are strings of the form "option = value" */ + errmsg("%s and %s are mutually exclusive options", + "detect_conflict = true", "two_phase = true"))); + } } /* @@ -671,6 +688,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + /* Warn if detect_conflict is enabled and track_commit_timestamp is off */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -1279,6 +1303,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, values[Anum_pg_subscription_subdetectconflict - 1] = BoolGetDatum(opts.detectconflict); replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + + /* + * Time based conflict resolution for two phase + * transactions can result in data divergence, so disallow + * enabling it when two_phase is enabled. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that has two_phase enabled", + "detect_conflict"))); + + /* + * Warn if detect_conflict is enabled and + * track_commit_timestamp is off. + */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b18723e56b..8d93a965f2 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -630,7 +630,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, bool apply_remote = false; GetTupleCommitTs(*conflictslot, &xmin, &origin, &committs); - resolver = GetConflictResolver(rel, CT_INSERT_EXISTS, + resolver = GetConflictResolver(*conflictslot, rel, CT_INSERT_EXISTS, &apply_remote, NULL); ReportApplyConflict(CT_INSERT_EXISTS, resolver, rel, diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e7f7d4c5e4..10c7ca99df 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -312,6 +312,20 @@ pa_can_start(void) if (!AllTablesyncsReady()) return false; + /* + * Don't start a new parallel worker if user has either configured max + * clock skew or if conflict detection and resolution is ON. In both cases + * we need commit timestamp in the beginning. + * + * XXX: For conflict reolution case, see if we can reduce the scope of + * this restriction to only such cases where time-based resolvers are + * actually being used. + */ + if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT) || + MySubscription->detectconflict) + return false; + + return true; } @@ -696,9 +710,19 @@ pa_process_spooled_messages_if_required(void) } else if (fileset_state == FS_READY) { + /* + * Currently we do not support starting parallel apply worker when + * either clock skew is configured or conflict resolution is + * configured, thus it is okay to pass 0 as origin-timestamp here. + * + * XXX: If in future, we support starting pa worker even with conflict + * detection enabled, then here we need to pass remote's + * commit/prepare/abort timestamp; we can get that info from leader + * worker in shared memory. + */ apply_spooled_messages(&MyParallelShared->fileset, MyParallelShared->xid, - InvalidXLogRecPtr); + InvalidXLogRecPtr, 0); pa_set_fileset_state(MyParallelShared, FS_EMPTY); } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 574afe71f3..a8fa470cf7 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -44,6 +44,7 @@ const char *const ConflictTypeNames[] = { const char *const ConflictResolverNames[] = { [CR_REMOTE_APPLY] = "remote_apply", [CR_KEEP_LOCAL] = "keep_local", + [CR_LAST_UPDATE_WINS] = "last_update_wins", [CR_APPLY_OR_SKIP] = "apply_or_skip", [CR_APPLY_OR_ERROR] = "apply_or_error", [CR_SKIP] = "skip", @@ -69,11 +70,11 @@ const char *const ConflictResolverNames[] = { * friendly name for a resolver and thus has been added here. */ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { - [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, - [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, + [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, [CT_UPDATE_MISSING] = {CR_APPLY_OR_SKIP, CR_APPLY_OR_ERROR, CR_SKIP, CR_ERROR}, [CT_DELETE_MISSING] = {CR_SKIP, CR_ERROR}, - [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR} + [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR} }; /* @@ -81,11 +82,11 @@ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { * If this changes, change it in pg_conflict.dat as well. */ const int ConflictTypeDefaultResolvers[] = { - [CT_INSERT_EXISTS] = CR_REMOTE_APPLY, - [CT_UPDATE_DIFFER] = CR_REMOTE_APPLY, + [CT_INSERT_EXISTS] = CR_LAST_UPDATE_WINS, + [CT_UPDATE_DIFFER] = CR_LAST_UPDATE_WINS, [CT_UPDATE_MISSING] = CR_APPLY_OR_SKIP, [CT_DELETE_MISSING] = CR_SKIP, - [CT_DELETE_DIFFER] = CR_REMOTE_APPLY + [CT_DELETE_DIFFER] = CR_LAST_UPDATE_WINS }; @@ -201,6 +202,12 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, TupleTableSlot *conflictslot, bool apply_remote) { char *applymsg; + int errdet = 0; + char *local_ts; + char *remote_ts; + + local_ts = pstrdup(timestamptz_to_str(localts)); + remote_ts = pstrdup(timestamptz_to_str(replorigin_session_origin_timestamp)); if (apply_remote) applymsg = "applying the remote changes."; @@ -222,43 +229,63 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, if (resolver == CR_ERROR) { if (index_value && localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", - index_value, get_rel_name(conflictidx), localorigin, - localxmin, timestamptz_to_str(localts)); + errdet = errdetail("Key %s already exists in unique index \"%s\", which was modified by origin %u in transaction %u at %s.", + index_value, get_rel_name(conflictidx), + localorigin, localxmin, local_ts); else if (index_value && !localts) - return errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", - index_value, get_rel_name(conflictidx), localxmin); + errdet = errdetail("Key %s already exists in unique index \"%s\", which was modified in transaction %u.", + index_value, get_rel_name(conflictidx), localxmin); else - return errdetail("Key already exists in unique index \"%s\".", - get_rel_name(conflictidx)); + errdet = errdetail("Key already exists in unique index \"%s\".", + get_rel_name(conflictidx)); } + else if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Key already exists, %s. The local tuple : origin=%u, timestamp=%s; The remote tuple : origin=%u, timestamp=%s.", + applymsg, localorigin, local_ts, + replorigin_session_origin, remote_ts); else - return errdetail("Key already exists, %s", applymsg); + errdet = errdetail("Key already exists, %s", applymsg); } + break; case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s", - localorigin, localxmin, - timestamptz_to_str(localts), applymsg); + if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, local_ts, applymsg, + replorigin_session_origin, remote_ts); + else + errdet = errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, local_ts, applymsg); + break; case CT_UPDATE_MISSING: if (resolver == CR_APPLY_OR_SKIP && !apply_remote) - return errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update."); + errdet = errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence SKIP the update."); else if (resolver == CR_APPLY_OR_ERROR && !apply_remote) - return errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence ERROR out."); + errdet = errdetail("Did not find the row to be updated. UPDATE can not be converted to INSERT, hence ERROR out."); else if (apply_remote) - return errdetail("Did not find the row to be updated. Convert UPDATE to INSERT and %s", - applymsg); + errdet = errdetail("Did not find the row to be updated. Convert UPDATE to INSERT and %s", + applymsg); else - return errdetail("Did not find the row to be updated, %s", - applymsg); + errdet = errdetail("Did not find the row to be updated, %s", + applymsg); + break; case CT_DELETE_MISSING: - return errdetail("Did not find the row to be deleted."); + errdet = errdetail("Did not find the row to be deleted."); + break; case CT_DELETE_DIFFER: - return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s", - localorigin, localxmin, - timestamptz_to_str(localts), applymsg); + if (resolver == CR_LAST_UPDATE_WINS) + errdet = errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, local_ts, applymsg, + replorigin_session_origin, remote_ts); + else + errdet = errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s, %s", + localorigin, localxmin, local_ts, applymsg); + break; } - return 0; /* silence compiler warning */ + pfree(local_ts); + pfree(remote_ts); + + return errdet; } /* @@ -366,6 +393,15 @@ validate_conflict_type_and_resolver(char *conflict_type, char *conflict_resolver errmsg("%s is not a valid conflict resolver for conflict type %s", conflict_resolver, conflict_type)); + + if ((resolver == CR_LAST_UPDATE_WINS) && !track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + conflict_resolver, "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + return type; } @@ -407,6 +443,42 @@ get_conflict_resolver_internal(ConflictType type) return resolver; } +/* + * Compare the timestamps of given local tuple and the remote tuple to + * resolve the conflict. + * + * Returns true if remote tuple has the latest timestamp, false otherwise. + */ +static bool +resolve_by_timestamp(TupleTableSlot *localslot) +{ + TransactionId local_xmin; + TimestampTz local_ts; + RepOriginId local_origin; + int ts_cmp; + uint64 local_system_identifier; + + /* Get origin and timestamp info of the local tuple */ + GetTupleCommitTs(localslot, &local_xmin, &local_origin, &local_ts); + + /* Compare the timestamps of remote & local tuple to decide the winner */ + ts_cmp = timestamptz_cmp_internal(replorigin_session_origin_timestamp, + local_ts); + + if (ts_cmp == 0) + { + elog(LOG, "Timestamps of remote and local tuple are equal, comparing remote and local system identifiers"); + + /* Get current system's identifier */ + local_system_identifier = GetSystemIdentifier(); + + return local_system_identifier <= replorigin_session_origin_sysid; + } + else + return (ts_cmp > 0); + +} + /* * Check if a full tuple can be created from the new tuple. * Return true if yes, false otherwise. @@ -512,7 +584,8 @@ ExecConflictResolverStmt(ConflictResolverStmt *stmt) * false otherwise. */ ConflictResolver -GetConflictResolver(Relation localrel, ConflictType type, bool *apply_remote, +GetConflictResolver(TupleTableSlot *localslot, Relation localrel, + ConflictType type, bool *apply_remote, LogicalRepTupleData *newtup) { ConflictResolver resolver; @@ -521,6 +594,17 @@ GetConflictResolver(Relation localrel, ConflictType type, bool *apply_remote, switch (resolver) { + case CR_LAST_UPDATE_WINS: + if (!track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + ConflictResolverNames[resolver], "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + else + *apply_remote = resolve_by_timestamp(localslot); + break; case CR_REMOTE_APPLY: *apply_remote = true; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 419e4814f0..bd8e6f0024 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -155,6 +155,7 @@ typedef struct ReplicationStateCtl RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; TimestampTz replorigin_session_origin_timestamp = 0; +uint64 replorigin_session_origin_sysid = 0; /* * Base address into a shared memory array of replication states of size diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 9859f0f2bd..5f0af32e29 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -318,6 +318,20 @@ static uint32 parallel_stream_nchanges = 0; /* Are we initializing an apply worker? */ bool InitializingApplyWorker = false; +/* + * GUC support + */ +const struct config_enum_entry logical_rep_clock_skew_action_options[] = { + {"error", LR_CLOCK_SKEW_ACTION_ERROR, false}, + {"wait", LR_CLOCK_SKEW_ACTION_WAIT, false}, + {NULL, 0, false} +}; + +/* GUCs */ +int max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT; +int max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR; +int max_logical_rep_clock_skew_wait = 300; /* 5 mins */ + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -987,6 +1001,95 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ExecStoreVirtualTuple(slot); } +/* + * Manage clock skew between nodes. + * + * It checks if the remote timestamp is ahead of the local clock + * and if the difference exceeds max_logical_rep_clock_skew, it performs + * the action specified by the max_logical_rep_clock_skew_action. + */ +static void +manage_clock_skew(TimestampTz origin_timestamp) +{ + TimestampTz current; + TimestampTz delayUntil; + long msecs; + int rc; + + /* nothing to do if no max clock skew configured */ + if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT) + return; + + current = GetCurrentTimestamp(); + + /* + * If the timestamp of the currently replayed transaction is in the future + * compared to the current time on the subscriber and the difference is + * larger than max_logical_rep_clock_skew, then perform the action + * specified by the max_logical_rep_clock_skew_action setting. + */ + if (origin_timestamp > current && + TimestampDifferenceExceeds(current, origin_timestamp, + max_logical_rep_clock_skew * 1000)) + { + if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg_internal("clock skew exceeds max_logical_rep_clock_skew (%d seconds)", + max_logical_rep_clock_skew))); + + /* Perform the wait */ + while (true) + { + delayUntil = + TimestampTzMinusSeconds(origin_timestamp, + max_logical_rep_clock_skew); + + /* Exit without waiting if it's already past 'delayUntil' time */ + msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), + delayUntil); + if (msecs <= 0) + break; + + /* The wait time should not exceed max_logical_rep_clock_skew_wait */ + if (msecs > (max_logical_rep_clock_skew_wait * 1000L)) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg_internal("clock skew wait time exceeds max_logical_rep_clock_skew_wait (%d seconds)", + max_logical_rep_clock_skew_wait))); + + elog(DEBUG2, "delaying apply for %ld milliseconds to manage clock skew", + msecs); + + /* Sleep until we are signaled or msecs have elapsed */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + msecs, + WAIT_EVENT_LOGICAL_CLOCK_SKEW); + + /* Exit the loop if msecs have elapsed */ + if (rc & WL_TIMEOUT) + break; + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* + * This might change max_logical_rep_clock_skew and + * max_logical_rep_clock_skew_wait. + */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + } +} + /* * Handle BEGIN message. */ @@ -1008,6 +1111,15 @@ apply_handle_begin(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.committime); + + /* + * Capture the commit timestamp of the remote transaction + * for time based conflict resolution purpose. + */ + replorigin_session_origin_timestamp = begin_data.committime; } /* @@ -1065,6 +1177,9 @@ apply_handle_begin_prepare(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.prepare_time); } /* @@ -1305,7 +1420,8 @@ apply_handle_stream_prepare(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, - prepare_data.xid, prepare_data.prepare_lsn); + prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -2002,7 +2118,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, */ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + TimestampTz origin_timestamp) { int nchanges; char path[MAXPGPATH]; @@ -2055,6 +2172,16 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, end_replication_step(); + /* + * If origin_timestamp is provided by caller, then check clock skew with + * respect to the passed time and take configured action. + */ + if (origin_timestamp) + manage_clock_skew(origin_timestamp); + + /* Capture the timestamp (prepare or commit) of the remote transaction */ + replorigin_session_origin_timestamp = origin_timestamp; + /* * Read the entries one by one and pass them through the same logic as in * apply_dispatch. @@ -2160,7 +2287,8 @@ apply_handle_stream_commit(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn, + commit_data.committime); apply_handle_commit_internal(&commit_data); @@ -2717,7 +2845,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) { - resolver = GetConflictResolver(localrel, CT_UPDATE_DIFFER, + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_DIFFER, &apply_remote, NULL); ReportApplyConflict(CT_UPDATE_DIFFER, resolver, localrel, @@ -2753,7 +2881,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(localrel, CT_UPDATE_MISSING, + resolver = GetConflictResolver(localslot, localrel, CT_UPDATE_MISSING, &apply_remote, newtup); ReportApplyConflict(CT_UPDATE_MISSING, resolver, localrel, @@ -2906,7 +3034,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) { - resolver = GetConflictResolver(localrel, CT_DELETE_DIFFER, + resolver = GetConflictResolver(localslot, localrel, CT_DELETE_DIFFER, &apply_remote, NULL); ReportApplyConflict(CT_DELETE_DIFFER, resolver, localrel, @@ -2932,7 +3060,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(localrel, CT_DELETE_MISSING, + resolver = GetConflictResolver(localslot, localrel, CT_DELETE_MISSING, &apply_remote, NULL); /* Resolver is set to skip, thus report the conflict and skip */ @@ -3130,7 +3258,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ if (MySubscription->detectconflict) { - resolver = GetConflictResolver(partrel, CT_UPDATE_MISSING, + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_MISSING, &apply_remote, newtup); ReportApplyConflict(CT_UPDATE_MISSING, resolver, partrel, @@ -3162,7 +3290,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, GetTupleCommitTs(localslot, &localxmin, &localorigin, &localts) && localorigin != replorigin_session_origin) { - resolver = GetConflictResolver(partrel, CT_UPDATE_DIFFER, + resolver = GetConflictResolver(localslot, partrel, CT_UPDATE_DIFFER, &apply_remote, NULL); ReportApplyConflict(CT_UPDATE_DIFFER, resolver, partrel, @@ -4682,6 +4810,7 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + char *replorigin_sysid; slotname = MySubscription->slotname; @@ -4722,10 +4851,12 @@ run_apply_worker() MySubscription->name, err))); /* - * We don't really use the output identify_system for anything but it does - * some initializations on the upstream so let's still call it. + * Call identify_system to do some initializations on the upstream and + * store the output as system identifier of the replication origin node. */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + replorigin_sysid = walrcv_identify_system(LogRepWorkerWalRcvConn, + &startpointTLI); + replorigin_session_origin_sysid = strtoul(replorigin_sysid, NULL, 10); set_apply_error_context_origin(originname); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index db37beeaae..a51f82169e 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -59,6 +59,7 @@ CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." +LOGICAL_CLOCK_SKEW "Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 630ed0f162..8c843c522e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" @@ -492,6 +493,7 @@ extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry recovery_target_action_options[]; extern const struct config_enum_entry wal_sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; +extern const struct config_enum_entry logical_rep_clock_skew_action_options[]; /* * GUC option variables that are exported from this module @@ -3649,6 +3651,33 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets maximum clock skew tolerance between logical " + "replication nodes beyond which action configured " + "in max_logical_rep_clock_skew_action is triggered."), + gettext_noop("-1 turns this check off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew, + LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX, + NULL, NULL, NULL + }, + + { + {"max_logical_rep_clock_skew_wait", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets max limit on how long apply worker shall wait to " + "bring clock skew within permissible range of max_logical_rep_clock_skew. " + "If the computed wait time is more than this value, " + "apply worker will error out without waiting."), + gettext_noop("0 turns this limit off."), + GUC_UNIT_S + }, + &max_logical_rep_clock_skew_wait, + 300, 0, 3600, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4915,6 +4944,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the action to perform if a clock skew higher " + "than max_logical_rep_clock_skew is detected."), + NULL + }, + &max_logical_rep_clock_skew_action, + LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options, + NULL, NULL, NULL + }, + { {"track_functions", PGC_SUSET, STATS_CUMULATIVE, gettext_noop("Collects function-level statistics on database activity."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9ec9f97e92..f7a664a538 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -383,7 +383,14 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers - +#max_logical_rep_clock_skew = -1 # maximum clock skew tolerance between logical + # replication nodes beyond which action configured in + # 'max_logical_rep_clock_skew_action' is triggered. +#max_logical_rep_clock_skew_action = error # error or wait + # (change requires restart) +#max_logical_rep_clock_skew_wait = 300 # max limit on how long apply worker + # shall wait to bring clock skew within permissible + # range of max_logical_rep_clock_skew. #------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/include/catalog/pg_conflict.dat b/src/include/catalog/pg_conflict.dat index 8d6183e3f2..1905151840 100644 --- a/src/include/catalog/pg_conflict.dat +++ b/src/include/catalog/pg_conflict.dat @@ -12,10 +12,10 @@ [ -{ conftype => 'insert_exists', confres => 'remote_apply' }, -{ conftype => 'update_differ', confres => 'remote_apply' }, +{ conftype => 'insert_exists', confres => 'last_update_wins' }, +{ conftype => 'update_differ', confres => 'last_update_wins' }, { conftype => 'update_missing', confres => 'apply_or_skip' }, { conftype => 'delete_missing', confres => 'skip' }, -{ conftype => 'delete_differ', confres => 'remote_apply' } +{ conftype => 'delete_differ', confres => 'last_update_wins' } ] diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index b06c0f75a0..4e2853b5be 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -57,6 +57,9 @@ typedef enum ConflictResolver /* Keep the local change */ CR_KEEP_LOCAL, + /* Apply the change with latest timestamp */ + CR_LAST_UPDATE_WINS, + /* Apply the remote change; skip if it can not be applied */ CR_APPLY_OR_SKIP, @@ -83,7 +86,8 @@ extern void ReportApplyConflict(ConflictType type, ConflictResolver resolver, TupleTableSlot *conflictslot, bool apply_remote); extern void InitConflictIndexes(ResultRelInfo *relInfo); extern void ExecConflictResolverStmt(ConflictResolverStmt *stmt); -extern ConflictResolver GetConflictResolver(Relation localrel, +extern ConflictResolver GetConflictResolver(TupleTableSlot *localslot, + Relation localrel, ConflictType type, bool *apply_remote, LogicalRepTupleData *newtup); diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..7cb03062ac 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,7 +14,25 @@ #include +/* + * The default for max_logical_rep_clock_skew is -1, which means ignore clock + * skew (the check is turned off). + */ +#define LR_CLOCK_SKEW_DEFAULT -1 + +/* + * Worker Clock Skew Action. + */ +typedef enum +{ + LR_CLOCK_SKEW_ACTION_ERROR, + LR_CLOCK_SKEW_ACTION_WAIT, +} LogicalRepClockSkewAction; + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; +extern PGDLLIMPORT int max_logical_rep_clock_skew; +extern PGDLLIMPORT int max_logical_rep_clock_skew_action; +extern PGDLLIMPORT int max_logical_rep_clock_skew_wait; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 7189ba9e76..dcbbbdf6ea 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -36,6 +36,7 @@ typedef struct xl_replorigin_drop extern PGDLLIMPORT RepOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +extern PGDLLIMPORT uint64 replorigin_session_origin_sysid; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..dc9e067fac 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -267,7 +267,7 @@ extern void stream_stop_internal(TransactionId xid); /* Common streaming function to apply all the spooled messages */ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn); + XLogRecPtr lsn, TimestampTz origin_timestamp); extern void apply_dispatch(StringInfo s); diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index a6ce03ed46..53b828d89d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X) /* Macros for doing timestamp arithmetic without assuming timestamp's units */ #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000)) #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000)) +#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000)) /* Set at postmaster start */ diff --git a/src/test/regress/expected/conflict_resolver.out b/src/test/regress/expected/conflict_resolver.out index 53ecb430fa..c21486dbb4 100644 --- a/src/test/regress/expected/conflict_resolver.out +++ b/src/test/regress/expected/conflict_resolver.out @@ -1,11 +1,11 @@ --check default global resolvers in system catalog select * from pg_conflict order by conftype; - conftype | confres -----------------+--------------- - delete_differ | remote_apply + conftype | confres +----------------+------------------ + delete_differ | last_update_wins delete_missing | skip - insert_exists | remote_apply - update_differ | remote_apply + insert_exists | last_update_wins + update_differ | last_update_wins update_missing | apply_or_skip (5 rows) @@ -43,11 +43,11 @@ RESET CONFLICT RESOLVER for 'delete_missing'; RESET CONFLICT RESOLVER for 'insert_exists'; --check resolvers are reset to default for delete_missing and insert_exists select * from pg_conflict order by conftype; - conftype | confres -----------------+---------------- + conftype | confres +----------------+------------------ delete_differ | keep_local delete_missing | skip - insert_exists | remote_apply + insert_exists | last_update_wins update_differ | keep_local update_missing | apply_or_error (5 rows) diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 3d8b8c5d32..844a5dfff8 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -433,6 +433,8 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ERROR: detect_conflict requires a Boolean value -- now it works CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: detect_conflict is enabled but "track_commit_timestamp" is OFF, the last_update_wins resolution may not work +HINT: Enable "track_commit_timestamp". WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl index c746191601..6c1eb7e5a9 100644 --- a/src/test/subscription/t/029_on_error.pl +++ b/src/test/subscription/t/029_on_error.pl @@ -18,7 +18,7 @@ my $offset = 0; # on the publisher. sub test_skip_lsn { - my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg) + my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg, $conflict_detection) = @_; # Wait until a conflict occurs on the subscriber. @@ -26,13 +26,25 @@ sub test_skip_lsn "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'" ); + my $lsn; + my $contents = slurp_file($node_subscriber->logfile, $offset); + # Get the finish LSN of the error transaction, mapping the expected # ERROR with its CONTEXT when retrieving this information. - my $contents = slurp_file($node_subscriber->logfile, $offset); - $contents =~ - qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m - or die "could not get error-LSN"; - my $lsn = $1; + if ($conflict_detection) + { + $contents =~ + qr/conflict insert_exists detected on relation "public.tbl".*\n.*DETAIL:.* Key \(i\)=\(1\) already exists in unique index "tbl_pkey", which was modified by origin \d+ in transaction \d+ at .*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + or die "could not get error-LSN"; + $lsn = $1; + } + else + { + $contents =~ + qr/duplicate key value violates unique constraint "tbl_pkey".*\n.*DETAIL:.*\n.*CONTEXT:.* for replication target relation "public.tbl" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m + or die "could not get error-LSN"; + $lsn = $1; + } # Set skip lsn. $node_subscriber->safe_psql('postgres', @@ -110,7 +122,7 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE tbl"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on, detect_conflict = on)" + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, detect_conflict = on)" ); # Set 'ERROR' conflict resolver for 'insert_exist' conflict type @@ -147,7 +159,22 @@ INSERT INTO tbl VALUES (1, NULL); COMMIT; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(2, NULL)", "2", "test skipping transaction"); + "(2, NULL)", "2", "test skipping transaction", 1); + +# Cleanup before we start PREPARE AND COMMIT PREPARED tests +$node_subscriber->safe_psql('postgres', "TRUNCATE tbl"); +$node_publisher->safe_psql('postgres', "TRUNCATE tbl"); + +# Drop subscription and recreate with two_phase enabled +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub'); + +$node_subscriber->safe_psql('postgres', "INSERT INTO tbl VALUES (1, NULL)"); # Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and # PREPARE the transaction, raising an error. Then skip the transaction. @@ -160,7 +187,7 @@ PREPARE TRANSACTION 'gtx'; COMMIT PREPARED 'gtx'; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(3, NULL)", "3", "test skipping prepare and commit prepared "); + "(2, NULL)", "2", "test skipping prepare and commit prepared ", 0); # Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB # limit, also raising an error on the subscriber during applying spooled @@ -173,17 +200,14 @@ INSERT INTO tbl SELECT i, sha256(i::text::bytea) FROM generate_series(1, 10000) COMMIT; ]); test_skip_lsn($node_publisher, $node_subscriber, - "(4, sha256(4::text::bytea))", - "4", "test skipping stream-commit"); + "(3, sha256(4::text::bytea))", + "3", "test skipping stream-commit", 0); $result = $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM pg_prepared_xacts"); is($result, "0", "check all prepared transactions are resolved on the subscriber"); -# Reset conflict resolver for 'insert_exist' conflict type to default. -$node_subscriber->safe_psql('postgres',"reset conflict resolver for 'insert_exists'"); - $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/034_conflict_resolver.pl b/src/test/subscription/t/034_conflict_resolver.pl index 0fb5ef9a86..3bde2750da 100755 --- a/src/test/subscription/t/034_conflict_resolver.pl +++ b/src/test/subscription/t/034_conflict_resolver.pl @@ -51,10 +51,10 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT conftype, confres FROM pg_conflict ORDER BY conftype" ); -is( $result, qq(delete_differ|remote_apply +is( $result, qq(delete_differ|last_update_wins delete_missing|skip -insert_exists|remote_apply -update_differ|remote_apply +insert_exists|last_update_wins +update_differ|last_update_wins update_missing|apply_or_skip), "confirm that the default conflict resolvers are in place" ); @@ -63,6 +63,11 @@ update_missing|apply_or_skip), # Test 'remote_apply' for 'insert_exists' ############################################ +# Change CONFLICT RESOLVER of insert_exists to remote_apply +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'remote_apply' for 'insert_exists';" +); + # Create local data on the subscriber $node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'fromsub')"); @@ -146,6 +151,34 @@ $node_subscriber->wait_for_log( $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); +############################################ +# Test 'last_update_wins' for 'insert_exists' +############################################ + +# Change CONFLICT RESOLVER of insert_exists to last_update_wins +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'last_update_wins' for 'insert_exists';" +); + +# Create local data on the subscriber +$node_subscriber->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (4,'fromsub')"); + +# Create conflicting data on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "INSERT INTO conf_tab(a, data) VALUES (4,'frompub')"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict insert_exists detected on relation \"public.conf_tab\"/, $log_offset); + +# Confirm that remote insert is converted to an update and the remote data is updated. +$result = $node_subscriber->safe_psql('postgres', + "SELECT data FROM conf_tab WHERE (a=4);" +); + +is($result, 'frompub', "remote data wins"); + ################################### # Test 'skip' for 'delete_missing' ################################### @@ -203,14 +236,48 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); ######################################### -# Test 'remote_apply' for 'delete_differ' +# Test 'last_update_wins' for 'delete_differ' ######################################### +# Change CONFLICT RESOLVER of delete_differ to last_update_wins +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'last_update_wins' for 'delete_differ';" +); + # Insert data in the publisher $node_publisher->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); INSERT INTO conf_tab(a, data) VALUES (2,'frompub'); - INSERT INTO conf_tab(a, data) VALUES (3,'frompub');"); + INSERT INTO conf_tab(a, data) VALUES (3,'frompub'); + INSERT INTO conf_tab(a, data) VALUES (4,'frompub');"); + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub' WHERE (a=4);"); + +# Create a conflicting delete on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "DELETE FROM conf_tab WHERE (a=4);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict delete_differ detected on relation \"public.conf_tab\"/, $log_offset); + +# Confirm that the remote delete the local updated row +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=4);" +); + +is($result, '', "delete from remote wins"); + +######################################### +# Test 'remote_apply' for 'delete_differ' +######################################### + +# Change CONFLICT RESOLVER of delete_differ to remote_apply +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'remote_apply' for 'delete_differ';" +); # Modify data on the subscriber $node_subscriber->safe_psql('postgres', @@ -303,9 +370,14 @@ $node_subscriber->safe_psql('postgres', $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); ######################################### -# Test 'remote_apply' for 'update_differ' +# Test 'last_update_wins' for 'update_differ' ######################################### +# Change CONFLICT RESOLVER of update_differ to remote_apply +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'remote_apply' for 'update_differ';" +); + # Insert data in the publisher $node_publisher->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); @@ -331,6 +403,29 @@ $result = $node_subscriber->safe_psql('postgres', is($result, 'frompubnew', "update from remote is kept"); +######################################### +# Test 'remote_apply' for 'update_differ' +######################################### + +# Modify data on the subscriber +$node_subscriber->safe_psql('postgres', + "UPDATE conf_tab SET data = 'fromsub2' WHERE (a=1);"); + +# Create a conflicting update on the publisher +$log_offset = -s $node_subscriber->logfile; +$node_publisher->safe_psql('postgres', + "UPDATE conf_tab SET data = 'frompubnew2' WHERE (a=1);"); + +$node_subscriber->wait_for_log( + qr/LOG: conflict update_differ detected on relation \"public.conf_tab\"/, $log_offset); + +# Confirm that the remote update overrides the local update +$result = $node_subscriber->safe_psql('postgres', + "SELECT data from conf_tab WHERE (a=1);" +); + +is($result, 'frompubnew2', "update from remote is kept"); + ######################################### # Test 'keep_local' for 'update_differ' ######################################### diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1dfcc57fb1..ae3ee18764 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1567,6 +1567,7 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepClockSkewAction LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct -- 2.34.1