From 36acc99dbfe4c6ed9b846dee067839954ce6778d Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Thu, 4 Jul 2024 16:16:46 +0530 Subject: [PATCH v5 4/5] Manage Clock skew and implement last_update_wins This patch attempts to manage clock skew between nodes by introducing two new GUCs: a) max_logical_rep_clock_skew b) max_logical_rep_clock_skew_action This patch also implements last_update_wins resolver. --- src/backend/commands/subscriptioncmds.c | 18 +++ .../replication/logical/applyparallelworker.c | 30 +++- src/backend/replication/logical/conflict.c | 107 ++++++++++++-- src/backend/replication/logical/origin.c | 1 + src/backend/replication/logical/worker.c | 130 +++++++++++++++++- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 26 ++++ src/backend/utils/misc/postgresql.conf.sample | 6 +- src/include/catalog/pg_conflict.dat | 6 +- src/include/replication/conflict.h | 3 + src/include/replication/logicalworker.h | 17 +++ src/include/replication/origin.h | 1 + src/include/replication/worker_internal.h | 2 +- src/include/utils/timestamp.h | 1 + .../regress/expected/conflict_resolver.out | 16 +-- src/test/regress/expected/subscription.out | 2 + .../subscription/t/034_conflict_resolver.pl | 16 ++- src/tools/pgindent/typedefs.list | 1 + 18 files changed, 349 insertions(+), 35 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e670d72708..9126255ff3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/xact.h" @@ -671,6 +672,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\""); #endif + /* Warn if detect_conflict is enabled and track_commit_timestamp is off */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); + rel = table_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -1277,6 +1285,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, values[Anum_pg_subscription_subdetectconflict - 1] = BoolGetDatum(opts.detectconflict); replaces[Anum_pg_subscription_subdetectconflict - 1] = true; + + /* + * Warn if detect_conflict is enabled and + * track_commit_timestamp is off. + */ + if (opts.detectconflict && !track_commit_timestamp) + ereport(WARNING, + (errmsg("detect_conflict is enabled but \"%s\" is OFF, the last_update_wins resolution may not work", + "track_commit_timestamp"), + errhint("Enable \"%s\".", "track_commit_timestamp"))); } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index e7f7d4c5e4..ab4ede3fa9 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -312,6 +312,24 @@ pa_can_start(void) if (!AllTablesyncsReady()) return false; + /* + * Don't start a new parallel worker if user has set max clock skew + * tolerance as the commit timestamp will be needed during 'begin' itself + * to manage clock skew. + * + * Also don't start parallel apply worker if conflict detection and + * resolution is ON as commit timesamp will be needed for time based + * resolution methods while applying concerned changes. + * + * XXX: For second case, see if we can reduce the scope of this + * restriction to only such cases where time-based resolvers are actually + * being used. + */ + if ((max_logical_rep_clock_skew > LR_CLOCK_SKEW_DEFAULT) || + MySubscription->detectconflict) + return false; + + return true; } @@ -696,9 +714,19 @@ pa_process_spooled_messages_if_required(void) } else if (fileset_state == FS_READY) { + /* + * Currently we do not support starting parallel apply worker when + * either clock skew is configured or conflict resolution is + * configured, thus it is okay to pass 0 as origin-timestamp here. + * + * XXX: If in future, we support starting pa worker even with conflict + * detection enabled, then here we need to pass remote's + * commit/prepare/abort timestamp; we can get that info from leader + * worker in shared memory. + */ apply_spooled_messages(&MyParallelShared->fileset, MyParallelShared->xid, - InvalidXLogRecPtr); + InvalidXLogRecPtr, 0); pa_set_fileset_state(MyParallelShared, FS_EMPTY); } diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 5a7f0e0b53..c330b21217 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -44,6 +44,7 @@ const char *const ConflictTypeNames[] = { const char *const ConflictResolverNames[] = { [CR_REMOTE_APPLY] = "remote_apply", [CR_KEEP_LOCAL] = "keep_local", + [CR_LAST_UPDATE_WINS] = "last_update_wins", [CR_APPLY_OR_SKIP] = "apply_or_skip", [CR_APPLY_OR_ERROR] = "apply_or_error", [CR_SKIP] = "skip", @@ -69,11 +70,11 @@ const char *const ConflictResolverNames[] = { * friendly name for a resolver and thus has been added here. */ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { - [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, - [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR}, + [CT_INSERT_EXISTS] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, + [CT_UPDATE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR}, [CT_UPDATE_MISSING] = {CR_APPLY_OR_SKIP, CR_APPLY_OR_ERROR, CR_SKIP, CR_ERROR}, [CT_DELETE_MISSING] = {CR_SKIP, CR_ERROR}, - [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_ERROR} + [CT_DELETE_DIFFER] = {CR_REMOTE_APPLY, CR_KEEP_LOCAL, CR_LAST_UPDATE_WINS, CR_ERROR} }; /* @@ -81,11 +82,11 @@ const int ConflictTypeResolverMap[][CONFLICT_TYPE_MAX_RESOLVERS] = { * If this changes, change it in pg_conflict.dat as well. */ const int ConflictTypeDefaultResolvers[] = { - [CT_INSERT_EXISTS] = CR_REMOTE_APPLY, - [CT_UPDATE_DIFFER] = CR_REMOTE_APPLY, + [CT_INSERT_EXISTS] = CR_LAST_UPDATE_WINS, + [CT_UPDATE_DIFFER] = CR_LAST_UPDATE_WINS, [CT_UPDATE_MISSING] = CR_APPLY_OR_SKIP, [CT_DELETE_MISSING] = CR_SKIP, - [CT_DELETE_DIFFER] = CR_REMOTE_APPLY + [CT_DELETE_DIFFER] = CR_LAST_UPDATE_WINS }; @@ -215,11 +216,30 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, get_rel_name(conflictidx)); } else - return errdetail("Key already exists. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); + { + if (resolver == CR_LAST_UPDATE_WINS) + return errdetail("Key already exists. Applying resolution method \"%s\". The local tuple : origin=%u, timestamp=%s; The remote tuple : origin=%u, timestamp=%s.", + ConflictResolverNames[resolver], + localorigin, timestamptz_to_str(localts), + replorigin_session_origin, + timestamptz_to_str(replorigin_session_origin_timestamp)); + else + return errdetail("Key already exists. Applying resolution method \"%s\"", + ConflictResolverNames[resolver]); + } + } case CT_UPDATE_DIFFER: - return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", - localorigin, localxmin, timestamptz_to_str(localts), ConflictResolverNames[resolver]); + if (resolver == CR_LAST_UPDATE_WINS) + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\". The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver], + replorigin_session_origin, + timestamptz_to_str(replorigin_session_origin_timestamp)); + else + return errdetail("Updating a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver]); case CT_UPDATE_MISSING: return errdetail("Did not find the row to be updated. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); @@ -227,9 +247,16 @@ errdetail_apply_conflict(ConflictType type, ConflictResolver resolver, return errdetail("Did not find the row to be deleted. Applying resolution method \"%s\"", ConflictResolverNames[resolver]); case CT_DELETE_DIFFER: - return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", - localorigin, localxmin, timestamptz_to_str(localts), - ConflictResolverNames[resolver]); + if (resolver == CR_LAST_UPDATE_WINS) + return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\". The remote tuple : origin=%u, timestamp=%s.", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver], + replorigin_session_origin, + timestamptz_to_str(replorigin_session_origin_timestamp)); + else + return errdetail("Deleting a row that was modified by a different origin %u in transaction %u at %s. Applying resolution method \"%s\"", + localorigin, localxmin, timestamptz_to_str(localts), + ConflictResolverNames[resolver]); } return 0; /* silence compiler warning */ @@ -340,6 +367,15 @@ validate_conflict_type_and_resolver(char *conflict_type, char *conflict_resolver errmsg("%s is not a valid conflict resolver for conflict type %s", conflict_resolver, conflict_type)); + + if ((resolver == CR_LAST_UPDATE_WINS) && !track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + conflict_resolver, "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + return type; } @@ -381,6 +417,42 @@ get_conflict_resolver_internal(ConflictType type) return resolver; } +/* + * Compare the timestamps of given local tuple and the remote tuple to + * resolve the conflict. + * + * Returns true if remote tuple has the latest timestamp, false otherwise. + */ +static bool +resolve_by_timestamp(TupleTableSlot *localslot) +{ + TransactionId local_xmin; + TimestampTz local_ts; + RepOriginId local_origin; + int ts_cmp; + uint64 local_system_identifier; + + /* Get origin and timestamp info of the local tuple */ + GetTupleCommitTs(localslot, &local_xmin, &local_origin, &local_ts); + + /* Compare the timestamps of remote & local tuple to decide the winner */ + ts_cmp = timestamptz_cmp_internal(replorigin_session_origin_timestamp, + local_ts); + + if (ts_cmp == 0) + { + elog(LOG, "Timestamps of remote and local tuple are equal, comparing remote and local system identifiers"); + + /* Get current system's identifier */ + local_system_identifier = GetSystemIdentifier(); + + return local_system_identifier <= replorigin_session_origin_sysid; + } + else + return (ts_cmp > 0); + +} + /* * Check if a full tuple can be created from the new tuple. * Return true if yes, false otherwise. @@ -499,6 +571,17 @@ GetConflictResolver(TupleTableSlot *localslot, Relation localrel, switch (resolver) { + case CR_LAST_UPDATE_WINS: + if (!track_commit_timestamp) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("resolver %s requires \"%s\" to be enabled", + ConflictResolverNames[resolver], "track_commit_timestamp"), + errhint("Make sure the configuration parameter \"%s\" is set.", + "track_commit_timestamp")); + else + *apply_remote = resolve_by_timestamp(localslot); + break; case CR_REMOTE_APPLY: *apply_remote = true; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 419e4814f0..bd8e6f0024 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -155,6 +155,7 @@ typedef struct ReplicationStateCtl RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; TimestampTz replorigin_session_origin_timestamp = 0; +uint64 replorigin_session_origin_sysid = 0; /* * Base address into a shared memory array of replication states of size diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 239d5c7cd1..3156b0773f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -318,6 +318,19 @@ static uint32 parallel_stream_nchanges = 0; /* Are we initializing an apply worker? */ bool InitializingApplyWorker = false; +/* + * GUC support + */ +const struct config_enum_entry logical_rep_clock_skew_action_options[] = { + {"error", LR_CLOCK_SKEW_ACTION_ERROR, false}, + {"wait", LR_CLOCK_SKEW_ACTION_WAIT, false}, + {NULL, 0, false} +}; + +/* GUCs */ +int max_logical_rep_clock_skew = LR_CLOCK_SKEW_DEFAULT; +int max_logical_rep_clock_skew_action = LR_CLOCK_SKEW_ACTION_ERROR; + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -987,6 +1000,86 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, ExecStoreVirtualTuple(slot); } +/* + * Manage clock skew between nodes. + * + * It checks if the remote commit timestamp is ahead of the local clock + * and if the difference exceeds max_logical_rep_clock_skew, it performs + * the action specified by the max_logical_rep_clock_skew_action. + */ +static void +manage_clock_skew(TimestampTz origin_timestamp) +{ + TimestampTz current; + TimestampTz delayUntil; + long msecs; + int rc; + + /* nothing to do if no max clock skew configured */ + if (max_logical_rep_clock_skew == LR_CLOCK_SKEW_DEFAULT) + return; + + current = GetCurrentTimestamp(); + + /* + * If the timestamp of the currently replayed transaction is in the future + * compared to the current time on the subscriber and the difference is + * larger than max_logical_rep_clock_skew, then perform the action + * specified by the max_logical_rep_clock_skew_action setting. + */ + if (origin_timestamp > current && + TimestampDifferenceExceeds(current, origin_timestamp, + max_logical_rep_clock_skew * 1000)) + { + if (max_logical_rep_clock_skew_action == LR_CLOCK_SKEW_ACTION_ERROR) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("logical replication clock skew exceeded max tolerated value of %d seconds", + max_logical_rep_clock_skew))); + + /* Perform the wait */ + while (true) + { + delayUntil = + TimestampTzMinusSeconds(origin_timestamp, + max_logical_rep_clock_skew); + + /* Exit without waiting if it's already past 'delayUntil' time */ + msecs = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), + delayUntil); + if (msecs <= 0) + break; + + elog(LOG, "delaying apply for %ld milliseconds to bring clock skew " + "within permissible value of %d seconds", + msecs, max_logical_rep_clock_skew); + + /* Sleep until we are signaled or msecs have elapsed */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + msecs, + WAIT_EVENT_LOGICAL_CLOCK_SKEW); + + /* Exit the loop if msecs have elapsed */ + if (rc & WL_TIMEOUT) + break; + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* This might change max_logical_rep_clock_skew. */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + } +} + /* * Handle BEGIN message. */ @@ -1008,6 +1101,12 @@ apply_handle_begin(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.committime); + + /* Capture the commit timestamp of the remote transaction */ + replorigin_session_origin_timestamp = begin_data.committime; } /* @@ -1065,6 +1164,12 @@ apply_handle_begin_prepare(StringInfo s) in_remote_transaction = true; pgstat_report_activity(STATE_RUNNING, NULL); + + /* Check if there is any clock skew and perform configured action */ + manage_clock_skew(begin_data.prepare_time); + + /* Capture the prepare timestamp of the remote transaction */ + replorigin_session_origin_timestamp = begin_data.prepare_time; } /* @@ -1305,7 +1410,8 @@ apply_handle_stream_prepare(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, - prepare_data.xid, prepare_data.prepare_lsn); + prepare_data.xid, prepare_data.prepare_lsn, + prepare_data.prepare_time); /* Mark the transaction as prepared. */ apply_handle_prepare_internal(&prepare_data); @@ -2002,7 +2108,8 @@ ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno, */ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + TimestampTz origin_timestamp) { int nchanges; char path[MAXPGPATH]; @@ -2055,6 +2162,13 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, end_replication_step(); + /* Check if there is any clock skew and take configured action */ + if (origin_timestamp) + manage_clock_skew(origin_timestamp); + + /* Capture the timestamp (prepare or commit) of the remote transaction */ + replorigin_session_origin_timestamp = origin_timestamp; + /* * Read the entries one by one and pass them through the same logic as in * apply_dispatch. @@ -2160,7 +2274,8 @@ apply_handle_stream_commit(StringInfo s) * spooled operations. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn, + commit_data.committime); apply_handle_commit_internal(&commit_data); @@ -4694,6 +4809,7 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + char *replorigin_sysid; slotname = MySubscription->slotname; @@ -4733,10 +4849,12 @@ run_apply_worker() errmsg("could not connect to the publisher: %s", err))); /* - * We don't really use the output identify_system for anything but it does - * some initializations on the upstream so let's still call it. + * Call identify_system to do some initializations on the upstream and + * store the output as system identifier of the replication origin node. */ - (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI); + replorigin_sysid = walrcv_identify_system(LogRepWorkerWalRcvConn, + &startpointTLI); + replorigin_session_origin_sysid = strtoul(replorigin_sysid, NULL, 10); set_apply_error_context_origin(originname); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index db37beeaae..a51f82169e 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -59,6 +59,7 @@ CHECKPOINTER_MAIN "Waiting in main loop of checkpointer process." LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." +LOGICAL_CLOCK_SKEW "Waiting in apply-begin of logical replication apply process to bring clock skew in permissible range." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 630ed0f162..f6911ac4a9 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" @@ -492,6 +493,7 @@ extern const struct config_enum_entry archive_mode_options[]; extern const struct config_enum_entry recovery_target_action_options[]; extern const struct config_enum_entry wal_sync_method_options[]; extern const struct config_enum_entry dynamic_shared_memory_options[]; +extern const struct config_enum_entry logical_rep_clock_skew_action_options[]; /* * GUC option variables that are exported from this module @@ -3649,6 +3651,19 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew", PGC_SIGHUP, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets maximum clock skew tolerance between logical " + "replication nodes beyond which action configured " + "in max_logical_rep_clock_skew_action is triggered."), + NULL, + GUC_UNIT_S + }, + &max_logical_rep_clock_skew, + LR_CLOCK_SKEW_DEFAULT, LR_CLOCK_SKEW_DEFAULT, INT_MAX, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL @@ -4915,6 +4930,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"max_logical_rep_clock_skew_action", PGC_POSTMASTER, REPLICATION_SUBSCRIBERS, + gettext_noop("Sets the action to perform if a clock skew higher " + "than max_logical_rep_clock_skew is detected."), + NULL + }, + &max_logical_rep_clock_skew_action, + LR_CLOCK_SKEW_ACTION_ERROR, logical_rep_clock_skew_action_options, + NULL, NULL, NULL + }, + { {"track_functions", PGC_SUSET, STATS_CUMULATIVE, gettext_noop("Collects function-level statistics on database activity."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9ec9f97e92..3a7fd70506 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -383,7 +383,11 @@ # (change requires restart) #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers #max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers - +#max_logical_rep_clock_skew = -1 # maximum clock skew tolerance between logical + # replication nodes beyond which action configured in + # 'max_logical_rep_clock_skew_action' is triggered. +#max_logical_rep_clock_skew_action = error # error or wait + # (change requires restart) #------------------------------------------------------------------------------ # QUERY TUNING diff --git a/src/include/catalog/pg_conflict.dat b/src/include/catalog/pg_conflict.dat index 8d6183e3f2..1905151840 100644 --- a/src/include/catalog/pg_conflict.dat +++ b/src/include/catalog/pg_conflict.dat @@ -12,10 +12,10 @@ [ -{ conftype => 'insert_exists', confres => 'remote_apply' }, -{ conftype => 'update_differ', confres => 'remote_apply' }, +{ conftype => 'insert_exists', confres => 'last_update_wins' }, +{ conftype => 'update_differ', confres => 'last_update_wins' }, { conftype => 'update_missing', confres => 'apply_or_skip' }, { conftype => 'delete_missing', confres => 'skip' }, -{ conftype => 'delete_differ', confres => 'remote_apply' } +{ conftype => 'delete_differ', confres => 'last_update_wins' } ] diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index dcc89f222f..d38a22ddde 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -57,6 +57,9 @@ typedef enum ConflictResolver /* Keep the local change */ CR_KEEP_LOCAL, + /* Apply the change with latest timestamp */ + CR_LAST_UPDATE_WINS, + /* Apply the remote change; skip if it can not be applied */ CR_APPLY_OR_SKIP, diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..2b922f9c62 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -14,7 +14,24 @@ #include +/* + * The default for max_logical_rep_clock_skew is -1, which means ignore clock + * skew (the check is turned off). + */ +#define LR_CLOCK_SKEW_DEFAULT -1 + +/* + * Worker Clock Skew Action. + */ +typedef enum +{ + LR_CLOCK_SKEW_ACTION_ERROR, + LR_CLOCK_SKEW_ACTION_WAIT, +} LogicalRepClockSkewAction; + extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; +extern PGDLLIMPORT int max_logical_rep_clock_skew; +extern PGDLLIMPORT int max_logical_rep_clock_skew_action; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 7189ba9e76..dcbbbdf6ea 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -36,6 +36,7 @@ typedef struct xl_replorigin_drop extern PGDLLIMPORT RepOriginId replorigin_session_origin; extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +extern PGDLLIMPORT uint64 replorigin_session_origin_sysid; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..dc9e067fac 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -267,7 +267,7 @@ extern void stream_stop_internal(TransactionId xid); /* Common streaming function to apply all the spooled messages */ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, - XLogRecPtr lsn); + XLogRecPtr lsn, TimestampTz origin_timestamp); extern void apply_dispatch(StringInfo s); diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index a6ce03ed46..53b828d89d 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -84,6 +84,7 @@ IntervalPGetDatum(const Interval *X) /* Macros for doing timestamp arithmetic without assuming timestamp's units */ #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) * (int64) 1000)) #define TimestampTzPlusSeconds(tz,s) ((tz) + ((s) * (int64) 1000000)) +#define TimestampTzMinusSeconds(tz,s) ((tz) - ((s) * (int64) 1000000)) /* Set at postmaster start */ diff --git a/src/test/regress/expected/conflict_resolver.out b/src/test/regress/expected/conflict_resolver.out index 53ecb430fa..c21486dbb4 100644 --- a/src/test/regress/expected/conflict_resolver.out +++ b/src/test/regress/expected/conflict_resolver.out @@ -1,11 +1,11 @@ --check default global resolvers in system catalog select * from pg_conflict order by conftype; - conftype | confres -----------------+--------------- - delete_differ | remote_apply + conftype | confres +----------------+------------------ + delete_differ | last_update_wins delete_missing | skip - insert_exists | remote_apply - update_differ | remote_apply + insert_exists | last_update_wins + update_differ | last_update_wins update_missing | apply_or_skip (5 rows) @@ -43,11 +43,11 @@ RESET CONFLICT RESOLVER for 'delete_missing'; RESET CONFLICT RESOLVER for 'insert_exists'; --check resolvers are reset to default for delete_missing and insert_exists select * from pg_conflict order by conftype; - conftype | confres -----------------+---------------- + conftype | confres +----------------+------------------ delete_differ | keep_local delete_missing | skip - insert_exists | remote_apply + insert_exists | last_update_wins update_differ | keep_local update_missing | apply_or_error (5 rows) diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a8b0086dd9..47c4dccc3e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -433,6 +433,8 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB ERROR: detect_conflict requires a Boolean value -- now it works CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, detect_conflict = true); +WARNING: detect_conflict is enabled but "track_commit_timestamp" is OFF, the last_update_wins resolution may not work +HINT: Enable "track_commit_timestamp". WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ diff --git a/src/test/subscription/t/034_conflict_resolver.pl b/src/test/subscription/t/034_conflict_resolver.pl index ae8030aac6..6f1479dff7 100755 --- a/src/test/subscription/t/034_conflict_resolver.pl +++ b/src/test/subscription/t/034_conflict_resolver.pl @@ -51,10 +51,10 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT conftype, confres FROM pg_conflict ORDER BY conftype" ); -is( $result, qq(delete_differ|remote_apply +is( $result, qq(delete_differ|last_update_wins delete_missing|skip -insert_exists|remote_apply -update_differ|remote_apply +insert_exists|last_update_wins +update_differ|last_update_wins update_missing|apply_or_skip), "confirm that the default conflict resolvers are in place" ); @@ -63,6 +63,11 @@ update_missing|apply_or_skip), # Test 'remote_apply' for 'insert_exists' ############################################ +# Change CONFLICT RESOLVER of insert_exists to remote_apply +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'remote_apply' for 'insert_exists';" +); + # Create local data on the subscriber $node_subscriber->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'fromsub')"); @@ -202,6 +207,11 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); # Test 'remote_apply' for 'update_differ' ######################################### +# Change CONFLICT RESOLVER of update_differ to remote_apply +$node_subscriber->safe_psql('postgres', + "SET CONFLICT RESOLVER 'remote_apply' for 'update_differ';" +); + # Insert data in the publisher $node_publisher->safe_psql('postgres', "INSERT INTO conf_tab(a, data) VALUES (1,'frompub'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c920a59525..13cc7a33fb 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1566,6 +1566,7 @@ LogicalOutputPluginWriterPrepareWrite LogicalOutputPluginWriterUpdateProgress LogicalOutputPluginWriterWrite LogicalRepBeginData +LogicalRepClockSkewAction LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct -- 2.34.1