From 6493edd4371eb5085eaed0ffdcb8e93bf0ffada2 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 30 Mar 2023 05:25:53 +0000 Subject: [PATCH v3] (WIP) Time-delayed logical replication by serializing changes Similar to physical replication, a time-delayed copy of the data for logical replication is useful for some scenarios (particularly to fix errors that might cause data loss). This patch implements a new subscription parameter called 'min_apply_delay'. If the subscription sets min_apply_delay parameter, the logical replication worker will delay the transaction apply for min_apply_delay milliseconds. The delaying is implemented by serializing changes into file. The file is created when the worker receives BEGIN message. The worker writes received changes and flush at COMMIT. The delayed transaction is checked its commit time for every main loop, and applied from the file when the time exceeds the min_apply_delay. The commit time is stored in memory when the transaction is committed, or the worker restarts. The delay is calculated between the WAL time stamp and the current time on the subscriber. The combination of parallel streaming mode and min_apply_delay is not allowed. This is because in parallel streaming mode, we start applying the transaction stream as soon as the first change arrives without knowing the transaction's prepare/commit time. This means we cannot calculate the underlying network/decoding lag between publisher and subscriber, and so always waiting for the full 'min_apply_delay' period might include unnecessary delay. The other possibility was to apply the delay at the end of the parallel apply transaction but that would cause issues related to resource bloat and locks being held for a long time. Currently the combination of skip transaction feature and min_apply_delay does not work well. 
Earlier versions were written by Euler Taveira, Takamichi Osumi, and Kuroda Hayato Author: Kuroda Hayato, Takamichi Osumi --- doc/src/sgml/catalogs.sgml | 9 + doc/src/sgml/glossary.sgml | 15 + doc/src/sgml/logical-replication.sgml | 7 + doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 47 +- src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 6 +- src/backend/commands/subscriptioncmds.c | 123 +- src/backend/replication/logical/worker.c | 1232 +++++++++++++++++--- src/bin/pg_dump/pg_dump.c | 13 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 3 + src/test/regress/expected/subscription.out | 181 +-- src/test/regress/sql/subscription.sql | 25 + src/test/subscription/t/001_rep_changes.pl | 31 + 17 files changed, 1487 insertions(+), 224 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 7c09ab3000..6f2e348351 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7891,6 +7891,15 @@ SCRAM-SHA-256$<iteration count>:&l + + + subminapplydelay int4 + + + The minimum delay, in milliseconds, for applying changes + + + subname name diff --git a/doc/src/sgml/glossary.sgml b/doc/src/sgml/glossary.sgml index 29bf1873bd..204fe7f3ae 100644 --- a/doc/src/sgml/glossary.sgml +++ b/doc/src/sgml/glossary.sgml @@ -1757,6 +1757,21 @@ + + Time-delayed replication + + + Replication setup that delays the application of changes by a specified + minimum time-delay period. + + + For more information, see + for physical replication + and for logical replication. + + + + TOAST diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index c65f4aabfd..0be4d652aa 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -257,6 +257,13 @@ option of CREATE SUBSCRIPTION for details. 
+ + A subscription can delay the application of changes by specifying the + min_apply_delay + subscription parameter. See for + details. + + Replication Slot Management diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index a85e04e4d6..bf6c5fe7f0 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -225,8 +225,9 @@ ALTER SUBSCRIPTION name RENAME TO < streaming, disable_on_error, password_required, - run_as_owner, and - origin. + run_as_owner, + origin, and + min_apply_delay. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 68aa2b47f2..8f31a14b37 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,7 +399,47 @@ CREATE SUBSCRIPTION subscription_name - + + + min_apply_delay (integer) + + + By default, the subscriber applies changes as soon as possible. This + parameter allows the user to delay the application of changes by a + given time period. This is done by writing all the changes into a + file once and apply contents after spending time. If the value is + specified without units, it is taken as milliseconds. The default + is zero (no delay). See + for details on the available valid time units. + + + Any delay becomes effective only after all initial table + synchronization has finished and occurs before each transaction starts + to get applied on the subscriber. The delay is calculated as the + difference between the WAL timestamp as written on the publisher and + the current time on the subscriber. Any overhead of time spent in + logical decoding and in transferring the transaction may reduce the + actual wait time. Even if the overhead already exceeds the requested + min_apply_delay value, all the changes are written + into file and applied immediately. 
If the system clocks on publisher + and subscriber are not synchronized, this may lead to apply changes + earlier than expected, but this is not a major issue because this + parameter is typically much larger than the time deviations between + servers. + + + + Delaying the replication means there is a much longer time between + making a change on the publisher, and that change being committed + on the subscriber. This can impact the performance of synchronous + replication. See + parameter. + + + + + + @@ -472,6 +512,11 @@ CREATE SUBSCRIPTION subscription_name + + A non-zero min_apply_delay parameter is not allowed when + streaming in parallel mode. + + We allow non-existent publications to be specified so that users can add those later. This means diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d07f88ce28..56f8fdda10 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->skiplsn = subform->subskiplsn; sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; + sub->minapplydelay = subform->subminapplydelay; sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6b098234f8..1b1f40de62 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1317,9 +1317,9 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. 
REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, - subpasswordrequired, subrunasowner, +GRANT SELECT (oid, subdbid, subskiplsn, subminapplydelay, subname, subowner, + subenabled, subbinary, substream, subtwophasestate, + subdisableonerr, subpasswordrequired, subrunasowner, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3251d89ba8..aaa2065311 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_MIN_APPLY_DELAY 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool runasowner; char *origin; XLogRecPtr lsn; + int32 min_apply_delay; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -107,7 +109,7 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); - +static int32 defGetMinApplyDelay(DefElem *def); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. 
@@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY)) + opts->min_apply_delay = 0; /* Parse options */ foreach(lc, stmt_options) @@ -353,6 +357,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && + strcmp(defel->defname, "min_apply_delay") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY; + opts->min_apply_delay = defGetMinApplyDelay(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -433,6 +446,32 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, "slot_name = NONE", "create_slot = false"))); } } + + /* + * The combination of parallel streaming mode and min_apply_delay is not + * allowed. This is because in parallel streaming mode, we start applying + * the transaction stream as soon as the first change arrives without + * knowing the transaction's prepare/commit time. This means we cannot + * calculate the underlying network/decoding lag between publisher and + * subscriber, and so always waiting for the full 'min_apply_delay' period + * might include unnecessary delay. + * + * The other possibility was to apply the delay at the end of the parallel + * apply transaction but that would cause issues related to resource bloat + * and locks being held for a long time. + */ + if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) && + opts->min_apply_delay > 0 && + opts->streaming == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + + /* + * translator: the first %s is a string of the form "parameter > 0" + * and the second one is "option = value". 
+ */ + errmsg("%s and %s are mutually exclusive options", + "min_apply_delay > 0", "streaming = parallel")); } /* @@ -591,7 +630,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_MIN_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -682,6 +722,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid); values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId); values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr); + values[Anum_pg_subscription_subminapplydelay - 1] = Int32GetDatum(opts.min_apply_delay); values[Anum_pg_subscription_subname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); @@ -1130,7 +1171,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_MIN_APPLY_DELAY); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1174,6 +1216,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { + /* + * The combination of parallel streaming mode and + * min_apply_delay is not allowed. See + * parse_subscription_options. 
+ */ + if (opts.streaming == LOGICALREP_STREAM_PARALLEL && + !IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY) + && sub->minapplydelay > 0) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set parallel streaming mode for subscription with %s", + "min_apply_delay")); + values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; @@ -1202,6 +1257,26 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY)) + { + /* + * The combination of parallel streaming mode and + * min_apply_delay is not allowed. See + * parse_subscription_options. + */ + if (opts.min_apply_delay > 0 && + !IsSet(opts.specified_opts, SUBOPT_STREAMING) + && sub->stream == LOGICALREP_STREAM_PARALLEL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for subscription in parallel streaming mode", + "min_apply_delay")); + + values[Anum_pg_subscription_subminapplydelay - 1] = + Int32GetDatum(opts.min_apply_delay); + replaces[Anum_pg_subscription_subminapplydelay - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = @@ -2343,3 +2418,45 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Extract the min_apply_delay value from a DefElem. This is very similar to + * parse_and_validate_value() for integer values, because min_apply_delay + * accepts the same parameter format as recovery_min_apply_delay. 
+ */ +static int32 +defGetMinApplyDelay(DefElem *def) +{ + char *input_string; + int result; + const char *hintmsg; + + input_string = defGetString(def); + + /* + * Parse given string as parameter which has millisecond unit + */ + if (!parse_int(input_string, &result, GUC_UNIT_MS, &hintmsg)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"%s\": \"%s\"", + "min_apply_delay", input_string), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + + /* + * Check both the lower boundary for the valid min_apply_delay range and + * the upper boundary as the safeguard for some platforms where INT_MAX is + * wider than int32 respectively. Although parse_int() has confirmed that + * the result is less than or equal to INT_MAX, the value will be stored + * in a catalog column of int32. + */ + if (result < 0 || result > PG_INT32_MAX) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("%d ms is outside the valid range for parameter \"%s\" (%d .. %d)", + result, + "min_apply_delay", + 0, PG_INT32_MAX))); + + return result; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3d58910c14..7311ca75ab 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -153,6 +153,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "common/file_utils.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -370,67 +371,742 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* XXX definitions for time-delayed logical replicaiton */ + +/* DELAYEDDIR stores files that contains changes of delayed transactions. 
*/ +#define DELAYEDDIR "pg_logical/delayed_txns" +#define DELAYEDSUFFIX ".delayed_changes" + +/* List entry to map xid and commit time */ +typedef struct DelayedTxnListEntry +{ + TransactionId xid; + LogicalRepCommitData commit_data; +} DelayedTxnListEntry; + +/* + * An entry is appended when the we receives commit message and time-delayed + * logical replication is requested. The entry will be deleted after contents + * are applied. + */ +static List *DelayedTxnList = NIL; + +/* fields valid only when time-delayed logical replication is requested */ +static bool in_delayed_transaction = false; + +static TransactionId delayed_xid = InvalidTransactionId; + +/* + * Store flushed lsn for time-delayed logical replication. This is used when + * we send a feedback message to the publisher. + */ +static XLogRecPtr last_flushed = InvalidXLogRecPtr; + +/* + * FIXME: global file descriptor may be not sufficient. There is a possibility + * that non-streaming transactions are come concurrently. At that time + * create_delay_file() for the second transaction will be failed... + */ +static int delayed_fd = -1; + static inline void subxact_filename(char *path, Oid subid, TransactionId xid); static inline void changes_filename(char *path, Oid subid, TransactionId xid); /* - * Information about subtransactions of a given toplevel transaction. + * Information about subtransactions of a given toplevel transaction. + */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); +static inline void cleanup_subxact_info(void); + +/* + * Serialize and deserialize changes for a toplevel transaction. 
+ */ +static void stream_open_file(Oid subid, TransactionId xid, + bool first_segment); +static void stream_write_change(char action, StringInfo s); +static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); +static void stream_close_file(void); + +static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); + +static void DisableSubscriptionAndExit(void); + +static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); +static void apply_handle_insert_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot); +static void apply_handle_update_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + Oid localindexoid); +static void apply_handle_delete_internal(ApplyExecutionData *edata, + ResultRelInfo *relinfo, + TupleTableSlot *remoteslot, + Oid localindexoid); +static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, + LogicalRepRelation *remoterel, + Oid localidxoid, + TupleTableSlot *remoteslot, + TupleTableSlot **localslot); +static void apply_handle_tuple_routing(ApplyExecutionData *edata, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + CmdType operation); + +/* Compute GID for two_phase transactions */ +static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); + +/* Functions for skipping changes */ +static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); +static void stop_skipping_changes(void); +static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + +/* Functions for apply error callback */ +static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); +static inline void reset_apply_error_context_info(void); + +static TransApplyAction get_transaction_apply_action(TransactionId xid, + ParallelApplyWorkerInfo **winfo); + +static void begin_replication_step(void); +static void 
end_replication_step(void); + +/* Functions for time-delayed logical replicaiton */ +static void cache_commit_data(LogicalRepCommitData *commit_data, TransactionId xid); +static void flush_delayed_changes(LogicalRepCommitData *commit_data); +static void delay_file_name(char *path, Oid subid, TransactionId xid); +static bool is_given_transaction_delayed(Oid subid, TransactionId xid); +static void create_delay_file(TransactionId xid); +static bool handle_delayed_transaction(char action, StringInfo s); +static void handle_delayed_prepared(char action, XLogRecPtr prepare_lsn, + XLogRecPtr end_lsn, TimestampTz prepare_time, + TransactionId xid); + +/* + * Cache commit_data into the list + */ +static void +cache_commit_data(LogicalRepCommitData *commit_data, TransactionId xid) +{ + MemoryContext old; + DelayedTxnListEntry *entry; + + old = MemoryContextSwitchTo(ApplyContext); + + entry = palloc0(sizeof(DelayedTxnListEntry)); + + /* Contruct an entry and append it */ + entry->xid = xid; + memcpy(&entry->commit_data, commit_data, sizeof(LogicalRepCommitData)); + DelayedTxnList = lappend(DelayedTxnList, entry); + + MemoryContextSwitchTo(old); +} + +/* + * Flush given changes and close the file. This will be called at the end of + * the transaction. 
+ */ +static void +flush_delayed_changes(LogicalRepCommitData *commit_data) +{ + Assert(delayed_fd > 0); + Assert(TransactionIdIsValid(delayed_xid)); + + /* Cache given commit_data into the list */ + cache_commit_data(commit_data, delayed_xid); + + /* Flush previously written changes */ + if (pg_fdatasync(delayed_fd) != 0) + { + int save_errno = errno; + close(delayed_fd); + errno = save_errno; + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not fsync file")); + } + + /* Store flushed lsn */ + last_flushed = commit_data->end_lsn; + + /* Cleanup */ + close(delayed_fd); + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; +} + +/* + * Get formal filename from subid and xid + */ +static void +delay_file_name(char *path, Oid subid, TransactionId xid) +{ + snprintf(path, MAXPGPATH, DELAYEDDIR "/%u-%u" DELAYEDSUFFIX, subid, xid); +} + +/* + * Extract subid and xid and given pathname + */ +static void +extract_info_from_delay_file(char *path, Oid *subid, TransactionId *xid) +{ + sscanf(path, DELAYEDDIR "/%u-%u", subid, xid); +} + +/* + * Check whether the given transaction is delayed. This is done by checking the + * delay file. + */ +static bool +is_given_transaction_delayed(Oid subid, TransactionId xid) +{ + struct stat st; + char path[MAXPGPATH]; + + delay_file_name(path, subid, xid); + + return stat(path, &st) == 0; +} + +/* + * Apply the delayed transaction. In the function a delayed file is opened and + * read. Apply worker applies written changes. + */ +static void +apply_delayed_transaction(TransactionId xid, XLogRecPtr lsn) +{ + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + MemoryContext oldcxt; + ResourceOwner oldowner; + + /* Make sure we have an open transaction */ + begin_replication_step(); + + /* + * Allocate file handle and memory required to process all the messages in + * TopTransactionContext to avoid them getting reset after each message is + * processed. 
+ */ + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* Open the spool file for the committed transaction */ + delay_file_name(path, MyLogicalRepWorker->subid, xid); + elog(DEBUG1, "replaying changes from file \"%s\"", path); + + /* + * Make sure the file is owned by the toplevel transaction so that the + * file will not be accidentally closed when aborting a subtransaction. + */ + oldowner = CurrentResourceOwner; + CurrentResourceOwner = TopTransactionResourceOwner; + + /* Open the specified file */ + delayed_fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + Assert(delayed_fd > 0); + + CurrentResourceOwner = oldowner; + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + end_replication_step(); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + size_t nbytes; + int len; + + CHECK_FOR_INTERRUPTS(); + + /* read length of the on-disk record */ + nbytes = read(delayed_fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? */ + if (len <= 0) + elog(ERROR, "incorrect length %d in delaed transaction's changes file \"%s\"", + len, path); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + read(delayed_fd, buffer, len); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. 
*/ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file \"%s\"", + nchanges, path); + } + + if (delayed_fd > 0) + { + close(delayed_fd); + delayed_fd = -1; + durable_unlink(path, LOG); + } + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + return; +} + +/* + * Create a file that will be written changes. + */ +static void +create_delay_file(TransactionId xid) +{ + char path[MAXPGPATH]; + int fd; + + Assert(TransactionIdIsValid(xid)); + Assert(delayed_fd < 0); + + delay_file_name(path, MyLogicalRepWorker->subid, xid); + + elog(DEBUG1, "creating a file \"%s\" for time-delayed logical replication", + path); + + fd = BasicOpenFile(path, O_WRONLY | O_CREAT | O_EXCL | O_APPEND | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + path)); + + delayed_fd = fd; +} + +/* + * Create a directory that holds delayed files + */ +static void +initialize_delay_directory(void) +{ + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, DELAYEDDIR); + if (MakePGDirectory(path) < 0 && errno != EEXIST) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + path)); + + START_CRIT_SECTION(); + fsync_fname(path, true); + END_CRIT_SECTION(); +} + +/* + * Read the delayed file and cache information of transaction e.g. committime + */ +static bool +ReadCommitRecord(int fd, TransactionId xid) +{ + int len = 0; + char action = 0; + char *buffer; + StringInfoData commit_message; + LogicalRepCommitData commit_data = {0}; + + /* + * If the transaction is not 2PC, we can assume that decoded commit record + * is at the end of the file. Therefore, read from the end. 
+ */ + + /* FIXME: size of the messages is estimated from the document */ +#define COMMIT_MESSAGE_SIZE (sizeof(int) + sizeof(char) + sizeof(int8) + sizeof(LogicalRepCommitData)) + + /* seek file to the end */ + lseek(fd, -COMMIT_MESSAGE_SIZE, SEEK_END); + + read(fd, &len, sizeof(int)); + read(fd, &action, sizeof(char)); + + /* + * If the action is not 'C' and the got length is not valid, the + * transaction may be 2PC. So stop reading more. + */ + if (len != (COMMIT_MESSAGE_SIZE - sizeof(int)) && + action != LOGICAL_REP_MSG_COMMIT) + return false; + + /* + * If we reach here, this file seems valid and normal transaction. + * Start to read more and cache into memory to start delaying. + */ + + /* Prepare buffer and read from file */ + buffer = palloc0(len - sizeof(char)); + read(fd, buffer, len - sizeof(char)); + + /* Append to StringInfo in order to use same read function */ + initStringInfo(&commit_message); + appendBinaryStringInfo(&commit_message, buffer, len - sizeof(char)); + + /* Finally start to read decoded commit record */ + logicalrep_read_commit(&commit_message, &commit_data); + + /* ..and cache into the list */ + cache_commit_data(&commit_data, xid); + + pfree(buffer); + pfree(commit_message.data); + +#undef COMMIT_MESSAGE_SIZE + + return true; +} + +/* + * Read the delayed file and cache information of transaction e.g. committime. + * + * Note that apart from above, the native PREPARE/COMMIT PREPARED message is + * not directly written into the file. This is because gid can have arbitrary + * length and then we cannot estimate the offset of these records from the end + * of the file. Instread, the important information - prepare/commit_lsn, + * end_lsn, prepare/commit_time, and its transaction id are serialized. + * Functions for PREPARE/COMMIT PREPARED were combined because they have same + * attributes. 
+ */ +static bool +ReadPreparedCommonRecord(int fd) +{ + int len = 0; + char action = 0; + + /* + * If the transaction is 2PC, we can assume that the final record is either + * or decoded prepare/commit prepared. + */ + + /* + * XXX: Modified message contains + * - length + * - message type + * - prepare/commit_lsn + * - end_lsn + * - xid + */ +#define PREPARE_MESSAGE_SIZE (sizeof(int) + sizeof(char) + sizeof(XLogRecPtr) + sizeof(XLogRecPtr) + sizeof(TimestampTz) + sizeof(TransactionId)) + lseek(fd, -PREPARE_MESSAGE_SIZE, SEEK_END); + read(fd, &len, sizeof(int)); + read(fd, &action, sizeof(char)); + + /* + * Do something if the record seems to be PREPARE or COMMIT PREPARED + */ + if (len == (PREPARE_MESSAGE_SIZE - sizeof(int)) && + action == LOGICAL_REP_MSG_PREPARE) + { + /* For PREPARE, do nothing */ + return true; + } + else if (len == (PREPARE_MESSAGE_SIZE - sizeof(int)) && + LOGICAL_REP_MSG_COMMIT_PREPARED) + { + /* For COMMIT PREPARED, cache into memory and start to delay */ + + LogicalRepCommitData commit_data = {0}; + TransactionId xid = InvalidTransactionId; + + /* Adjust position and append to StringInfo in order to use same read function */ + read(fd, &commit_data.commit_lsn, sizeof(XLogRecPtr)); + read(fd, &commit_data.end_lsn, sizeof(XLogRecPtr)); + read(fd, &commit_data.committime, sizeof(TimestampTz)); + read(fd, &xid, sizeof(TransactionId)); + + cache_commit_data(&commit_data, xid); + + return true; + } + else + return false; +} + +/* + * Transform information from commit_prepared style to commit style. + */ +static void +ConstructCommitFromCommitPrepared(LogicalRepCommitData *commit, + LogicalRepCommitPreparedTxnData *prepare_data) +{ + commit->commit_lsn = prepare_data->commit_lsn; + commit->committime = prepare_data->commit_time; + commit->end_lsn = prepare_data->end_lsn; +} + +/* + * Restore the delayed transaction from given files. 
+ */ +static void +RestoreDelayedTxn(char *path) +{ + Oid subid = InvalidOid; + TransactionId xid = InvalidTransactionId; + int fd; + + /* Check filename to extract subid and xid */ + extract_info_from_delay_file(path, &subid, &xid); + + /* + * If the subid is not related with the apply worker, the transaction is + * out-of-scope for us... + */ + if (MyLogicalRepWorker->subid != subid) + return; + + /* OK, the transaction must be maintained by the worker. Open file */ + fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); + + /* And restore from the end of the file */ + if (ReadCommitRecord(fd, xid)) + goto cleanup; + + if (ReadPreparedCommonRecord(fd)) + goto cleanup; + + /* + * If we reach here the file seems to be corrupted. So remove once and + * receive changes again. + */ + close(fd); + durable_unlink(path, LOG); + return; + +cleanup: + close(fd); +} + +/* + * Restore all the delayed transactions to memory. + */ +static void +RestoreDelayedTxns(void) +{ + DIR *delayed_dir; + struct dirent *delayed_de; + + /* Read all the file step-by-step */ + delayed_dir = AllocateDir(DELAYEDDIR); + while ((delayed_de = ReadDir(delayed_dir, DELAYEDDIR)) != NULL) + { + char path[MAXPGPATH]; + PGFileType de_type; + + if (strcmp(delayed_de->d_name, ".") == 0 || + strcmp(delayed_de->d_name, "..") == 0) + continue; + + /* Check the filename and status */ + snprintf(path, sizeof(path), DELAYEDDIR "/%s", delayed_de->d_name); + de_type = get_dirent_type(path, delayed_de, false, DEBUG1); + + if (de_type != PGFILETYPE_REG) + continue; + + /* Found a delayed transaction. Restore it. */ + RestoreDelayedTxn(path); + } + FreeDir(delayed_dir); +} + +/* + * Restore delayed transactions, or initialize the directory + */ +static void +InitializeDelayedTxn(void) +{ + struct stat st; + char path[MAXPGPATH]; + + snprintf(path, MAXPGPATH, DELAYEDDIR); + + /* + * If the given directory does not exist, create one. Otherwise start to + * restore. 
+ */ + if (stat(path, &st) != 0) + { + initialize_delay_directory(); + return; + } + + RestoreDelayedTxns(); +} + +/* + * Write a given message to a file. This is called for every message. + * This returns true only when changes are written into file. + * + * The format of the serialized changes is same as the streamed one. This + * has a length (not including the length), action code (identifying the + * message type) and message contents (without the subxact TransactionId + * value). + */ +static bool +handle_delayed_transaction(char action, StringInfo s) +{ + int len; + + /* Return if we are not in delay */ + if (!in_delayed_transaction) + return false; + + Assert(delayed_fd > 0); + Assert(TransactionIdIsValid(delayed_xid)); + + len = (s->len - s->cursor) + sizeof(char); + + if (write(delayed_fd, &len, sizeof(len)) != sizeof(len)) + abort(); + if (write(delayed_fd, &action, sizeof(action)) != sizeof(action)) + abort(); + + len = (s->len - s->cursor); + + if (write(delayed_fd, &s->data[s->cursor], len) != len) + abort(); + + return true; +} + +/* + * Write a given information from PREPARE/COMMIT PREPARED to a file. This is + * called when we receive PREPARE or COMMIT PREPARED message. This returns true + * only when changes are written into file. + * + * About the needness of the function see comments atop + * ReadPreparedCommonRecord(). 
*/ -static void subxact_info_write(Oid subid, TransactionId xid); -static void subxact_info_read(Oid subid, TransactionId xid); -static void subxact_info_add(TransactionId xid); -static inline void cleanup_subxact_info(void); +static void +handle_delayed_prepared(char action, XLogRecPtr prepare_lsn, + XLogRecPtr end_lsn, TimestampTz prepare_time, + TransactionId xid) +{ + int len; + + Assert(delayed_fd > 0); + +#define MESSAGE_SIZE (sizeof(char) + sizeof(XLogRecPtr) + sizeof(XLogRecPtr) + sizeof(TimestampTz) + sizeof(TransactionId)) + len = MESSAGE_SIZE; + + /* + * XXX: Modified message contains + * - length + * - message type + * - prepare/commit_lsn + * - end_lsn + * - xid + */ + if (write(delayed_fd, &len, sizeof(len)) != sizeof(len)) + abort(); + if (write(delayed_fd, &action, sizeof(action)) != sizeof(action)) + abort(); + if (write(delayed_fd, &prepare_lsn, sizeof(prepare_lsn)) != sizeof(prepare_lsn)) + abort(); + if (write(delayed_fd, &end_lsn, sizeof(end_lsn)) != sizeof(end_lsn)) + abort(); + if (write(delayed_fd, &prepare_time, sizeof(prepare_time)) != sizeof(prepare_time)) + abort(); + if (write(delayed_fd, &xid, sizeof(xid)) != sizeof(xid)) + abort(); +#undef MESSAGE_SIZE +} /* - * Serialize and deserialize changes for a toplevel transaction. 
+ * Check the delayed transactions and apply if we elapsed sufficient time */ -static void stream_open_file(Oid subid, TransactionId xid, - bool first_segment); -static void stream_write_change(char action, StringInfo s); -static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s); -static void stream_close_file(void); +static void +check_delayed_transaction(void) +{ + TimestampTz now; + ListCell *lc; + int n = 0; -static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); + if (in_streamed_transaction) + return; -static void DisableSubscriptionAndExit(void); + now = GetCurrentTimestamp(); -static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); -static void apply_handle_insert_internal(ApplyExecutionData *edata, - ResultRelInfo *relinfo, - TupleTableSlot *remoteslot); -static void apply_handle_update_internal(ApplyExecutionData *edata, - ResultRelInfo *relinfo, - TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - Oid localindexoid); -static void apply_handle_delete_internal(ApplyExecutionData *edata, - ResultRelInfo *relinfo, - TupleTableSlot *remoteslot, - Oid localindexoid); -static bool FindReplTupleInLocalRel(EState *estate, Relation localrel, - LogicalRepRelation *remoterel, - Oid localidxoid, - TupleTableSlot *remoteslot, - TupleTableSlot **localslot); -static void apply_handle_tuple_routing(ApplyExecutionData *edata, - TupleTableSlot *remoteslot, - LogicalRepTupleData *newtup, - CmdType operation); + /* Read cache on-by-one */ + foreach(lc, DelayedTxnList) + { + DelayedTxnListEntry *entry = (DelayedTxnListEntry *) lfirst(lc); + LogicalRepCommitData *commit_data = &entry->commit_data; + TimestampTz delayUntil; + long diffms; -/* Compute GID for two_phase transactions */ -static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); + delayUntil = TimestampTzPlusMilliseconds(commit_data->committime, + MySubscription->minapplydelay); -/* Functions for 
skipping changes */ -static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); -static void stop_skipping_changes(void); -static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); + diffms = TimestampDifferenceMilliseconds(now, delayUntil); -/* Functions for apply error callback */ -static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); -static inline void reset_apply_error_context_info(void); + /* + * The cache is aligned the commit ordering, so we do not have to check + * latter entries if we find transactions that should not be applied. + */ + if (diffms > 0) + break; -static TransApplyAction get_transaction_apply_action(TransactionId xid, - ParallelApplyWorkerInfo **winfo); + elog(DEBUG1, "started to apply transaction %u", entry->xid); + + apply_delayed_transaction(entry->xid, commit_data->end_lsn); + apply_handle_commit_internal(commit_data); + + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + n++; + } + /* Discards applied entries */ + DelayedTxnList = list_delete_first_n(DelayedTxnList, n); +} /* * Return the name of the logical replication worker. @@ -1019,13 +1695,28 @@ apply_handle_begin(StringInfo s) logicalrep_read_begin(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); - remote_final_lsn = begin_data.final_lsn; + /* + * Prepare to write changes into file if time-delayed replication is + * requested. 
+ */ + if (MySubscription->minapplydelay && AllTablesyncsReady()) + { + in_delayed_transaction = true; - maybe_start_skipping_changes(begin_data.final_lsn); + create_delay_file(begin_data.xid); - in_remote_transaction = true; + delayed_xid = begin_data.xid; + } + else + { + remote_final_lsn = begin_data.final_lsn; - pgstat_report_activity(STATE_RUNNING, NULL); + maybe_start_skipping_changes(begin_data.final_lsn); + + in_remote_transaction = true; + + pgstat_report_activity(STATE_RUNNING, NULL); + } } /* @@ -1037,20 +1728,40 @@ static void apply_handle_commit(StringInfo s) { LogicalRepCommitData commit_data; + /* Save the message before it is consumed. */ + StringInfoData original_msg = *s; + + /* + * If we are applying the delayed transaction, skip here. + * Actual COMMIT will be done outside the apply_delayed_transaction() + */ + if (delayed_fd > 0 && !in_delayed_transaction) + return; logicalrep_read_commit(s, &commit_data); - if (commit_data.commit_lsn != remote_final_lsn) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", - LSN_FORMAT_ARGS(commit_data.commit_lsn), - LSN_FORMAT_ARGS(remote_final_lsn)))); + /* If we are applying, skip here. */ + + if (in_delayed_transaction) + { + /* Write a commit message into file and flush all of messages */ + handle_delayed_transaction(LOGICAL_REP_MSG_COMMIT, &original_msg); + flush_delayed_changes(&commit_data); + } + else + { + if (commit_data.commit_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)", + LSN_FORMAT_ARGS(commit_data.commit_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); - apply_handle_commit_internal(&commit_data); + apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. 
*/ - process_syncing_tables(commit_data.end_lsn); + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + } pgstat_report_activity(STATE_IDLE, NULL); reset_apply_error_context_info(); @@ -1076,13 +1787,28 @@ apply_handle_begin_prepare(StringInfo s) logicalrep_read_begin_prepare(s, &begin_data); set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); - remote_final_lsn = begin_data.prepare_lsn; + /* + * Prepare to write changes into file if time-delayed replication is + * requested. + */ + if (MySubscription->minapplydelay && AllTablesyncsReady()) + { + in_delayed_transaction = true; + + create_delay_file(begin_data.xid); + + delayed_xid = begin_data.xid; + } + else + { + remote_final_lsn = begin_data.prepare_lsn; - maybe_start_skipping_changes(begin_data.prepare_lsn); + maybe_start_skipping_changes(begin_data.prepare_lsn); - in_remote_transaction = true; + in_remote_transaction = true; - pgstat_report_activity(STATE_RUNNING, NULL); + pgstat_report_activity(STATE_RUNNING, NULL); + } } /* @@ -1124,57 +1850,115 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) /* * Handle PREPARE message. + * + * When time-delayed logical replication is requested, we just write a message + * into file and return. This means that no transaction is prepared on + * subscriber. This can avoid that the apply worker acquires locks for a long + * time due to the long min_apply_time. + * + * Even if the transaction is applied from delayed file, the transaction is not + * prepared. We just skip PREPARE message. */ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; - logicalrep_read_prepare(s, &prepare_data); + /* + * If we are applying the delayed transaction, just consume the PREPARE + * message and return. 
+ */ + if (delayed_fd > 0 && !in_delayed_transaction) + { + /* Consume non-needed data */ + (void) pq_getmsgint64(s); + (void) pq_getmsgint64(s); + (void) pq_getmsgint64(s); + (void) pq_getmsgint(s, 4); - if (prepare_data.prepare_lsn != remote_final_lsn) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", - LSN_FORMAT_ARGS(prepare_data.prepare_lsn), - LSN_FORMAT_ARGS(remote_final_lsn)))); + return; + } /* - * Unlike commit, here, we always prepare the transaction even though no - * change has happened in this transaction or all changes are skipped. It - * is done this way because at commit prepared time, we won't know whether - * we have skipped preparing a transaction because of those reasons. - * - * XXX, We can optimize such that at commit prepared time, we first check - * whether we have prepared the transaction or not but that doesn't seem - * worthwhile because such cases shouldn't be common. + * If we are writing changes into delayed file, construct a modified + * message and write it. This is needed for avoiding to write gid into + * file. More detail, see atop ReadPreparedCommonRecord(). 
*/ - begin_replication_step(); + if (in_delayed_transaction) + { + /* Write the modifed message */ + handle_delayed_prepared(LOGICAL_REP_MSG_PREPARE, + prepare_data.prepare_lsn, + prepare_data.end_lsn, + prepare_data.prepare_time, + prepare_data.xid); + + /* Flush changes */ + if (pg_fdatasync(delayed_fd) != 0) + { + int save_errno = errno; + close(delayed_fd); + errno = save_errno; + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not fsync file")); + } - apply_handle_prepare_internal(&prepare_data); + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* Cleanup */ + close(delayed_fd); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + } + else + { + logicalrep_read_prepare(s, &prepare_data); - in_remote_transaction = false; + if (prepare_data.prepare_lsn != remote_final_lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)", + LSN_FORMAT_ARGS(prepare_data.prepare_lsn), + LSN_FORMAT_ARGS(remote_final_lsn)))); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + /* + * Unlike commit, here, we always prepare the transaction even though no + * change has happened in this transaction or all changes are skipped. It + * is done this way because at commit prepared time, we won't know whether + * we have skipped preparing a transaction because of those reasons. + * + * XXX, We can optimize such that at commit prepared time, we first check + * whether we have prepared the transaction or not but that doesn't seem + * worthwhile because such cases shouldn't be common. 
+ */ + begin_replication_step(); - /* - * Since we have already prepared the transaction, in a case where the - * server crashes before clearing the subskiplsn, it will be left but the - * transaction won't be resent. But that's okay because it's a rare case - * and the subskiplsn will be cleared when finishing the next transaction. - */ - stop_skipping_changes(); - clear_subscription_skip_lsn(prepare_data.prepare_lsn); + apply_handle_prepare_internal(&prepare_data); - pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + /* + * Since we have already prepared the transaction, in a case where the + * server crashes before clearing the subskiplsn, it will be left but the + * transaction won't be resent. But that's okay because it's a rare case + * and the subskiplsn will be cleared when finishing the next transaction. + */ + stop_skipping_changes(); + clear_subscription_skip_lsn(prepare_data.prepare_lsn); + } } /* @@ -1192,38 +1976,95 @@ apply_handle_commit_prepared(StringInfo s) LogicalRepCommitPreparedTxnData prepare_data; char gid[GIDSIZE]; - logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); - - /* Compute GID for two_phase transactions. */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); + if (delayed_fd > 0 && !in_delayed_transaction) + return; - /* There is no transaction when COMMIT PREPARED is called */ - begin_replication_step(); + logicalrep_read_commit_prepared(s, &prepare_data); /* - * Update origin state so we can restart streaming from correct position - * in case of crash. 
+ * Check whether delayed file exists or not. If we have a file and we have + * not opened yet, it means that time-delayed logical replication has been + * requested. At that time we write the modified message. + * Otherwise, the transaction will be committed normally. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + if (delayed_fd < 0 && + is_given_transaction_delayed(MyLogicalRepWorker->subid, prepare_data.xid)) + { + char path[MAXPGPATH]; + LogicalRepCommitData commit_data = {0}; - FinishPreparedTransaction(gid, true); - end_replication_step(); - CommitTransactionCommand(); - pgstat_report_stat(false); + /* Open file again */ + delay_file_name(path, MyLogicalRepWorker->subid, prepare_data.xid); + delayed_fd = BasicOpenFile(path, O_WRONLY | O_APPEND | PG_BINARY); + if (delayed_fd < 0) + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path)); + + /* Write modified message to file */ + handle_delayed_prepared(LOGICAL_REP_MSG_COMMIT_PREPARED, + prepare_data.commit_lsn, + prepare_data.end_lsn, + prepare_data.commit_time, + prepare_data.xid); + /* Flush it */ + if (pg_fdatasync(delayed_fd) != 0) + { + int save_errno = errno; + close(delayed_fd); + errno = save_errno; + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not fsync file")); + } - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); - in_remote_transaction = false; + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; - /* Process any tables that are being synchronized in parallel. 
*/ - process_syncing_tables(prepare_data.end_lsn); + /* clean up */ + close(delayed_fd); - clear_subscription_skip_lsn(prepare_data.end_lsn); + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; - pgstat_report_activity(STATE_IDLE, NULL); - reset_apply_error_context_info(); + ConstructCommitFromCommitPrepared(&commit_data, &prepare_data); + + /* Cache the commited transaction */ + cache_commit_data(&commit_data, prepare_data.xid); + } + else + { + set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn); + + /* Compute GID for two_phase transactions. */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, + gid, sizeof(gid)); + + /* There is no transaction when COMMIT PREPARED is called */ + begin_replication_step(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + end_replication_step(); + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + clear_subscription_skip_lsn(prepare_data.end_lsn); + } } /* @@ -1242,6 +2083,20 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); + + /* + * If the delayed file exists, just remove it. The delayed transaction have + * never prepared, so it's OK not to call FinishPreparedTransaction(). 
+ */ + if (is_given_transaction_delayed(MyLogicalRepWorker->subid, rollback_data.xid)) + { + char path[MAXPGPATH]; + delay_file_name(path, MyLogicalRepWorker->subid, rollback_data.xid); + durable_unlink(path, LOG); + + return; + } + set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn); /* Compute GID for two_phase transactions. */ @@ -1317,16 +2172,68 @@ apply_handle_stream_prepare(StringInfo s) switch (apply_action) { case TRANS_LEADER_APPLY: + /* + * If time-delayed is requested, start to write changes to + * permanent file instead of temporary one. + */ + if (MySubscription->minapplydelay) + { + in_delayed_transaction = true; + + create_delay_file(prepare_data.xid); + + delayed_xid = prepare_data.xid; + } /* * The transaction has been serialized to file, so replay all the * spooled operations. + * Note that if time-delayed replication is requested, changes are + * written into permanent file here. */ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, prepare_data.xid, prepare_data.prepare_lsn); - /* Mark the transaction as prepared. */ - apply_handle_prepare_internal(&prepare_data); + + /* + * If time-delayed replication is requested, construct a modified + * message and write it. This is needed for avoiding to write gid into + * file. More detail, see atop ReadPreparedCommonRecord(). 
+ */ + if (MySubscription->minapplydelay) + { + /* Write the modified message */ + handle_delayed_prepared(LOGICAL_REP_MSG_PREPARE, + prepare_data.prepare_lsn, + prepare_data.end_lsn, + prepare_data.prepare_time, + prepare_data.xid); + + /* Flush changes */ + if (pg_fdatasync(delayed_fd) != 0) + { + int save_errno = errno; + close(delayed_fd); + errno = save_errno; + ereport(ERROR, + errcode_for_file_access(), + errmsg("could not fsync file")); + } + + /* Store flushed lsn */ + last_flushed = prepare_data.end_lsn; + + close(delayed_fd); + + delayed_fd = -1; + delayed_xid = InvalidTransactionId; + in_delayed_transaction = false; + } + else + { + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); + } CommitTransactionCommand(); @@ -1405,8 +2312,11 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + if (list_length(DelayedTxnList) == 0) + { + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + } /* * Similar to prepare case, the subskiplsn could be left in a case of @@ -2175,19 +3085,43 @@ apply_handle_stream_commit(StringInfo s) { case TRANS_LEADER_APPLY: + /* + * If time-delayed is requested, start to write changes to + * permanent file instead of temporary one. + */ + if (MySubscription->minapplydelay) + { + in_delayed_transaction = true; + + create_delay_file(xid); + + delayed_xid = xid; + } + /* * The transaction has been serialized to file, so replay all the * spooled operations. + * Note that if time-delayed replication is requested, changes are + * written into permanent file here. 
*/ apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid, - commit_data.commit_lsn); + commit_data.commit_lsn); - apply_handle_commit_internal(&commit_data); + + /* Flush changes if time-delayed is requested */ + if (MySubscription->minapplydelay) + { + handle_delayed_transaction(LOGICAL_REP_MSG_COMMIT, &original_msg); + flush_delayed_changes(&commit_data); + } + else + apply_handle_commit_internal(&commit_data); /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); elog(DEBUG1, "finished processing the STREAM COMMIT command"); + break; case TRANS_LEADER_SEND_TO_PARALLEL: @@ -2249,8 +3183,11 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(commit_data.end_lsn); + if (list_length(DelayedTxnList) == 0) + { + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + } pgstat_report_activity(STATE_IDLE, NULL); @@ -2325,7 +3262,8 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; - if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s)) + if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_RELATION, s)) return; rel = logicalrep_read_rel(s); @@ -2348,7 +3286,8 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; - if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s)) + if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_TYPE, s)) return; logicalrep_read_typ(s, &typ); @@ -2408,7 +3347,8 @@ apply_handle_insert(StringInfo s) * streamed transactions. 
*/ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_INSERT, s)) return; begin_replication_step(); @@ -2560,7 +3500,8 @@ apply_handle_update(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_UPDATE, s)) return; begin_replication_step(); @@ -2741,7 +3682,8 @@ apply_handle_delete(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_DELETE, s)) return; begin_replication_step(); @@ -3169,7 +4111,8 @@ apply_handle_truncate(StringInfo s) * streamed transactions. */ if (is_skipping_changes() || - handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) + handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s) || + handle_delayed_transaction(LOGICAL_REP_MSG_TRUNCATE, s)) return; begin_replication_step(); @@ -3436,6 +4379,10 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, } } + /* If change are written into file, report the LSN instead */ + if (last_flushed > *flush) + *flush = last_flushed; + *have_pending_txes = !dlist_is_empty(&lsn_mapping); } @@ -3632,9 +4579,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) maybe_reread_subscription(); /* Process any table synchronization changes. */ - process_syncing_tables(last_received); + if (list_length(DelayedTxnList) == 0) + process_syncing_tables(last_received); } + /* Check delayed transactions and apply them */ + check_delayed_transaction(); + /* Cleanup the memory. 
*/ MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); @@ -3776,8 +4727,14 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) /* * No outstanding transactions to flush, we can report the latest received * position. This is important for synchronous replication. + * + * If the logical replication subscription has unprocessed changes then do + * not inform the publisher that the received latest LSN is already + * applied and flushed, otherwise, the publisher will make a wrong + * assumption about the logical replication progress. Instead, just send a + * feedback message to avoid a replication timeout during the delay. */ - if (!have_pending_txes) + if (!have_pending_txes && (list_length(DelayedTxnList) == 0)) flushpos = writepos = recvpos; if (writepos < last_writepos) @@ -4581,6 +5538,9 @@ ApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("subscription has no replication slot set"))); + /* Check delayed files or initialize directory */ + InitializeDelayedTxn(); + /* Setup replication origin tracking. 
*/ StartTransactionCommand(); ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6abbcff683..2916965c56 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4609,6 +4609,7 @@ getSubscriptions(Archive *fout) int i_subpublications; int i_subbinary; int i_subpasswordrequired; + int i_subminapplydelay; int i, ntups; @@ -4663,11 +4664,13 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 160000) appendPQExpBufferStr(query, " s.suborigin,\n" - " s.subpasswordrequired\n"); + " s.subpasswordrequired,\n" + " s.subminapplydelay\n"); else appendPQExpBuffer(query, " '%s' AS suborigin,\n" - " 't' AS subpasswordrequired\n", + " 't' AS subpasswordrequired,\n" + " 0 AS subminapplydelay\n", LOGICALREP_ORIGIN_ANY); appendPQExpBufferStr(query, @@ -4697,6 +4700,7 @@ getSubscriptions(Archive *fout) i_subdisableonerr = PQfnumber(res, "subdisableonerr"); i_suborigin = PQfnumber(res, "suborigin"); i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); + i_subminapplydelay = PQfnumber(res, "subminapplydelay"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4729,6 +4733,8 @@ getSubscriptions(Archive *fout) subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); subinfo[i].subpasswordrequired = pg_strdup(PQgetvalue(res, i, i_subpasswordrequired)); + subinfo[i].subminapplydelay = + atoi(PQgetvalue(res, i, i_subminapplydelay)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4813,6 +4819,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subpasswordrequired, "t") != 0) appendPQExpBuffer(query, ", password_required = false"); + if (subinfo->subminapplydelay > 0) + appendPQExpBuffer(query, ", min_apply_delay = '%d ms'", subinfo->subminapplydelay); + appendPQExpBufferStr(query, ");\n"); if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) diff --git 
a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index ed6ce41ad7..6bf889a00a 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -662,6 +662,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *suborigin; char *subsynccommit; + int subminapplydelay; char *subpublications; char *subpasswordrequired; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 83a37ee601..78f2426d99 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6493,7 +6493,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6551,9 +6551,11 @@ describeSubscriptions(const char *pattern, bool verbose) if (pset.sversion >= 160000) appendPQExpBuffer(&buf, ", suborigin AS \"%s\"\n" - ", subrunasowner AS \"%s\"\n", + ", subrunasowner AS \"%s\"\n" + ", subminapplydelay AS \"%s\"\n", gettext_noop("Origin"), - gettext_noop("Run as Owner?")); + gettext_noop("Run as Owner?"), + gettext_noop("Min apply delay")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index e38a49e8bd..881f8288a7 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1925,7 +1925,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name", + COMPLETE_WITH("binary", "disable_on_error", "min_apply_delay", "origin", "slot_name", "streaming", "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", 
"SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) @@ -3268,7 +3268,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "origin", "slot_name", + "disable_on_error", "enabled", "min_apply_delay", "origin", "slot_name", "streaming", "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 91d729d62d..649e789240 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -74,6 +74,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW Oid subowner BKI_LOOKUP(pg_authid); /* Owner of the subscription */ + int32 subminapplydelay; /* Replication apply delay (ms) */ + bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ @@ -127,6 +129,7 @@ typedef struct Subscription * skipped */ char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ + int32 minapplydelay; /* Replication apply delay (ms) */ bool enabled; /* Indicates if the subscription is enabled */ bool binary; /* Indicates if the subscription wants data in * binary format */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 9c52890f1d..31261bcd9c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -115,18 +115,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
\dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -144,10 +144,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -166,10 +166,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -178,10 +178,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -213,10 +213,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | 0 | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -245,19 +245,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -269,27 +269,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -304,10 +304,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -322,10 +322,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -361,10 +361,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. 
@@ -373,10 +373,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -386,10 +386,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -402,18 +402,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? 
| Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | f | 0 | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -464,6 +464,43 @@ ERROR: permission denied for database regression ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +ERROR: invalid value for parameter "min_apply_delay": "foo" +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); +ERROR: -1 ms is outside the valid range for parameter "min_apply_delay" (0 .. 2147483647) +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123); +ERROR: min_apply_delay > 0 and streaming = parallel are mutually exclusive options +-- success -- min_apply_delay value without unit is taken as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. 
+\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+--------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | hayato | f | {testpub} | f | off | d | f | any | f | 123 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- success -- min_apply_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d'); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Min apply delay | Synchronous commit | Conninfo | Skip LSN +-----------------+--------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+-----------------+--------------------+-----------------------------+---------- + regress_testsub | hayato | f | {testpub} | f | off | d | f | any | f | 86400000 | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); +ERROR: cannot set parallel streaming mode for subscription with min_apply_delay +-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel); +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123); +ERROR: cannot set min_apply_delay for subscription in parallel streaming mode +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; DROP ROLE regress_subscription_user; DROP 
ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index cc53458d91..d5ad91a96f 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -333,6 +333,31 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; RESET SESSION AUTHORIZATION; + +-- fail -- min_apply_delay must be a non-negative integer +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo); +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1); + +-- fail - utilizing streaming = parallel with time-delayed replication is not supported +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = parallel, min_apply_delay = 123); + +-- success -- min_apply_delay value without unit is taken as milliseconds +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123); +\dRs+ + +-- success -- min_apply_delay value with unit is converted into ms and stored as an integer +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = '1 d'); +\dRs+ + +-- fail - alter subscription with streaming = parallel should fail when time-delayed replication is set +ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); + +-- fail - alter subscription with min_apply_delay should fail when streaming = parallel is set +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 0, streaming = parallel); +ALTER SUBSCRIPTION regress_testsub SET (min_apply_delay = 123); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + DROP ROLE regress_subscription_user; DROP ROLE 
regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 91aa068c95..01f2c4284d 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -515,6 +515,37 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after renaming SUBSCRIPTION"; +# Test time-delayed logical replication +# +# If the subscription sets min_apply_delay parameter, the logical replication +# worker will delay the transaction apply for min_apply_delay milliseconds. We +# verify this by looking at the time difference between a) when tuples are +# inserted on the publisher, and b) when those changes are replicated on the +# subscriber. Even on slow machines, this strategy will give predictable behavior. + +# Set min_apply_delay parameter to 3 seconds +my $delay = 3; +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')"); + +# Before doing the insertion, get the current timestamp that will be +# used as a comparison base. +my $publisher_insert_time = time(); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins VALUES (generate_series(1101, 1120))"); + +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 1 FROM tab_ins WHERE a = 1120;" + ) + or die + "failed to replicate changes"; + +# This test is successful only if at least the configured apply delay has +# elapsed before the inserted changes became visible on the subscriber. +ok( time() - $publisher_insert_time >= $delay, + "subscriber applies WAL only after replication delay for non-streaming transaction" +); + # check all the cleanup $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); -- 2.27.0