From 58fd0bd09edfa2814e163e75e05449946ad1795b Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 10 Jan 2024 17:22:07 +0100 Subject: [PATCH v20240111 5/7] CREATE SUBSCRIPTION flag to enable sequences --- contrib/test_decoding/test_decoding.c | 13 ++---- doc/src/sgml/logicaldecoding.sgml | 8 ++++ doc/src/sgml/ref/alter_subscription.sgml | 5 ++- doc/src/sgml/ref/create_subscription.sgml | 12 ++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 43 ++++++++++++++++--- .../libpqwalreceiver/libpqwalreceiver.c | 4 ++ src/backend/replication/logical/logical.c | 20 +++++++++ src/backend/replication/logical/worker.c | 10 +++++ src/backend/replication/pgoutput/pgoutput.c | 14 ++++++ src/include/catalog/pg_subscription.h | 4 ++ src/include/replication/output_plugin.h | 1 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/subscription/t/034_sequences.pl | 2 +- 15 files changed, 121 insertions(+), 18 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 200caad6863..072bbba575b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -29,7 +29,6 @@ typedef struct MemoryContext context; bool include_xids; bool include_timestamp; - bool include_sequences; bool skip_empty_xacts; bool only_local; } TestDecodingData; @@ -179,12 +178,12 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_timestamp = false; data->skip_empty_xacts = false; data->only_local = false; - data->include_sequences = false; ctx->output_plugin_private = data; opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; opt->receive_rewrites = false; + opt->receive_sequences = false; foreach(option, ctx->output_plugin_options) { @@ -275,8 +274,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, { if (elem->arg == NULL) - data->include_sequences = true; - else if (!parse_bool(strVal(elem->arg), &data->include_sequences)) + opt->receive_sequences = true; + else if (!parse_bool(strVal(elem->arg), &opt->receive_sequences)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", @@ -794,9 +793,6 @@ pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TestDecodingData *data = ctx->output_plugin_private; TestDecodingTxnData *txndata = txn->output_plugin_private; - if (!data->include_sequences) - return; - /* output BEGIN if we haven't yet, but only for the transactional case */ if (transactional) { @@ -1038,9 +1034,6 @@ pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TestDecodingData *data = ctx->output_plugin_private; TestDecodingTxnData *txndata = txn->output_plugin_private; - if (!data->include_sequences) - return; - /* output BEGIN if we haven't yet, but only for the transactional case */ if (transactional) { diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 90e99a95128..31f8aa7f932 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -646,6 +646,7 @@ typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; + bool receive_sequences; } OutputPluginOptions; output_type has to either be set to @@ -658,6 +659,13 @@ typedef struct OutputPluginOptions replication, but they require special handling. + + If receive_sequences is true, the output plugin will + also be called for changes made by operations on sequences. These are + of interest to plugins that need to maintain sequences consistent with + the rest of the data, for example for purposes of upgrade or failover. + + The startup callback should validate the options present in ctx->output_plugin_options. If the output plugin diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 479ec495896..2d24ff5768e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -226,8 +226,9 @@ ALTER SUBSCRIPTION name RENAME TO < streaming, disable_on_error, password_required, - run_as_owner, and - origin. + run_as_owner, + origin and + sequences. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f1c20b3a465..4591e37dac7 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,6 +399,18 @@ CREATE SUBSCRIPTION subscription_name + + + sequences (boolean) + + + Specifies whether the subscription will request the publisher to + decode and send sequence changes. Note that for sequences to be + decoded, the sequence also has to be added to the publication. + The default is false. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index c516c25ac7b..e52abf61791 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; + sub->sequences = subform->subsequences; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 144609c1b9f..76b1bbd9b2f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_SEQUENCES 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -97,6 +98,7 @@ typedef struct SubOpts bool runasowner; char *origin; XLogRecPtr lsn; + bool sequences; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -158,6 +160,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_SEQUENCES)) + opts->sequences = false; /* Parse options */ foreach(lc, stmt_options) @@ -354,6 +358,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_SEQUENCES) && + strcmp(defel->defname, "sequences") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_SEQUENCES)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_SEQUENCES; + opts->sequences = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -592,7 +605,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | SUBOPT_SEQUENCES); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -698,6 +711,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); + values[Anum_pg_subscription_subsequences - 1] = BoolGetDatum(opts.sequences); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -762,8 +776,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * relation sync status info. */ relations = fetch_table_list(wrconn, publications); - relations = list_concat(relations, - fetch_sequence_list(wrconn, publications)); + + /* + * Add sequences, but only if the subscription explicitly enabled + * them to be replicated. + */ + if (opts.sequences) + relations = list_concat(relations, + fetch_sequence_list(wrconn, publications)); foreach(lc, relations) { @@ -888,8 +908,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get the list of relations from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); - pubrel_names = list_concat(pubrel_names, - fetch_sequence_list(wrconn, sub->publications)); + + /* + * Add sequences, but only if the subscription explicitly enabled + * them to be replicated. + */ + if (sub->sequences) + pubrel_names = list_concat(pubrel_names, + fetch_sequence_list(wrconn, sub->publications)); /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); @@ -1224,6 +1250,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_suborigin - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SEQUENCES)) + { + values[Anum_pg_subscription_subsequences - 1] = + BoolGetDatum(opts.sequences); + replaces[Anum_pg_subscription_subsequences - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 78344a03615..07a0e55c21d 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -475,6 +475,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", origin '%s'", options->proto.logical.origin); + if (options->proto.logical.sequences && + PQserverVersion(conn->streamConn) >= 170000) + appendStringInfo(&cmd, ", sequences 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8cadc0e9e6f..86557745e18 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -475,6 +475,16 @@ CreateInitDecodingContext(const char *plugin, ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + /* + * StartupDecodingContext only checks which callbacks are defined, and + * startup_cb_wrapper sets receiver_sequences flag. We need to combine + * those two pieces and disable sequences if receiver_sequences=false. + * + * XXX We could also leave this up to the plugin startup, but it seems + * cleaner to just do it here. + */ + ctx->sequences &= ctx->options.receive_sequences; + return ctx; } @@ -622,6 +632,16 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx->reorder->output_rewrites = ctx->options.receive_rewrites; + /* + * StartupDecodingContext only checks which callbacks are defined, and + * startup_cb_wrapper sets receiver_sequences flag. We need to combine + * those two pieces and disable sequences if receiver_sequences=false. + * + * XXX We could also leave this up to the plugin startup, but it seems + * cleaner to just do it here. + */ + ctx->sequences &= ctx->options.receive_sequences; + ereport(LOG, (errmsg("starting logical decoding for slot \"%s\"", NameStr(slot->data.name)), diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 01fe9527b6b..6a71db354a3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4040,6 +4040,7 @@ maybe_reread_subscription(void) strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || + newsub->sequences != MySubscription->sequences || newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || @@ -4470,6 +4471,15 @@ set_stream_options(WalRcvStreamOptions *options, options->proto.logical.publication_names = MySubscription->publications; options->proto.logical.binary = MySubscription->binary; + /* + * FIXME maybe this should depend on server_version too? We don't want + * to request sequences from old releases, but maybe we should not just + * ignore that? Might easily lead to surprises. + * + * XXX Now there's a server_version check in libpqrcv_startstreaming. + */ + options->proto.logical.sequences = MySubscription->sequences; + /* * Assign the appropriate option value for streaming option according to * the 'streaming' mode and the publisher's ability to support that mode. diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 8b8f81c5f8e..5a19306d26c 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -291,6 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool sequences_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; @@ -404,6 +405,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "sequences") == 0) + { + if (sequences_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + sequences_option_given = true; + + data->sequences = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -442,6 +453,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* This plugin uses binary protocol. */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + opt->receive_sequences = false; /* * This is replication start and not slot initialization. @@ -540,6 +552,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ctx->streaming = false; ctx->twophase = false; } + + opt->receive_sequences = data->sequences; } /* diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index ca326255852..7205f9ed13b 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -93,6 +93,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + bool subsequences; /* True if replication stream should contain + * decoded sequence changes */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -139,6 +142,7 @@ typedef struct Subscription * occurs */ bool passwordrequired; /* Must connection use a password? */ bool runasowner; /* Run replication as subscription owner */ + bool sequences; /* Request sequences from the publisher */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 2ae25c6f4fe..f9ca2994fa7 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -27,6 +27,7 @@ typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; + bool receive_sequences; } OutputPluginOptions; /* diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 89f94e11472..a7461277a00 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + bool sequences; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 0899891cdb8..c0526fd2cd3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -186,6 +186,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + bool sequences; /* Request sequences from publisher */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl index 2e9317642fc..ca706631203 100644 --- a/src/test/subscription/t/034_sequences.pl +++ b/src/test/subscription/t/034_sequences.pl @@ -54,7 +54,7 @@ EXCEPTION WHEN others THEN NULL; END; \$\$ LANGUAGE plpgsql"); $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" + "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (sequences = true)" ); $node_publisher->wait_for_catchup('seq_sub'); -- 2.43.0