From 2d119a5d5210ad3665e77646cff96e51e4c1e956 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 16 Feb 2023 07:52:23 +0000 Subject: [PATCH v2] Time-delayed logical replication on publisher side Similar to physical replication, a time-delayed copy of the data for logical replication is useful for some scenarios (particularly to fix errors that might cause data loss). This patch implements a new subscription parameter called 'min_send_delay'. If the subscription sets min_send_delay parameter, an apply worker passes the value to the publisher as an output plugin option. And then, the walsender will delay the transaction sending for given milliseconds. The delay does not take into account the overhead of time spent in transferring the transaction, which means that the arrival time at the subscriber may be delayed more than the given time. The combination of parallel streaming mode and min_send_delay is not allowed. This is because in parallel streaming mode, we start applying the transaction stream as soon as the first change arrives without knowing the transaction's prepare/commit time. Always waiting for the full 'min_send_delay' period might include unnecessary delay. The other possibility was to apply the delay at the end of the parallel apply transaction but that would cause issues related to resource bloat and locks being held for a long time. The delay occurs before we start to send the transaction on the publisher. Regular and prepared transactions are covered. Streamed transactions are also covered. Author: Euler Taveira, Takamichi Osumi, Kuroda Hayato Reviewed-by: Amit Kapila, Peter Smith, Vignesh C, Shveta Malik, Kyotaro Horiguchi, Shi Yu, Wang Wei, Dilip Kumar, Melih Mutlu, Andres Freund --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/glossary.sgml | 15 ++ doc/src/sgml/logical-replication.sgml | 6 + doc/src/sgml/monitoring.sgml | 5 + doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 39 +++- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 7 +- src/backend/commands/subscriptioncmds.c | 122 +++++++++++- .../libpqwalreceiver/libpqwalreceiver.c | 5 + src/backend/replication/logical/decode.c | 4 + src/backend/replication/logical/logical.c | 18 +- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/logical/worker.c | 12 +- src/backend/replication/pgoutput/pgoutput.c | 36 ++++ src/backend/replication/slotfuncs.c | 4 +- src/backend/replication/walsender.c | 77 +++++++- src/backend/utils/activity/wait_event.c | 3 + src/bin/pg_dump/pg_dump.c | 15 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 9 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/logical.h | 13 +- src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/include/utils/wait_event.h | 3 +- src/test/regress/expected/subscription.out | 174 ++++++++++-------- src/test/regress/sql/subscription.sql | 18 ++ src/test/subscription/t/001_rep_changes.pl | 28 +++ 30 files changed, 536 insertions(+), 104 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index c1e4048054..3c013f976a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7873,6 +7873,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subminsenddelay int4 + + + The minimum delay for publisher sends data, in milliseconds + + + subname name diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml index 7c01a541fe..9ede9d05f6 100644 --- a/doc/src/sgml/glossary.sgml +++ b/doc/src/sgml/glossary.sgml @@ -1729,6 +1729,21 @@ + + Time-delayed replication + + + Replication setup that delays the application of changes by a specified + minimum time-delay period. + + + For more information, see + for physical replication + and for logical replication. + + + + TOAST diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 1bd5660c87..8bca0b3800 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -247,6 +247,12 @@ target table. + + A publication can delay sending changes to the subscription by specifying + the min_send_delay subscription parameter. See + for details. + + Replication Slot Management diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index b0b997f092..e75525c5b8 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2349,6 +2349,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting to acquire an exclusive lock to truncate off any empty pages at the end of a table vacuumed. + + WalSenderSendDelay + Waiting for sending changes to subscriber in WAL sender + process. + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 964fcbb8ff..3f238b958b 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -213,8 +213,9 @@ ALTER SUBSCRIPTION name RENAME TO < are slot_name, synchronous_commit, binary, streaming, - disable_on_error, and - origin. + disable_on_error, + origin, and + min_send_delay. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 51c45f17c7..4a665c8d07 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -349,7 +349,39 @@ CREATE SUBSCRIPTION subscription_name - + + + min_send_delay (integer) + + + By default, the publisher sends changes as soon as possible. This + parameter allows the user to delay the publisher to send changes by + given time period. If the value is specified without units, it is + taken as milliseconds. The default is zero (no delay). See + for details on the + available valid time units. + + + Any delay becomes effective only after all initial table + synchronization has finished and occurs before each transaction + starts to get applied on the subscriber. The delay does not take into + account the overhead of time spent in transferring the transaction, + which means that the arrival time at the subscriber may be delayed + more than the given time. + + + + Delaying the replication means there is a much longer time between + making a change on the publisher, and that change being committed + on the subscriber. This can impact the performance of synchronous + replication. See + parameter. + + + + + + @@ -420,6 +452,11 @@ CREATE SUBSCRIPTION subscription_name + + A non-zero min_send_delay parameter is not allowed when + streaming in parallel mode. + + We allow non-existent publications to be specified so that users can add those later. This means diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a56ae311c3..63a10b06d1 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->skiplsn = subform->subskiplsn; sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; + sub->minsenddelay = subform->subminsenddelay; sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 34ca0e739f..6b7b741a1e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1314,9 +1314,10 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) +GRANT SELECT (oid, subdbid, subskiplsn, subminsenddelay, subname, subowner, + subenabled, subbinary, substream, subtwophasestate, + subdisableonerr, subslotname, subsynccommit, subpublications, + suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 464db6d247..fca23ae7e1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -66,6 +66,7 @@ #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 #define SUBOPT_ORIGIN 0x00001000 +#define SUBOPT_MIN_SEND_DELAY 0x00002000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -90,6 +91,7 @@ typedef struct SubOpts bool disableonerr; char *origin; XLogRecPtr lsn; + int32 min_send_delay; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -100,7 +102,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); - +static int32 defGetMinSendDelay(DefElem *def); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->disableonerr = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY)) + opts->min_send_delay = 0; /* Parse options */ foreach(lc, stmt_options) @@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) && + strcmp(defel->defname, "min_send_delay") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MIN_SEND_DELAY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MIN_SEND_DELAY; + opts->min_send_delay = defGetMinSendDelay(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -404,6 +417,32 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * The combination of parallel streaming mode and min_send_delay is not + * allowed. This is because in parallel streaming mode, we start applying + * the transaction stream as soon as the first change arrives without + * knowing the transaction's prepare/commit time. Always waiting for the + * full 'min_send_delay' period might include unnecessary delay. + * + * The other possibility was to apply the delay at the end of the parallel + * apply transaction but that would cause issues related to resource bloat + * and locks being held for a long time. + */ + if (IsSet(supported_opts, SUBOPT_MIN_SEND_DELAY) && + opts->min_send_delay > 0 && + opts->streaming == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + + /* + * translator: the first %s is a string of the form "parameter > 0" + * and the second one is "option = value". + */ + errmsg("%s and %s are mutually exclusive options", + "min_send_delay > 0", "streaming = parallel")); + + } /* @@ -560,7 +599,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN | + SUBOPT_MIN_SEND_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -625,6 +665,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId); values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + values[Anum_pg_subscription_subminsenddelay - 1] = Int32GetDatum(opts.min_send_delay); values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); @@ -1054,7 +1095,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | SUBOPT_MIN_SEND_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1098,6 +1139,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { + /* + * The combination of parallel streaming mode and + * min_send_delay is not allowed. See + * parse_subscription_options. + */ + if (opts.streaming == LOGICALREP_STREAM_PARALLEL && + !IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY) && + sub->minsenddelay > 0) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set parallel streaming mode for subscription with %s", + "min_send_delay")); + values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -1111,6 +1165,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_MIN_SEND_DELAY)) + { + /* + * The combination of parallel streaming mode and + * min_send_delay is not allowed. See + * parse_subscription_options. + */ + if (opts.min_send_delay > 0 && + !IsSet(opts.specified_opts, SUBOPT_STREAMING) && + sub->stream == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for subscription in parallel streaming mode", + "min_send_delay")); + + values[Anum_pg_subscription_subminsenddelay - 1] = + Int32GetDatum(opts.min_send_delay); + replaces[Anum_pg_subscription_subminsenddelay - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -2195,3 +2269,45 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Extract the min_send_delay value from a DefElem. This is very similar to + * parse_and_validate_value() for integer values, because min_send_delay + * accepts the same parameter format as recovery_min_apply_delay. + */ +static int32 +defGetMinSendDelay(DefElem *def) +{ + char *input_string; + int result; + const char *hintmsg; + + input_string = defGetString(def); + + /* + * Parse given string as parameter which has millisecond unit + */ + if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"%s\": \"%s\"", + "min_send_delay", input_string), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + + /* + * Check both the lower boundary for the valid min_send_delay range and + * the upper boundary as the safeguard for some platforms where INT_MAX is + * wider than int32 respectively. Although parse_int() has confirmed that + * the result is less than or equal to INT_MAX, the value will be stored + * in a catalog column of int32. + */ + if (result < 0 || result > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)", + result, + "min_send_delay", + 0, PG_INT32_MAX))); + + return result; +} diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 560ec974fa..89a72c1abe 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -443,6 +443,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", binary 'true'"); + if (options->proto.logical.min_send_delay > 0 && + PQserverVersion(conn->streamConn) >= 160000) + appendStringInfo(&cmd, ", min_send_delay '%d'", + options->proto.logical.min_send_delay); + appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index a53e23c679..80415baec4 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -666,6 +666,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* Delay given time if the context has 'delay' callback */ + if (ctx->delay) + ctx->delay(ctx, commit_time); + /* * Send the final commit record if the transaction data is already * decoded, otherwise, process the entire transaction. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c3ec97a0a6..ac1f9f92f7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -156,7 +156,8 @@ StartupDecodingContext(List *output_plugin_options, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + LogicalOutputPluginWriterDelay delay) { ReplicationSlot *slot; MemoryContext context, @@ -293,6 +294,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->prepare_write = prepare_write; ctx->write = do_write; ctx->update_progress = update_progress; + ctx->delay = delay; ctx->output_plugin_options = output_plugin_options; @@ -316,7 +318,7 @@ StartupDecodingContext(List *output_plugin_options, * marking WAL reserved beforehand. In that scenario, it's up to the * caller to guarantee that WAL remains available. * xl_routine -- XLogReaderRoutine for underlying XLogReader - * prepare_write, do_write, update_progress -- + * prepare_write, do_write, update_progress, delay -- * callbacks that perform the use-case dependent, actual, work. * * Needs to be called while in a memory context that's at least as long lived @@ -334,7 +336,8 @@ CreateInitDecodingContext(const char *plugin, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + LogicalOutputPluginWriterDelay delay) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -435,7 +438,7 @@ CreateInitDecodingContext(const char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, xl_routine, prepare_write, do_write, - update_progress); + update_progress, delay); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -475,7 +478,7 @@ CreateInitDecodingContext(const char *plugin, * xl_routine * XLogReaderRoutine used by underlying xlogreader * - * prepare_write, do_write, update_progress + * prepare_write, do_write, update_progress, delay * callbacks that have to be filled to perform the use-case dependent, * actual work. * @@ -493,7 +496,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + LogicalOutputPluginWriterDelay delay) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -547,7 +551,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, fast_forward, xl_routine, prepare_write, - do_write, update_progress); + do_write, update_progress, delay); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..960025197f 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -212,7 +212,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin .segment_open = wal_segment_open, .segment_close = wal_segment_close), LogicalOutputPrepareWrite, - LogicalOutputWrite, NULL); + LogicalOutputWrite, NULL, NULL); /* * After the sanity checks in CreateDecodingContext, make sure the diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cfb2ab6248..afbac3d80e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3898,7 +3898,8 @@ maybe_reread_subscription(void) newsub->stream != MySubscription->stream || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + newsub->minsenddelay != MySubscription->minsenddelay) { if (am_parallel_apply_worker()) ereport(LOG, @@ -4617,9 +4618,18 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.twophase = false; options.proto.logical.origin = pstrdup(MySubscription->origin); + options.proto.logical.min_send_delay = 0; if (!am_tablesync_worker()) { + /* + * Time-delayed logical replication does not support tablesync + * workers, so only the leader apply worker can request walsenders to + * apply delay on the publisher side. + */ + if (server_version >= 160000 && MySubscription->minsenddelay > 0) + options.proto.logical.min_send_delay = MySubscription->minsenddelay; + /* * Even when the two_phase mode is requested by the user, it remains * as the tri-state PENDING until all tablesyncs have reached READY diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 98377c094b..be0095cf52 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -285,6 +285,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool min_send_delay_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; @@ -396,6 +397,32 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", data->origin)); } + else if (strcmp(defel->defname, "min_send_delay") == 0) + { + long parsed; + char *endptr; + + if (min_send_delay_option_given) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options")); + min_send_delay_option_given = true; + + errno = 0; + parsed = strtoul(strVal(defel->arg), &endptr, 10); + if (errno != 0 || *endptr != '\0') + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid min_send_delay"))); + + if (parsed > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("min_send_delay \"%s\" out of range", + strVal(defel->arg)))); + + data->min_send_delay = (int32) parsed; + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -501,6 +528,15 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, else ctx->twophase_opt_given = true; + if (data->min_send_delay && + data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support delay sending data, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM))); + else + ctx->min_send_delay = data->min_send_delay; + /* Init publication state. */ data->publications = NIL; publications_valid = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c964824..522f7600a1 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -148,7 +148,7 @@ create_logical_replication_slot(char *name, char *plugin, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, .segment_close = wal_segment_close), - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); /* * If caller needs us to determine the decoding start point, do so now. @@ -481,7 +481,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, .segment_close = wal_segment_close), - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); /* * Start reading at the slot's restart_lsn, which we know to point to diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 75e8363e24..a4f03ddba1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -252,6 +252,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact); +static void WalSndDelay(LogicalDecodingContext *ctx, TimestampTz delay_start); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); @@ -1126,7 +1127,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) .segment_open = WalSndSegmentOpen, .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + WalSndUpdateProgress, WalSndDelay); /* * Signal that we don't need the timeout mechanism. We're just @@ -1285,7 +1286,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) .segment_open = WalSndSegmentOpen, .segment_close = wal_segment_close), WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + WalSndUpdateProgress, WalSndDelay); xlogreader = logical_decoding_ctx->reader; WalSndSetState(WALSNDSTATE_CATCHUP); @@ -3849,3 +3850,75 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } + +/* + * LogicalDecodingContext 'delay' callback. + * + * Wait long enough to make sure a transaction is applied at least that + * period behind the publisher. + */ +static void +WalSndDelay(LogicalDecodingContext *ctx, TimestampTz delay_start) +{ + /* Wait till delayUntil by the latch mechanism */ + while (true) + { + TimestampTz delayUntil; + long diffms; + long timeout_interval_ms; + + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* This might change wal_sender_timeout */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* + * If we've requested to shut down, exit the process. + * + * Note that WalSndDone() cannot be used here because the delaying + * changes will be sent in the function. + */ + if (got_STOPPING) + WalSndShutdown(); + + delayUntil = TimestampTzPlusMilliseconds(delay_start, ctx->min_send_delay); + diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), delayUntil); + + /* + * Exit without arming the latch if it's already past time to send + * this transaction. + */ + if (diffms <= 0) + break; + + /* Sleep until appropriate time. */ + timeout_interval_ms = WalSndComputeSleeptime(GetCurrentTimestamp()); + + elog(DEBUG2, "time-delayed replication for txid %u, delay_time = %d ms, remaining wait time: %ld ms", + ctx->write_xid, (int) ctx->min_send_delay, diffms); + + /* Sleep until we get reply from worker or we time out */ + WalSndWait(WL_SOCKET_READABLE, + Min(timeout_interval_ms, diffms), + WAIT_EVENT_WALSENDER_SEND_DELAY); + } +} diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index cb99cc6339..76c19fe11d 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -515,6 +515,9 @@ pgstat_get_wait_timeout(WaitEventTimeout w) case WAIT_EVENT_VACUUM_TRUNCATE: event_name = "VacuumTruncate"; break; + case WAIT_EVENT_WALSENDER_SEND_DELAY: + event_name = "WalSenderSendDelay"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 527c7651ab..bd95747840 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4494,6 +4494,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_subminsenddelay; int i, ntups; @@ -4546,9 +4547,13 @@ getSubscriptions(Archive *fout) LOGICALREP_TWOPHASE_STATE_DISABLED); if (fout->remoteVersion >= 160000) - appendPQExpBufferStr(query, " s.suborigin\n"); + appendPQExpBufferStr(query, + " s.suborigin,\n" + " s.subminsenddelay\n"); else - appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY); + appendPQExpBuffer(query, " '%s' AS suborigin,\n" + " 0 AS subminsenddelay\n", + LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4576,6 +4581,7 @@ getSubscriptions(Archive *fout) i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); + i_subminsenddelay = PQfnumber(res, "subminsenddelay"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4606,6 +4612,8 @@ getSubscriptions(Archive *fout) subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + subinfo[i].subminsenddelay = + atoi(PQgetvalue(res, i, i_subminsenddelay)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4687,6 +4695,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); + if (subinfo->subminsenddelay > 0) + appendPQExpBuffer(query, ", min_send_delay = '%d ms'", subinfo->subminsenddelay); + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index e7cbd8d7ed..24e0f6737f 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *suborigin; char *subsynccommit; + int subminsenddelay; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index c8a0bb7b3a..c7d303a168 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6527,10 +6527,13 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Two-phase commit"), gettext_noop("Disable on error")); + /* Origin and min_send_delay are only supported in v16 and higher */ if (pset.sversion >= 160000) appendPQExpBuffer(&buf, - ", suborigin AS \"%s\"\n", - gettext_noop("Origin")); + ", suborigin AS \"%s\"\n" + ", subminsenddelay AS \"%s\"\n", + gettext_noop("Origin"), + gettext_noop("Min send delay")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 5e1882eaea..6643db6f55 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name", + COMPLETE_WITH("binary", "disable_on_error", "min_send_delay", "origin", "slot_name", "streaming", "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) @@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "origin", "slot_name", + "disable_on_error", "enabled", "min_send_delay", "origin", "slot_name", "streaming", "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index b0f2a1705d..69ae4314b4 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */ + int32 subminsenddelay; /* Replication send delay (ms) */ + bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ @@ -122,6 +124,7 @@ typedef struct Subscription * skipped */ char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ + int32 minsenddelay; /* Replication send delay (ms) */ bool enabled; /* Indicates if the subscription is enabled */ bool binary; /* Indicates if the subscription wants data in * binary format */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea0..603c37b6ce 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -30,6 +30,10 @@ typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingC bool skipped_xact ); +typedef void (*LogicalOutputPluginWriterDelay) (struct LogicalDecodingContext *lr, + TimestampTz start_time +); + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -64,6 +68,7 @@ typedef struct LogicalDecodingContext LogicalOutputPluginWriterPrepareWrite prepare_write; LogicalOutputPluginWriterWrite write; LogicalOutputPluginWriterUpdateProgress update_progress; + LogicalOutputPluginWriterDelay delay; /* * Output buffer. @@ -100,6 +105,8 @@ typedef struct LogicalDecodingContext */ bool twophase_opt_given; + int32 min_send_delay; + /* * State for writing output. */ @@ -121,14 +128,16 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress); + LogicalOutputPluginWriterUpdateProgress update_progress, + LogicalOutputPluginWriterDelay delay); extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress); + LogicalOutputPluginWriterUpdateProgress update_progress, + LogicalOutputPluginWriterDelay delay); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b4a8015403..d2fde09e00 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -30,6 +30,7 @@ typedef struct PGOutputData bool messages; bool two_phase; char *origin; + int32 min_send_delay; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index decffe352d..c20969aed7 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -187,6 +187,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + int32 min_send_delay; /* The minimum send delay */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 9ab23e1c4a..cc3a234eba 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -150,7 +150,8 @@ typedef enum WAIT_EVENT_REGISTER_SYNC_REQUEST, WAIT_EVENT_SPIN_DELAY, WAIT_EVENT_VACUUM_DELAY, - WAIT_EVENT_VACUUM_TRUNCATE + WAIT_EVENT_VACUUM_TRUNCATE, + WAIT_EVENT_WALSENDER_SEND_DELAY } WaitEventTimeout; /* ---------- diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 3f99b14394..b14384e8e7 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -163,10 +163,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | 0 | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -324,10 +324,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -375,10 +375,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -404,18 +404,48 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | 0 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail -- min_send_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo); +ERROR: invalid value for parameter "min_send_delay": "foo" +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1); +ERROR: -1 ms is outside the valid range for parameter "min_send_delay" (0 .. 2147483647) +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123); +ERROR: min_send_delay > 0 and streaming = parallel are mutually exclusive options +-- success -- min_send_delay value without unit is take as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 123 | off | dbname=regress_doesnotexit | 0/0 +(1 row) + +-- success -- min_send_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d'); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Min send delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+----------------+--------------------+----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | 86400000 | off | dbname=regress_doesnotexit | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 7281f5fee2..2fae3b06c7 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -286,6 +286,24 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail -- min_send_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = foo); +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = -1); + +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_send_delay = 123); + +-- success -- min_send_delay value without unit is take as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexit' PUBLICATION testpub WITH (connect = false, min_send_delay = 123); +\dRs+ + +-- success -- min_send_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_send_delay = '1 d'); +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 91aa068c95..8984e14d74 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -515,6 +515,34 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after renaming SUBSCRIPTION"; +# Test time-delayed logical replication +# +# If the subscription sets min_send_delay parameter, the logical replication +# worker will delay the transaction apply for min_send_delay milliseconds. We +# verify this by looking at the time difference between a) when tuples are +# inserted on the publisher, and b) when those changes are replicated on the +# subscriber. Even on slow machines, this strategy will give predictable behavior. + +# Set min_send_delay parameter to 3 seconds +my $delay = 3; +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_renamed SET (min_send_delay = '${delay}s')"); + +# Before doing the insertion, get the current timestamp that will be +# used as a comparison base. +my $publisher_insert_time = time(); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins VALUES (generate_series(1101, 1120))"); + +# The publisher waits for the replication to complete +$node_publisher->wait_for_catchup('tap_sub_renamed'); + +# This test is successful if and only if the LSN has been applied with at least +# the configured apply delay. +ok( time() - $publisher_insert_time >= $delay, + "subscriber applies WAL only after replication delay for non-streaming transaction" +); + # check all the cleanup $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); -- 2.27.0