From bca6a7c9e2b09638376d179452863006d1ebc20f Mon Sep 17 00:00:00 2001 From: Julien Tachoires Date: Thu, 18 Jul 2024 07:45:43 -0700 Subject: [PATCH 5/6] Add the subscription option spill_compression --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 24 +++ src/backend/catalog/pg_subscription.c | 6 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 31 +++- .../libpqwalreceiver/libpqwalreceiver.c | 5 + src/backend/replication/logical/logical.c | 4 + .../replication/logical/reorderbuffer.c | 15 +- .../logical/reorderbuffer_compression.c | 57 +++++++ src/backend/replication/logical/worker.c | 13 +- src/backend/replication/pgoutput/pgoutput.c | 28 ++++ src/bin/pg_dump/pg_dump.c | 18 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 8 +- src/bin/psql/describe.c | 7 +- src/bin/psql/tab-complete.c | 5 +- src/include/catalog/pg_subscription.h | 4 + src/include/replication/logical.h | 2 + src/include/replication/pgoutput.h | 1 + .../replication/reorderbuffer_compression.h | 4 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 156 +++++++++--------- src/test/regress/sql/subscription.sql | 4 + 23 files changed, 308 insertions(+), 94 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 476f195622..44df09b854 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -228,8 +228,9 @@ ALTER SUBSCRIPTION name RENAME TO < disable_on_error, password_required, run_as_owner, - origin, and - failover. + origin, + failover, and + spill_compression. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 740b7d9421..56f733eaf8 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -428,6 +428,30 @@ CREATE SUBSCRIPTION subscription_name + + + spill_compression (enum) + + + Specifies whether the decoded changes that eventually need to be + temporarily written on disk by the publisher are compressed or not. + Default value is off meaning no data compression + involved. Setting spill_compression to + on or pglz means that the + decoded changes are compressed using the internal + PGLZ compression algorithm. + + + + If the PostgreSQL server running the + publisher node supports the external compression libraries + LZ4 or + Zstandard, + spill_compression can be set respectively to + lz4 or zstd. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9efc9159f2..a3329043dc 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -110,6 +110,12 @@ GetSubscription(Oid subid, bool missing_ok) /* Is the subscription owner a superuser? */ sub->ownersuperuser = superuser_arg(sub->owner); + /* Get spill_compression */ + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subspillcompression); + sub->spill_compression = TextDatumGetCString(datum); + ReleaseSysCache(tup); return sub; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 19cabc9a47..c84c283b9f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1356,7 +1356,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subslotname, subsynccommit, subpublications, suborigin, + subspillcompression) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 16d83b3253..b2d35eac53 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -39,6 +39,7 @@ #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/reorderbuffer_compression.h" #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -72,6 +73,7 @@ #define SUBOPT_FAILOVER 0x00002000 #define SUBOPT_LSN 0x00004000 #define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_SPILL_COMPRESSION 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -99,6 +101,7 @@ typedef struct SubOpts bool failover; char *origin; XLogRecPtr lsn; + char *spill_compression; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -161,6 +164,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_SPILL_COMPRESSION)) + opts->spill_compression = "off"; /* Parse options */ foreach(lc, stmt_options) @@ -366,6 +371,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_SPILL_COMPRESSION) && + strcmp(defel->defname, "spill_compression") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_SPILL_COMPRESSION)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_SPILL_COMPRESSION; + opts->spill_compression = defGetString(defel); + + ReorderBufferValidateCompressionMethod(opts->spill_compression, + ERROR); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -603,7 +620,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN | + SUBOPT_SPILL_COMPRESSION); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -723,6 +741,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subspillcompression - 1] = + CStringGetTextDatum(opts.spill_compression); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -1148,7 +1168,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | SUBOPT_SPILL_COMPRESSION); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1265,6 +1285,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_suborigin - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SPILL_COMPRESSION)) + { + values[Anum_pg_subscription_subspillcompression - 1] = + CStringGetTextDatum(opts.spill_compression); + replaces[Anum_pg_subscription_subspillcompression - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6c42c209d2..fda751da14 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -620,6 +620,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", binary 'true'"); + if (options->proto.logical.spill_compression && + PQserverVersion(conn->streamConn) >= 180000) + appendStringInfo(&cmd, ", spill_compression '%s'", + options->proto.logical.spill_compression); + appendStringInfoChar(&cmd, ')'); } else diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f8ef5d56d2..05eae65cca 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/reorderbuffer_compression.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/proc.h" @@ -298,6 +299,9 @@ StartupDecodingContext(List *output_plugin_options, ctx->fast_forward = fast_forward; + /* No spill files compression by default */ + ctx->spill_compression_method = REORDER_BUFFER_NO_COMPRESSION; + MemoryContextSwitchTo(old_context); return ctx; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 964a861bbf..328c26e994 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -404,6 +404,15 @@ ReorderBufferFree(ReorderBuffer *rb) ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); } +/* Returns spill files compression method */ +static inline uint8 +ReorderBufferSpillCompressionMethod(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->spill_compression_method; +} + /* * Get an unused, possibly preallocated, ReorderBufferTXN. */ @@ -425,7 +434,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) txn->command_id = InvalidCommandId; txn->output_plugin_private = NULL; txn->compressor_state = ReorderBufferNewCompressorState(rb->context, - logical_decoding_spill_compression); + ReorderBufferSpillCompressionMethod(rb)); return txn; } @@ -464,7 +473,7 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } ReorderBufferFreeCompressorState(rb->context, - logical_decoding_spill_compression, + ReorderBufferSpillCompressionMethod(rb), txn->compressor_state); /* Reset the toast hash */ @@ -3958,7 +3967,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* Inplace ReorderBuffer content compression before writing it on disk */ - ReorderBufferCompress(rb, &disk_hdr, logical_decoding_spill_compression, + ReorderBufferCompress(rb, &disk_hdr, ReorderBufferSpillCompressionMethod(rb), sz, txn->compressor_state); errno = 0; diff --git a/src/backend/replication/logical/reorderbuffer_compression.c b/src/backend/replication/logical/reorderbuffer_compression.c index 9bda286cb8..bf0d206095 100644 --- a/src/backend/replication/logical/reorderbuffer_compression.c +++ b/src/backend/replication/logical/reorderbuffer_compression.c @@ -922,3 +922,60 @@ ReorderBufferDecompress(ReorderBuffer *rb, char *data, break; } } + +/* + * According to a given compression method (as string representation), returns + * the corresponding ReorderBufferCompressionMethod + */ +ReorderBufferCompressionMethod +ReorderBufferParseCompressionMethod(const char *method) +{ + if (pg_strcasecmp(method, "on") == 0) + return REORDER_BUFFER_PGLZ_COMPRESSION; + else if (pg_strcasecmp(method, "pglz") == 0) + return REORDER_BUFFER_PGLZ_COMPRESSION; + else if (pg_strcasecmp(method, "off") == 0) + return REORDER_BUFFER_NO_COMPRESSION; +#ifdef USE_LZ4 + else if (pg_strcasecmp(method, "lz4") == 0) + return REORDER_BUFFER_LZ4_COMPRESSION; +#endif +#ifdef USE_ZSTD + else if (pg_strcasecmp(method, "zstd") == 0) + return REORDER_BUFFER_ZSTD_COMPRESSION; +#endif + else + return REORDER_BUFFER_INVALID_COMPRESSION; +} + +/* + * Check whether the passed compression method is valid and report errors at + * elevel. + * + * As this validation is intended to be executed on subscriber side, then we + * actually don't know if the server running the publisher supports external + * compression libraries. We only check if the compression method is + * potentially supported. The real validation is done by the publisher when + * the replication starts, an error is then triggered if the compression method + * is not supported. + */ +void +ReorderBufferValidateCompressionMethod(const char *method, int elevel) +{ + bool valid = false; + char methods[5][5] = {"on", "off", "pglz", "lz4", "zstd"}; + + for (int i = 0; i < 5; i++) + { + if (pg_strcasecmp(method, methods[i]) == 0) + { + valid = true; + break; + } + } + + if (!valid) + ereport(elevel, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("compression method \"%s\" not valid", method))); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c0bda6269b..dea3be009a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3929,7 +3929,8 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + strcmp(newsub->spill_compression, MySubscription->spill_compression) != 0) { if (am_parallel_apply_worker()) ereport(LOG, @@ -4377,6 +4378,16 @@ set_stream_options(WalRcvStreamOptions *options, MyLogicalRepWorker->parallel_apply = false; } + if (server_version >= 180000 && + MySubscription->stream == LOGICALREP_STREAM_OFF && + MySubscription->spill_compression != NULL) + { + options->proto.logical.spill_compression = + pstrdup(MySubscription->spill_compression); + } + else + options->proto.logical.spill_compression = NULL; + options->proto.logical.twophase = false; options->proto.logical.origin = pstrdup(MySubscription->origin); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index abef4eaf68..d9469f45f6 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -27,6 +27,7 @@ #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "replication/reorderbuffer_compression.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -283,11 +284,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool spill_compression_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; data->messages = false; data->two_phase = false; + data->spill_compression_method = REORDER_BUFFER_NO_COMPRESSION; foreach(lc, options) { @@ -396,6 +399,28 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "spill_compression") == 0) + { + uint8 method; + char *method_str; + + if (spill_compression_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + spill_compression_option_given = true; + + method_str = defGetString(defel); + method = ReorderBufferParseCompressionMethod(method_str); + + if (method == REORDER_BUFFER_INVALID_COMPRESSION) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid spill files compression method: \"%s\"", + method_str)); + + data->spill_compression_method = method; + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -508,6 +533,9 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->publications = NIL; publications_valid = false; + /* Init spill files compression method */ + ctx->spill_compression_method = data->spill_compression_method; + /* * Register callback for pg_publication if we didn't already do that * during some previous call in this process. diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index b8b1888bd3..d8508bb684 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4760,6 +4760,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subspillcompression; int i, ntups; @@ -4832,10 +4833,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBuffer(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (fout->remoteVersion >= 180000) + appendPQExpBufferStr(query, + " s.subspillcompression\n"); + else + appendPQExpBuffer(query, + " 'off' AS subspillcompression\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -4875,6 +4883,7 @@ getSubscriptions(Archive *fout) i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn"); i_subenabled = PQfnumber(res, "subenabled"); i_subfailover = PQfnumber(res, "subfailover"); + i_subspillcompression = PQfnumber(res, "subspillcompression"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4921,6 +4930,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subenabled)); subinfo[i].subfailover = pg_strdup(PQgetvalue(res, i, i_subfailover)); + subinfo[i].subspillcompression = + pg_strdup(PQgetvalue(res, i, i_subspillcompression)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -5167,6 +5178,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (pg_strcasecmp(subinfo->suborigin, LOGICALREP_ORIGIN_ANY) != 0) appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin); + if (strcmp(subinfo->subspillcompression, "off") != 0) + appendPQExpBuffer(query, ", spill_compression = %s", subinfo->subspillcompression); + appendPQExpBufferStr(query, ");\n"); /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 4b2e5870a9..12588070f4 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -671,6 +671,7 @@ typedef struct _SubscriptionInfo char *suborigin; char *suboriginremotelsn; char *subfailover; + char *subspillcompression; } SubscriptionInfo; /* diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index d3dd8784d6..8fd71f5cf6 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2965,9 +2965,9 @@ my %tests = ( create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub2 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 - WITH (connect = false, origin = none);', + WITH (connect = false, origin = none, spill_compression = on);', regexp => qr/^ - \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none);\E + \QCREATE SUBSCRIPTION sub2 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub2', origin = none, spill_compression = on);\E /xm, like => { %full_runs, section_post_data => 1, }, }, @@ -2976,9 +2976,9 @@ my %tests = ( create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub3 CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 - WITH (connect = false, origin = any);', + WITH (connect = false, origin = any, spill_compression = pglz);', regexp => qr/^ - \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3');\E + \QCREATE SUBSCRIPTION sub3 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (connect = false, slot_name = 'sub3', spill_compression = pglz);\E /xm, like => { %full_runs, section_post_data => 1, }, }, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 7c9a1f234c..495a065849 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6539,7 +6539,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6619,6 +6619,11 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subskiplsn AS \"%s\"\n", gettext_noop("Skip LSN")); + + if (pset.sversion >= 180000) + appendPQExpBuffer(&buf, + ", subspillcompression AS \"%s\"\n", + gettext_noop("Spill files compression")); } /* Only display subscriptions in current database. */ diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d453e224d9..011ec52550 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + "spill_compression", "streaming", "synchronous_commit"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3365,7 +3365,8 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "spill_compression", "streaming", "synchronous_commit", + "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0aa14ec4a2..61c284349c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -113,6 +113,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); + + /* Spill files compression algorithm */ + text subspillcompression BKI_FORCE_NOT_NULL; #endif } FormData_pg_subscription; @@ -157,6 +160,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char *spill_compression; /* Spill files compression algorithm */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index aff38e8d04..75c17866c3 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -112,6 +112,8 @@ typedef struct LogicalDecodingContext /* Do we need to process any change in fast_forward mode? */ bool processing_required; + /* Compression method used to compress spill files */ + uint8 spill_compression_method; } LogicalDecodingContext; diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 89f94e1147..eabcca62af 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + uint8 spill_compression_method; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/reorderbuffer_compression.h b/src/include/replication/reorderbuffer_compression.h index b668f33d5a..2f0d5b8bd9 100644 --- a/src/include/replication/reorderbuffer_compression.h +++ b/src/include/replication/reorderbuffer_compression.h @@ -26,6 +26,7 @@ /* ReorderBuffer on disk compression algorithms */ typedef enum ReorderBufferCompressionMethod { + REORDER_BUFFER_INVALID_COMPRESSION, REORDER_BUFFER_NO_COMPRESSION, REORDER_BUFFER_LZ4_COMPRESSION, REORDER_BUFFER_PGLZ_COMPRESSION, @@ -132,5 +133,8 @@ extern void ReorderBufferCompress(ReorderBuffer *rb, extern void ReorderBufferDecompress(ReorderBuffer *rb, char *data, ReorderBufferDiskHeader *header, void *compressor_state); +extern ReorderBufferCompressionMethod ReorderBufferParseCompressionMethod(const char *method); +extern void ReorderBufferValidateCompressionMethod(const char *method, + int elevel); #endif /* REORDERBUFFER_COMPRESSION_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 12f71fa99b..b027e4ce89 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -186,6 +186,7 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + char *spill_compression; /* Spill files compression algo */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 5c2f1ee517..b108ed3113 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 | off (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 | off (1 row) BEGIN; @@ -222,11 +222,15 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = local); ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = pglz); +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = off); +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = foobar); +ERROR: compression method "foobar" not valid \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+----------+------------------------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 | off (1 row) -- rename back to keep the rest simple @@ -255,19 +259,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +283,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) -- fail - publication already exists @@ -314,10 +318,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) -- fail - publication used more than once @@ -332,10 +336,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +375,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +387,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +400,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +416,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN | Spill files compression +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+----------+------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 | off (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 3e5ba4cb8c..2d891b2c06 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -140,6 +140,10 @@ ALTER SUBSCRIPTION regress_testsub RENAME TO regress_testsub_foo; ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = local); ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = pglz); +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = off); +ALTER SUBSCRIPTION regress_testsub_foo SET (spill_compression = foobar); + \dRs+ -- rename back to keep the rest simple -- 2.43.0