From b5691915daf748daa5b8e57eea2cb344e684cbca Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 19 Apr 2023 09:26:12 +0000 Subject: [PATCH v4] (WIP) Time-delayed logical replication by serializing changes Similar to physical replication, a time-delayed copy of the data for logical replication is useful for some scenarios (particularly to fix errors that might cause data loss). This patch implements a new subscription parameter called 'min_apply_delay'. If the subscription sets min_apply_delay parameter, the logical replication worker will delay the transaction apply for min_apply_delay milliseconds. The delaying is implemented by serializing changes into file. The file is created when the worker receives BEGIN message. The worker writes received changes and flush at COMMIT. The delayed transaction is checked its commit time for every main loop, and applied from the file when the time exceeds the min_apply_delay. The commit time is stored in memory when the transaction is committed, or the worker restarts. The delay is calculated between the WAL time stamp and the current time on the subscriber. The combination of parallel streaming mode and min_apply_delay is not allowed. This is because in parallel streaming mode, we start applying the transaction stream as soon as the first change arrives without knowing the transaction's prepare/commit time. This means we cannot calculate the underlying network/decoding lag between publisher and subscriber, and so always waiting for the full 'min_apply_delay' period might include unnecessary delay. The other possibility was to apply the delay at the end of the parallel apply transaction but that would cause issues related to resource bloat and locks being held for a long time. Currently the combination of skip transaction feature and min_apply_delay does not work well. Earlier versions were written by Euler Taveira, Takamichi Osumi, and Kuroda Hayato Author: Kuroda Hayato, Takamichi Osumi --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/glossary.sgml | 15 + doc/src/sgml/logical-replication.sgml | 7 + doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 47 +- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 6 +- src/backend/commands/subscriptioncmds.c | 123 +- .../libpqwalreceiver/libpqwalreceiver.c | 4 + src/backend/replication/logical/worker.c | 1058 +++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 21 +- src/bin/pg_dump/pg_dump.c | 13 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 13 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 182 +-- src/test/regress/sql/subscription.sql | 27 + src/test/subscription/t/001_rep_changes.pl | 31 + 21 files changed, 1385 insertions(+), 191 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 5240840552..35a7b6a9e8 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7891,6 +7891,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subminapplydelay int4 + + + The minimum delay, in milliseconds, for applying changes + + + subname name diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml index 29bf1873bd..204fe7f3ae 100644 --- a/doc/src/sgml/glossary.sgml +++ b/doc/src/sgml/glossary.sgml @@ -1757,6 +1757,21 @@ + + Time-delayed replication + + + Replication setup that delays the application of changes by a specified + minimum time-delay period. + + + For more information, see + for physical replication + and for logical replication. + + + + TOAST diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index c65f4aabfd..0be4d652aa 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -257,6 +257,13 @@ option of CREATE SUBSCRIPTION for details. + + A subscription can delay the application of changes by specifying the + min_apply_delay + subscription parameter. See for + details. + + Replication Slot Management diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a85e04e4d6..bf6c5fe7f0 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -225,8 +225,9 @@ ALTER SUBSCRIPTION name RENAME TO < streaming, disable_on_error, password_required, - run_as_owner, and - origin. + run_as_owner, + origin, and + min_apply_delay. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 71652fd918..a3ac91f27b 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,7 +399,47 @@ CREATE SUBSCRIPTION subscription_name - + + + min_apply_delay (integer) + + + By default, the subscriber applies changes as soon as possible. This + parameter allows the user to delay the application of changes by a + given time period. This is done by writing all the changes into a + file once and apply contents after spending time. If the value is + specified without units, it is taken as milliseconds. The default + is zero (no delay). See + for details on the available valid time units. + + + Any delay becomes effective only after all initial table + synchronization has finished and occurs before each transaction starts + to get applied on the subscriber. The delay is calculated as the + difference between the WAL timestamp as written on the publisher and + the current time on the subscriber. Any overhead of time spent in + logical decoding and in transferring the transaction may reduce the + actual wait time. Even if the overhead already exceeds the requested + min_apply_delay value, all the changes are written + into file and applied immediately. If the system clocks on publisher + and subscriber are not synchronized, this may lead to apply changes + earlier than expected, but this is not a major issue because this + parameter is typically much larger than the time deviations between + servers. + + + + Delaying the replication means there is a much longer time between + making a change on the publisher, and that change being committed + on the subscriber. This can impact the performance of synchronous + replication. See + parameter. + + + + + + @@ -472,6 +512,11 @@ CREATE SUBSCRIPTION subscription_name + + A non-zero min_apply_delay parameter is not allowed when + streaming in parallel mode. + + We allow non-existent publications to be specified so that users can add those later. This means diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d07f88ce28..56f8fdda10 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->skiplsn = subform->subskiplsn; sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; + sub->minapplydelay = subform->subminapplydelay; sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2129c916aa..aeac1d8064 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1324,9 +1324,9 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, - subpasswordrequired, subrunasowner, +GRANT SELECT (oid, subdbid, subskiplsn, subminapplydelay, subname, subowner, + subenabled, subbinary, substream, subtwophasestate, + subdisableonerr, subpasswordrequired, subrunasowner, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3251d89ba8..aaa2065311 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_MIN_APPLY_DELAY 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool runasowner; char *origin; XLogRecPtr lsn; + int32 min_apply_delay; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -107,7 +109,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); - +static int32 defGetMinApplyDelay(DefElem *def); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY)) + opts->min_apply_delay = 0; /* Parse options */ foreach(lc, stmt_options) @@ -353,6 +357,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && + strcmp(defel->defname, "min_apply_delay") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY; + opts->min_apply_delay = defGetMinApplyDelay(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -433,6 +446,32 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * The combination of parallel streaming mode and min_apply_delay is not + * allowed. This is because in parallel streaming mode, we start applying + * the transaction stream as soon as the first change arrives without + * knowing the transaction's prepare/commit time. This means we cannot + * calculate the underlying network/decoding lag between publisher and + * subscriber, and so always waiting for the full 'min_apply_delay' period + * might include unnecessary delay. + * + * The other possibility was to apply the delay at the end of the parallel + * apply transaction but that would cause issues related to resource bloat + * and locks being held for a long time. + */ + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && + opts->min_apply_delay > 0 && + opts->streaming == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + + /* + * translator: the first %s is a string of the form "parameter > 0" + * and the second one is "option = value". + */ + errmsg("%s and %s are mutually exclusive options", + "min_apply_delay > 0", "streaming = parallel")); } /* @@ -591,7 +630,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_MIN_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -682,6 +722,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId); values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + values[Anum_pg_subscription_subminapplydelay - 1] = Int32GetDatum(opts.min_apply_delay); values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); @@ -1130,7 +1171,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_MIN_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1174,6 +1216,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { + /* + * The combination of parallel streaming mode and + * min_apply_delay is not allowed. See + * parse_subscription_options. + */ + if (opts.streaming == LOGICALREP_STREAM_PARALLEL && + !IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) + && sub->minapplydelay > 0) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set parallel streaming mode for subscription with %s", + "min_apply_delay")); + values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -1202,6 +1257,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY)) + { + /* + * The combination of parallel streaming mode and + * min_apply_delay is not allowed. See + * parse_subscription_options. + */ + if (opts.min_apply_delay > 0 && + !IsSet(opts.specified_opts, SUBOPT_STREAMING) + && sub->stream == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for subscription in parallel streaming mode", + "min_apply_delay")); + + values[Anum_pg_subscription_subminapplydelay - 1] = + Int32GetDatum(opts.min_apply_delay); + replaces[Anum_pg_subscription_subminapplydelay - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -2343,3 +2418,45 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Extract the min_apply_delay value from a DefElem. This is very similar to + * parse_and_validate_value() for integer values, because min_apply_delay + * accepts the same parameter format as recovery_min_apply_delay. + */ +static int32 +defGetMinApplyDelay(DefElem *def) +{ + char *input_string; + int result; + const char *hintmsg; + + input_string = defGetString(def); + + /* + * Parse given string as parameter which has millisecond unit + */ + if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"%s\": \"%s\"", + "min_apply_delay", input_string), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + + /* + * Check both the lower boundary for the valid min_apply_delay range and + * the upper boundary as the safeguard for some platforms where INT_MAX is + * wider than int32 respectively. Although parse_int() has confirmed that + * the result is less than or equal to INT_MAX, the value will be stored + * in a catalog column of int32. + */ + if (result < 0 || result > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)", + result, + "min_apply_delay", + 0, PG_INT32_MAX))); + + return result; +} diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 052505e46f..0fb073c2c1 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -470,6 +470,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", origin '%s'", options->proto.logical.origin); + if (options->proto.logical.require_schema && + PQserverVersion(conn->streamConn) >= 160000) + appendStringInfo(&cmd, ", require_schema 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3d58910c14..f7ee87d188 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -153,6 +153,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "common/file_utils.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -370,6 +371,59 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* XXX macros for time-delayed logical replicaiton */ + +/* DELAYED_DIR stores files that contains changes of delayed transactions. */ +#define DELAYED_DIR "pg_logical/delayed_txns" + +/* + * The filename consists of the following, dash separated, components: + * 1) subscription oid + * 2) xid of delayed transaction on publisher + * 3) status of the delaying transaction + * 4) upper 32bit of the commit_lsn + * 5) lower 32bit of the commit_lsn + * 6) upper 32bit of the end_lsn + * 7) lower 32bit of the end_lsn + * 8) committime + */ +#define DELAYED_FORMAT "delayed-%x-%x-%c-%X-%X-%X-%X-" INT64_FORMAT +#define DELAYED_TXN_COMMITTED 'c' +#define DELAYED_TXN_PREPARED 'p' +#define DELAYED_TXN_UNKNOWN 'u' + +/* List entry to map xid and commit time */ +typedef struct DelayedTxnListEntry +{ + TransactionId xid; + LogicalRepCommitData commit_data; +} DelayedTxnListEntry; + +/* + * An entry is appended when the we receives commit message and time-delayed + * logical replication is requested. The entry will be deleted after contents + * are applied. + */ +static List *DelayedTxnList = NIL; + +/* fields valid only when time-delayed logical replication is requested */ +static bool in_delayed_transaction = false; + +static TransactionId delayed_xid = InvalidTransactionId; + +/* + * Store flushed lsn for time-delayed logical replication. This is used when + * we send a feedback message to the publisher. + */ +static XLogRecPtr last_flushed = InvalidXLogRecPtr; + +/* + * FIXME: global file descriptor may be not sufficient. There is a possibility + * that non-streaming transactions are come concurrently. At that time + * create_delay_file() for the second transaction will be failed... + */ +static int delayed_fd = -1; + static inline void subxact_filename(char *path, Oid subid, TransactionId xid); static inline void changes_filename(char *path, Oid subid, TransactionId xid); @@ -432,6 +486,534 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void begin_replication_step(void); +static void end_replication_step(void); + +/* Functions for time-delayed logical replicaiton */ +static void cache_commit_data(LogicalRepCommitData *commit_data, TransactionId xid); +static void flush_delayed_changes(LogicalRepCommitData *commit_data); +static void delay_file_name(char *path, Oid subid, TransactionId xid, + char status, XLogRecPtr commit_lsn, + XLogRecPtr end_lsn, TimestampTz committime); +static bool is_given_transaction_delayed(Oid subid, TransactionId xid); +static void create_delay_file(TransactionId xid); +static bool handle_delayed_transaction(char action, StringInfo s); + +/* + * Cache commit_data into the list + */ +static void +cache_commit_data(LogicalRepCommitData *commit_data, TransactionId xid) +{ + MemoryContext old; + DelayedTxnListEntry *entry; + + old = MemoryContextSwitchTo(ApplyContext); + + entry = palloc0(sizeof(DelayedTxnListEntry)); + + /* Contruct an entry and append it */ + entry->xid = xid; + memcpy(&entry->commit_data, commit_data, sizeof(LogicalRepCommitData)); + DelayedTxnList = lappend(DelayedTxnList, entry); + + MemoryContextSwitchTo(old); + + elog(DEBUG1, "transaction %u is cached", xid); + +} + +/* + * Flush given changes, rename and close the file. This will be called at the + * end of the transaction. + */ +static void +flush_delayed_changes(LogicalRepCommitData *commit_data) +{ + char old_path[MAXPGPATH]; + char new_path[MAXPGPATH]; + + Assert(delayed_fd > 0); + Assert(TransactionIdIsValid(delayed_xid)); + + /* Cache given commit_data into the list */ + cache_commit_data(commit_data, delayed_xid); + + /* + * Close file. No need to flush here because it will be done in + * durable_rename(). + */ + close(delayed_fd); + + /* Construct old/new filename */ + delay_file_name(old_path, MyLogicalRepWorker->subid, delayed_xid, + DELAYED_TXN_UNKNOWN, InvalidXLogRecPtr, InvalidXLogRecPtr, + 0); + delay_file_name(new_path, MyLogicalRepWorker->subid, delayed_xid, + DELAYED_TXN_COMMITTED, commit_data->commit_lsn, + commit_data->end_lsn, commit_data->committime); + + /* And do actual rename */ + if (durable_rename(old_path, new_path, PANIC)) + abort(); + + /* Store flushed lsn */ + last_flushed = commit_data->end_lsn; + + /* Cleanup */ + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; +} + +/* + * Get formal filename from needed information + */ +static void +delay_file_name(char *path, Oid subid, TransactionId xid, char status, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz committime) +{ + snprintf(path, MAXPGPATH, DELAYED_DIR "/" DELAYED_FORMAT, subid, xid, + status, LSN_FORMAT_ARGS(commit_lsn), LSN_FORMAT_ARGS(end_lsn), + committime); +} + +/* + * Check whether the given transaction is delayed. This is done by checking the + * delay file. + */ +static bool +is_given_transaction_delayed(Oid subid, TransactionId xid) +{ + struct stat st; + char path[MAXPGPATH]; + + delay_file_name(path, subid, xid, DELAYED_TXN_PREPARED, InvalidXLogRecPtr, + InvalidXLogRecPtr, 0); + + return stat(path, &st) == 0; +} + +/* + * Apply the delayed transaction. In the function a delayed file is opened and + * read. Apply worker applies written changes. + */ +static void +apply_delayed_transaction(TransactionId xid, LogicalRepCommitData *commit_data) +{ + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + MemoryContext oldcxt; + ResourceOwner oldowner; + + /* Make sure we have an open transaction */ + begin_replication_step(); + + /* + * Allocate file handle and memory required to process all the messages in + * TopTransactionContext to avoid them getting reset after each message is + * processed. + */ + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* Open the spool file for the committed transaction */ + delay_file_name(path, MyLogicalRepWorker->subid, xid, + DELAYED_TXN_COMMITTED, commit_data->commit_lsn, + commit_data->end_lsn, commit_data->committime); + elog(DEBUG1, "replaying changes from file \"%s\"", path); + + /* + * Make sure the file is owned by the toplevel transaction so that the + * file will not be accidentally closed when aborting a subtransaction. + */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + /* Open the specified file */ + delayed_fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + Assert(delayed_fd > 0); + + CurrentResourceOwner = oldowner; + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = commit_data->end_lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + end_replication_step(); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + size_t nbytes; + int len; + + CHECK_FOR_INTERRUPTS(); + + /* read length of the on-disk record */ + nbytes = read(delayed_fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? */ + if (len <= 0) + elog(ERROR, "incorrect length %d in delaed transaction's changes file \"%s\"", + len, path); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + read(delayed_fd, buffer, len); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. */ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file \"%s\"", + nchanges, path); + } + + close(delayed_fd); + delayed_fd = -1; + durable_unlink(path, LOG); + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + return; +} + +/* + * Create a file that will be written changes. + */ +static void +create_delay_file(TransactionId xid) +{ + char path[MAXPGPATH]; + int fd; + + Assert(TransactionIdIsValid(xid)); + Assert(delayed_fd < 0); + + /* + * Construct filename. Other information like commit_lsn will be filled + * when it will be committed. + */ + delay_file_name(path, MyLogicalRepWorker->subid, xid, DELAYED_TXN_UNKNOWN, + InvalidXLogRecPtr, InvalidXLogRecPtr, 0); + + elog(DEBUG1, "creating a file \"%s\" for time-delayed logical replication", + path); + + fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL | O_APPEND | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + path)); + + delayed_fd = fd; +} + +/* + * Create a directory that holds delayed files + */ +static void +initialize_delay_directory(void) +{ + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, DELAYED_DIR); + if (MakePGDirectory(path) < 0 && errno != EEXIST) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + path)); + + START_CRIT_SECTION(); + fsync_fname(path, true); + END_CRIT_SECTION(); +} + +/* + * Transform information from commit_prepared style to commit style. + */ +static void +ConstructCommitFromCommitPrepared(LogicalRepCommitData *commit, + LogicalRepCommitPreparedTxnData *prepare_data) +{ + commit->commit_lsn = prepare_data->commit_lsn; + commit->committime = prepare_data->commit_time; + commit->end_lsn = prepare_data->end_lsn; +} + +/* + * Restore the delayed transaction from given information. + * + * This return false only when the status is unknown, which measn that the + * worker was shutted down before receiving the COMMIT/PREPARE/COMMIT PREPARED + * message. In this case we must receive whole the messages and write them into + * file again. + */ +static bool +RestoreDelayedTxn(char status, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz committime, TransactionId xid) + +{ + switch (status) + { + case DELAYED_TXN_UNKNOWN: + return false; + + case DELAYED_TXN_COMMITTED: + { + LogicalRepCommitData commit_data = { + .commit_lsn = commit_lsn, + .committime = committime, + .end_lsn = end_lsn, + }; + cache_commit_data(&commit_data, xid); + break; + } + + case DELAYED_TXN_PREPARED: + /* Do nothing */ + break; + + default: + Assert(false); + return false; /* Keep compiler quiet */ + } + + /* Update last_flushed to avoid to recevie same transaction again */ + last_flushed = end_lsn; + + return true; +} + +/* + * list_sort() comparator for sorting DelayedTxnList in commitime order. + */ +static int +file_sort_by_committime(const ListCell *a_p, const ListCell *b_p) +{ + DelayedTxnListEntry *a = (DelayedTxnListEntry *) lfirst(a_p); + DelayedTxnListEntry *b = (DelayedTxnListEntry *) lfirst(b_p); + + if (a->commit_data.committime < b->commit_data.committime) + return -1; + else if (a->commit_data.committime > b->commit_data.committime) + return 1; + return 0; +} + + +/* + * Restore all the delayed transactions to memory. + */ +static void +RestoreDelayedTxns(void) +{ + DIR *delayed_dir; + struct dirent *delayed_de; + + /* Read all the file step-by-step */ + delayed_dir = AllocateDir(DELAYED_DIR); + while ((delayed_de = ReadDir(delayed_dir, DELAYED_DIR)) != NULL) + { + Oid subid = InvalidOid; + TransactionId xid = InvalidTransactionId; + char status = 0; + XLogRecPtr commit_lsn = InvalidXLogRecPtr, + end_lsn = InvalidXLogRecPtr; + TimestampTz committime = 0; + uint32 commit_hi = 0, + commit_lo = 0, + end_hi = 0, + end_lo = 0; + + if (strcmp(delayed_de->d_name, ".") == 0 || + strcmp(delayed_de->d_name, "..") == 0) + continue; + + /* Ignore files that aren't ours */ + if (strncmp(delayed_de->d_name, "delayed-", 8) != 0) + continue; + + /* Parse filename */ + if (sscanf(delayed_de->d_name, DELAYED_FORMAT, &subid, &xid, &status, &commit_hi, + &commit_lo, &end_hi, &end_lo, &committime) != 8) + elog(ERROR, "could not parse filename \"%s\"", delayed_de->d_name); + + /* Skip if the file has been generated by other subscriptions */ + if (MyLogicalRepWorker->subid != subid) + continue; + + elog(DEBUG1, "start to restore from %s", delayed_de->d_name); + + commit_lsn = ((uint64) commit_hi) << 32 | commit_lo; + end_lsn = ((uint64) end_hi) << 32 | end_lo; + + /* + * Do actual restore here. If the server was shutted down while + * receiving transactions, the status is UNKNOWN and + * RestoreDelayedTxn() returns false. At that time we must remove the + * file once and receive changes again. + */ + if (!RestoreDelayedTxn(status, commit_lsn, end_lsn, committime, xid)) + { + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, DELAYED_DIR "/%s", delayed_de->d_name); + durable_unlink(path, LOG); + } + } + FreeDir(delayed_dir); + + list_sort(DelayedTxnList, file_sort_by_committime); +} + +/* + * Restore delayed transactions, or initialize the directory + */ +static void +InitializeDelayedTxn(void) +{ + struct stat st; + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, DELAYED_DIR); + + /* + * If the given directory does not exist, create one. Otherwise start to + * restore. + */ + if (stat(path, &st) != 0) + { + initialize_delay_directory(); + return; + } + + RestoreDelayedTxns(); +} + +/* + * Write a given message to a file. This is called for every message. + * This returns true only when changes are written into file. + * + * The format of the serialized changes is same as the streamed one. This + * has a length (not including the length), action code (identifying the + * message type) and message contents (without the subxact TransactionId + * value). + */ +static bool +handle_delayed_transaction(char action, StringInfo s) +{ + int len; + + /* Return if we are not in delay */ + if (!in_delayed_transaction) + return false; + + Assert(delayed_fd > 0); + Assert(TransactionIdIsValid(delayed_xid)); + + len = (s->len - s->cursor) + sizeof(char); + + if (write(delayed_fd, &len, sizeof(len)) != sizeof(len)) + abort(); + if (write(delayed_fd, &action, sizeof(action)) != sizeof(action)) + abort(); + + len = (s->len - s->cursor); + + if (write(delayed_fd, &s->data[s->cursor], len) != len) + abort(); + + return true; +} + +/* + * Check the delayed transactions and apply if we elapsed sufficient time + */ +static void +check_delayed_transaction(void) +{ + TimestampTz now; + ListCell *lc; + int n = 0; + + if (in_streamed_transaction) + return; + + now = GetCurrentTimestamp(); + + /* Read cache on-by-one */ + foreach(lc, DelayedTxnList) + { + DelayedTxnListEntry *entry = (DelayedTxnListEntry *) lfirst(lc); + LogicalRepCommitData *commit_data = &entry->commit_data; + TimestampTz delayUntil; + long diffms; + + delayUntil = TimestampTzPlusMilliseconds(commit_data->committime, + MySubscription->minapplydelay); + + diffms = TimestampDifferenceMilliseconds(now, delayUntil); + + /* + * The cache is aligned the commit ordering, so we do not have to check + * latter entries if we find transactions that should not be applied. + */ + if (diffms > 0) + break; + + elog(DEBUG1, "started to apply transaction %u", entry->xid); + + apply_delayed_transaction(entry->xid, commit_data); + apply_handle_commit_internal(commit_data); + + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + n++; + } + /* Discards applied entries */ + DelayedTxnList = list_delete_first_n(DelayedTxnList, n); +} + /* * Return the name of the logical replication worker. */ @@ -1019,13 +1601,28 @@ apply_handle_begin(StringInfo s) logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); - remote_final_lsn = begin_data.final_lsn; + /* + * Prepare to write changes into file if time-delayed replication is + * requested. + */ + if (MySubscription->minapplydelay && AllTablesyncsReady()) + { + in_delayed_transaction = true; - maybe_start_skipping_changes(begin_data.final_lsn); + create_delay_file(begin_data.xid); - in_remote_transaction = true; + delayed_xid = begin_data.xid; + } + else + { + remote_final_lsn = begin_data.final_lsn; - pgstat_report_activity(STATE_RUNNING, NULL); + maybe_start_skipping_changes(begin_data.final_lsn); + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); + } } /* @@ -1037,20 +1634,40 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + + /* + * If we are applying the delayed transaction, skip here. + * Actual COMMIT will be done outside the apply_delayed_transaction() + */ + if (delayed_fd > 0 && !in_delayed_transaction) + return; logicalrep_read_commit(s, &commit_data); - if (commit_data.commit_lsn != remote_final_lsn) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", - LSN_FORMAT_ARGS(commit_data.commit_lsn), - LSN_FORMAT_ARGS(remote_final_lsn)))); + /* If we are applying, skip here. */ - apply_handle_commit_internal(&commit_data); + if (in_delayed_transaction) + { + /* Write a commit message into file and flush all of messages */ + handle_delayed_transaction(LOGICAL_REP_MSG_COMMIT, &original_msg); + flush_delayed_changes(&commit_data); + } + else + { + if (commit_data.commit_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", + LSN_FORMAT_ARGS(commit_data.commit_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + apply_handle_commit_internal(&commit_data); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + } pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1076,13 +1693,28 @@ apply_handle_begin_prepare(StringInfo s) logicalrep_read_begin_prepare(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); - remote_final_lsn = begin_data.prepare_lsn; + /* + * Prepare to write changes into file if time-delayed replication is + * requested. + */ + if (MySubscription->minapplydelay && AllTablesyncsReady()) + { + in_delayed_transaction = true; - maybe_start_skipping_changes(begin_data.prepare_lsn); + create_delay_file(begin_data.xid); - in_remote_transaction = true; + delayed_xid = begin_data.xid; + } + else + { + remote_final_lsn = begin_data.prepare_lsn; - pgstat_report_activity(STATE_RUNNING, NULL); + maybe_start_skipping_changes(begin_data.prepare_lsn); + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); + } } /* @@ -1124,57 +1756,102 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) /* * Handle PREPARE message. + * + * When time-delayed logical replication is requested, we just write a message + * into file and return. This means that no transaction is prepared on + * subscriber. This can avoid that the apply worker acquires locks for a long + * time due to the long min_apply_time. + * + * Even if the transaction is applied from delayed file, the transaction is not + * prepared. We just skip PREPARE message. */ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; - logicalrep_read_prepare(s, &prepare_data); - - if (prepare_data.prepare_lsn != remote_final_lsn) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", - LSN_FORMAT_ARGS(prepare_data.prepare_lsn), - LSN_FORMAT_ARGS(remote_final_lsn)))); - /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction or all changes are skipped. It - * is done this way because at commit prepared time, we won't know whether - * we have skipped preparing a transaction because of those reasons. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. + * If we are writing changes into delayed file, construct a modified + * message and write it. This is needed for avoiding to write gid into + * file. More detail, see atop ReadPreparedCommonRecord(). */ - begin_replication_step(); + if (in_delayed_transaction) + { + char old_path[MAXPGPATH]; + char new_path[MAXPGPATH]; - apply_handle_prepare_internal(&prepare_data); + /* Cleanup */ + close(delayed_fd); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* + * Construct old/new filename. + * + * Note that commit_lsn, end_lsn, and committime are not filled here. + * This is because when COMMIT PREPARED is come, we do no have a good + * way to indicate the related transaction file if they are filled. + */ + delay_file_name(old_path, MyLogicalRepWorker->subid, delayed_xid, + DELAYED_TXN_UNKNOWN, InvalidXLogRecPtr, InvalidXLogRecPtr, 0); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + delay_file_name(new_path, MyLogicalRepWorker->subid, delayed_xid, + DELAYED_TXN_PREPARED, InvalidXLogRecPtr, InvalidXLogRecPtr, 0); - in_remote_transaction = false; + /* And do actual rename */ + if (durable_rename(old_path, new_path, PANIC)) + abort(); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; - /* - * Since we have already prepared the transaction, in a case where the - * server crashes before clearing the subskiplsn, it will be left but the - * transaction won't be resent. But that's okay because it's a rare case - * and the subskiplsn will be cleared when finishing the next transaction. - */ - stop_skipping_changes(); - clear_subscription_skip_lsn(prepare_data.prepare_lsn); + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + } + else + { + logicalrep_read_prepare(s, &prepare_data); - pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); + if (prepare_data.prepare_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", + LSN_FORMAT_ARGS(prepare_data.prepare_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); + + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. + */ + begin_replication_step(); + + apply_handle_prepare_internal(&prepare_data); + + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + } } /* @@ -1192,38 +1869,95 @@ apply_handle_commit_prepared(StringInfo s) LogicalRepCommitPreparedTxnData prepare_data; char gid[GIDSIZE]; - logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); - - /* Compute GID for two_phase transactions. */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); + if (delayed_fd > 0 && !in_delayed_transaction) + return; - /* There is no transaction when COMMIT PREPARED is called */ - begin_replication_step(); + logicalrep_read_commit_prepared(s, &prepare_data); /* - * Update origin state so we can restart streaming from correct position - * in case of crash. + * Check whether delayed file exists or not. If we have a file and we have + * not opened yet, it means that time-delayed logical replication has been + * requested. At that time we write the modified message. + * Otherwise, the transaction will be committed normally. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + if (delayed_fd < 0 && + is_given_transaction_delayed(MyLogicalRepWorker->subid, prepare_data.xid)) + { + char old_path[MAXPGPATH]; + char new_path[MAXPGPATH]; + LogicalRepCommitData commit_data = {0}; - FinishPreparedTransaction(gid, true); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* + * Open the delayed transaction file. + * + * Apart from RestoreDelayedTxns(), we don't want to read whole the + * directory to find the related file. That's why we use Invalid LSN + * and committime to indicate it. + */ + delay_file_name(old_path, MyLogicalRepWorker->subid, prepare_data.xid, + DELAYED_TXN_PREPARED, InvalidXLogRecPtr, + InvalidXLogRecPtr, 0); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); - in_remote_transaction = false; + delayed_fd = BasicOpenFile(old_path, O_WRONLY | O_APPEND | PG_BINARY); + if (delayed_fd < 0) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + old_path)); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + delay_file_name(new_path, MyLogicalRepWorker->subid, + prepare_data.xid, DELAYED_TXN_COMMITTED, + prepare_data.commit_lsn, prepare_data.end_lsn, + prepare_data.commit_time); - clear_subscription_skip_lsn(prepare_data.end_lsn); + close(delayed_fd); - pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); + if (durable_rename(old_path, new_path, PANIC)) + abort(); + + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; + + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + + ConstructCommitFromCommitPrepared(&commit_data, &prepare_data); + + /* Cache the commited transaction */ + cache_commit_data(&commit_data, prepare_data.xid); + } + else + { + set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* There is no transaction when COMMIT PREPARED is called */ + begin_replication_step(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + clear_subscription_skip_lsn(prepare_data.end_lsn); + } } /* @@ -1242,6 +1976,23 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + + /* + * If the delayed file exists, just remove it. The delayed transaction have + * never prepared, so it's OK not to call FinishPreparedTransaction(). + */ + if (is_given_transaction_delayed(MyLogicalRepWorker->subid, rollback_data.xid)) + { + char path[MAXPGPATH]; + delay_file_name(path, MyLogicalRepWorker->subid, rollback_data.xid, + DELAYED_TXN_PREPARED, InvalidXLogRecPtr, + InvalidXLogRecPtr, 0); + + durable_unlink(path, LOG); + + return; + } + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn); /* Compute GID for two_phase transactions. */ @@ -1317,16 +2068,66 @@ apply_handle_stream_prepare(StringInfo s) switch (apply_action) { case TRANS_LEADER_APPLY: + /* + * If time-delayed is requested, start to write changes to + * permanent file instead of temporary one. + */ + if (MySubscription->minapplydelay) + { + in_delayed_transaction = true; + + create_delay_file(prepare_data.xid); + + delayed_xid = prepare_data.xid; + } /* * The transaction has been serialized to file, so replay all the * spooled operations. + * Note that if time-delayed replication is requested, changes are + * written into permanent file here. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); - /* Mark the transaction as prepared. */ - apply_handle_prepare_internal(&prepare_data); + + /* + * If time-delayed replication is requested, construct a modified + * message and write it. This is needed for avoiding to write gid into + * file. More detail, see atop ReadPreparedCommonRecord(). + */ + if (MySubscription->minapplydelay) + { + char old_path[MAXPGPATH]; + char new_path[MAXPGPATH]; + + close(delayed_fd); + + delay_file_name(old_path, MyLogicalRepWorker->subid, + prepare_data.xid, DELAYED_TXN_UNKNOWN, + InvalidXLogRecPtr, InvalidXLogRecPtr, 0); + + delay_file_name(new_path, MyLogicalRepWorker->subid, + prepare_data.xid, DELAYED_TXN_PREPARED, + InvalidXLogRecPtr, InvalidXLogRecPtr, 0); + + if (durable_rename(old_path, new_path, PANIC)) + abort(); + + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; + + close(delayed_fd); + + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + } + else + { + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); + } CommitTransactionCommand(); @@ -1405,8 +2206,11 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + if (list_length(DelayedTxnList) == 0) + { + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + } /* * Similar to prepare case, the subskiplsn could be left in a case of @@ -2175,19 +2979,43 @@ apply_handle_stream_commit(StringInfo s) { case TRANS_LEADER_APPLY: + /* + * If time-delayed is requested, start to write changes to + * permanent file instead of temporary one. + */ + if (MySubscription->minapplydelay) + { + in_delayed_transaction = true; + + create_delay_file(xid); + + delayed_xid = xid; + } + /* * The transaction has been serialized to file, so replay all the * spooled operations. + * Note that if time-delayed replication is requested, changes are + * written into permanent file here. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn); - apply_handle_commit_internal(&commit_data); + + /* Flush changes if time-delayed is requested */ + if (MySubscription->minapplydelay) + { + handle_delayed_transaction(LOGICAL_REP_MSG_COMMIT, &original_msg); + flush_delayed_changes(&commit_data); + } + else + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); elog(DEBUG1, "finished processing the STREAM COMMIT command"); + break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -2249,8 +3077,11 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + if (list_length(DelayedTxnList) == 0) + { + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + } pgstat_report_activity(STATE_IDLE, NULL); @@ -2325,7 +3156,8 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; - if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) + if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; rel = logicalrep_read_rel(s); @@ -2348,7 +3180,8 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) + if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_TYPE, s)) return; logicalrep_read_typ(s, &typ); @@ -2408,7 +3241,8 @@ apply_handle_insert(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -2560,7 +3394,8 @@ apply_handle_update(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -2741,7 +3576,8 @@ apply_handle_delete(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -3169,7 +4005,8 @@ apply_handle_truncate(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3431,11 +4268,14 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, pos = dlist_tail_element(FlushPosition, node, &lsn_mapping); *write = pos->remote_end; - *have_pending_txes = true; - return; + break; } } + /* If change are written into file, report the LSN instead */ + if (last_flushed > *flush) + *flush = last_flushed; + *have_pending_txes = !dlist_is_empty(&lsn_mapping); } @@ -3632,9 +4472,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(last_received); + if (list_length(DelayedTxnList) == 0) + process_syncing_tables(last_received); } + /* Check delayed transactions and apply them */ + check_delayed_transaction(); + /* Cleanup the memory. */ MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); @@ -3776,8 +4620,14 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) /* * No outstanding transactions to flush, we can report the latest received * position. This is important for synchronous replication. + * + * If the logical replication subscription has unprocessed changes then do + * not inform the publisher that the received latest LSN is already + * applied and flushed, otherwise, the publisher will make a wrong + * assumption about the logical replication progress. Instead, just send a + * feedback message to avoid a replication timeout during the delay. */ - if (!have_pending_txes) + if (!have_pending_txes && (list_length(DelayedTxnList) == 0)) flushpos = writepos = recvpos; if (writepos < last_writepos) @@ -3936,7 +4786,8 @@ maybe_reread_subscription(void) newsub->stream != MySubscription->stream || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + (newsub->minapplydelay == 0) != (MySubscription->minapplydelay == 0)) { if (am_parallel_apply_worker()) ereport(LOG, @@ -4581,6 +5432,9 @@ ApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("subscription has no replication slot set"))); + /* Check delayed files or initialize directory */ + InitializeDelayedTxn(); + /* Setup replication origin tracking. */ StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, @@ -4592,6 +5446,14 @@ ApplyWorkerMain(Datum main_arg) replorigin_session_origin = originid; origin_startpos = replorigin_session_get_progress(false); + /* + * If last_flushed exceeds origin_startpos, it means that some + * transactions are delaying. They have already been written into + * pernament file, so no need to recevie them again. + */ + if (origin_startpos < last_flushed) + origin_startpos = last_flushed; + /* Is the use of a password mandatory? */ must_use_password = MySubscription->passwordrequired && !superuser_arg(MySubscription->owner); @@ -4663,9 +5525,15 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.twophase = false; options.proto.logical.origin = pstrdup(MySubscription->origin); + options.proto.logical.require_schema = false; + if (!am_tablesync_worker()) { + if (server_version >= 160000) + options.proto.logical.require_schema = + MySubscription->minapplydelay > 0; + /* * Even when the two_phase mode is requested by the user, it remains * as the tri-state PENDING until all tablesyncs have reached READY diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f88389de84..6718fe062b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -286,11 +286,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool require_schema_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->require_schema = false; foreach(lc, options) { @@ -397,6 +399,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", data->origin)); } + else if (strcmp(defel->defname, "require_schema") == 0) + { + if (require_schema_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + require_schema_option_given = true; + + data->require_schema = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -677,7 +689,8 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, - Relation relation, RelationSyncEntry *relentry) + Relation relation, RelationSyncEntry *relentry, + PGOutputData *data) { bool schema_sent; TransactionId xid = InvalidTransactionId; @@ -717,7 +730,7 @@ maybe_send_schema(LogicalDecodingContext *ctx, schema_sent = relentry->schema_sent; /* Nothing to do if we already sent the schema. */ - if (schema_sent) + if (!data->require_schema && schema_sent) return; /* @@ -1520,7 +1533,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Schema should be sent using the original relation because it also sends * the ancestor's relation. */ - maybe_send_schema(ctx, change, relation, relentry); + maybe_send_schema(ctx, change, relation, relentry, data); OutputPluginPrepareWrite(ctx, true); @@ -1605,7 +1618,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx, txn); - maybe_send_schema(ctx, change, relation, relentry); + maybe_send_schema(ctx, change, relation, relentry, data); } if (nrelids > 0) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 058244cd17..3ca0121d20 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4610,6 +4610,7 @@ getSubscriptions(Archive *fout) int i_subpublications; int i_subbinary; int i_subpasswordrequired; + int i_subminapplydelay; int i, ntups; @@ -4664,11 +4665,13 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 160000) appendPQExpBufferStr(query, " s.suborigin,\n" - " s.subpasswordrequired\n"); + " s.subpasswordrequired,\n" + " s.subminapplydelay\n"); else appendPQExpBuffer(query, " '%s' AS suborigin,\n" - " 't' AS subpasswordrequired\n", + " 't' AS subpasswordrequired,\n" + " 0 AS subminapplydelay\n", LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, @@ -4698,6 +4701,7 @@ getSubscriptions(Archive *fout) i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); + i_subminapplydelay = PQfnumber(res, "subminapplydelay"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4730,6 +4734,8 @@ getSubscriptions(Archive *fout) subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); subinfo[i].subpasswordrequired = pg_strdup(PQgetvalue(res, i, i_subpasswordrequired)); + subinfo[i].subminapplydelay = + atoi(PQgetvalue(res, i, i_subminapplydelay)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4814,6 +4820,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subpasswordrequired, "t") != 0) appendPQExpBuffer(query, ", password_required = false"); + if (subinfo->subminapplydelay > 0) + appendPQExpBuffer(query, ", min_apply_delay = '%d ms'", subinfo->subminapplydelay); + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ed6ce41ad7..6bf889a00a 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -662,6 +662,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *suborigin; char *subsynccommit; + int subminapplydelay; char *subpublications; char *subpasswordrequired; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 83a37ee601..78f2426d99 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6493,7 +6493,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6551,9 +6551,11 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 160000) appendPQExpBuffer(&buf, ", suborigin AS \"%s\"\n" - ", subrunasowner AS \"%s\"\n", + ", subrunasowner AS \"%s\"\n" + ", subminapplydelay AS \"%s\"\n", gettext_noop("Origin"), - gettext_noop("Run as Owner?")); + gettext_noop("Run as Owner?"), + gettext_noop("Min apply delay")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 5825b2a195..d7a9aeff86 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1925,9 +1925,9 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + COMPLETE_WITH("binary", "disable_on_error", "min_apply_delay", + "origin", "password_required", "run_as_owner", + "slot_name", "streaming", "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3269,9 +3269,10 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "disable_on_error", "enabled", "min_apply_delay", + "origin", "password_required", "run_as_owner", + "slot_name", "streaming", "synchronous_commit", + "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 91d729d62d..649e789240 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */ + int32 subminapplydelay; /* Replication apply delay (ms) */ + bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ @@ -127,6 +129,7 @@ typedef struct Subscription * skipped */ char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ + int32 minapplydelay; /* Replication apply delay (ms) */ bool enabled; /* Indicates if the subscription is enabled */ bool binary; /* Indicates if the subscription wants data in * binary format */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index b4a8015403..59d924084f 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -30,6 +30,7 @@ typedef struct PGOutputData bool messages; bool two_phase; char *origin; + bool require_schema; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..954d297401 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -187,6 +187,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + bool require_schema; } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 9c52890f1d..b0e1ea5a56 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -115,18 +115,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -144,10 +144,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -166,10 +166,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -178,10 +178,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -213,10 +213,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -245,19 +245,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -269,27 +269,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -304,10 +304,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -322,10 +322,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -361,10 +361,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -373,10 +373,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -386,10 +386,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -402,18 +402,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -463,6 +463,44 @@ ERROR: permission denied for database regression -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +SET SESSION AUTHORIZATION regress_subscription_user; +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +ERROR: invalid value for parameter "min_apply_delay": "foo" +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); +ERROR: -1 ms is outside the valid range for parameter "min_apply_delay" (0 .. 2147483647) +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123); +ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive options +-- success -- min_apply_delay value without unit is taken as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 123 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- success -- min_apply_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d'); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 86400000 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); +ERROR: cannot set parallel streaming mode for subscription with min_apply_delay +-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel); +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123); +ERROR: cannot set min_apply_delay for subscription in parallel streaming mode +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index cc53458d91..a3f1f2cf1d 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -332,7 +332,34 @@ ALTER SUBSCRIPTION regress_testsub RENAME TO regress_testsub2; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +SET SESSION AUTHORIZATION regress_subscription_user; + +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); + +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123); + +-- success -- min_apply_delay value without unit is taken as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); +\dRs+ + +-- success -- min_apply_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d'); +\dRs+ + +-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); + +-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel); +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; + DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 91aa068c95..01f2c4284d 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -515,6 +515,37 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after renaming SUBSCRIPTION"; +# Test time-delayed logical replication +# +# If the subscription sets min_apply_delay parameter, the logical replication +# worker will delay the transaction apply for min_apply_delay milliseconds. We +# verify this by looking at the time difference between a) when tuples are +# inserted on the publisher, and b) when those changes are replicated on the +# subscriber. Even on slow machines, this strategy will give predictable behavior. + +# Set min_apply_delay parameter to 3 seconds +my $delay = 3; +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')"); + +# Before doing the insertion, get the current timestamp that will be +# used as a comparison base. +my $publisher_insert_time = time(); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins VALUES (generate_series(1101, 1120))"); + +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 1 FROM tab_ins WHERE a = 1120;" + ) + or die + "failed to replicate changes"; + +# This test is successful if and only if the LSN has been applied with at least +# the configured apply delay. +ok( time() - $publisher_insert_time >= $delay, + "subscriber applies WAL only after replication delay for non-streaming transaction" +); + # check all the cleanup $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); -- 2.27.0