From 0b2b9a774fc636807f9515eecc2bf01476415d37 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Mon, 8 May 2017 23:53:02 +0200 Subject: [PATCH] Remove the NODROP SLOT option from DROP SUBSCRIPTION --- doc/src/sgml/ref/create_subscription.sgml | 8 + doc/src/sgml/ref/drop_subscription.sgml | 22 +-- src/backend/catalog/pg_subscription.c | 9 +- src/backend/commands/subscriptioncmds.c | 195 ++++++++++++++++----- src/backend/nodes/copyfuncs.c | 2 +- src/backend/nodes/equalfuncs.c | 2 +- src/backend/parser/gram.y | 43 ++--- src/backend/replication/logical/worker.c | 35 ++-- src/backend/tcop/utility.c | 3 + src/include/catalog/pg_subscription.h | 2 +- src/include/nodes/parsenodes.h | 2 +- .../dummy_seclabel/expected/dummy_seclabel.out | 4 +- .../modules/dummy_seclabel/sql/dummy_seclabel.sql | 4 +- src/test/regress/expected/object_address.out | 4 +- src/test/regress/expected/subscription.out | 13 +- src/test/regress/sql/object_address.sql | 4 +- src/test/regress/sql/subscription.sql | 12 +- src/test/subscription/t/001_rep_changes.pl | 2 +- src/test/subscription/t/004_sync.pl | 8 +- 19 files changed, 242 insertions(+), 132 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 3c51012..c22bb20 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -133,6 +133,14 @@ CREATE SUBSCRIPTION subscription_namesubscription_name for slot name. + + + When SLOT NAME is set to + NONE there will be no slot associated with the + subscription. Such subscriptions must also have both + ENABLED and CREATE SLOT set + to false. + diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml index f1ac125..8b00c45 100644 --- a/doc/src/sgml/ref/drop_subscription.sgml +++ b/doc/src/sgml/ref/drop_subscription.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -DROP SUBSCRIPTION [ IF EXISTS ] name [ DROP SLOT | NODROP SLOT ] +DROP SUBSCRIPTION [ IF EXISTS ] name [ CASCADE | RESTRICT ] @@ -57,20 +57,16 @@ DROP SUBSCRIPTION [ IF EXISTS ] name - DROP SLOT - NODROP SLOT - - - Specifies whether to drop the replication slot on the publisher. The - default is - DROP SLOT. - + CASCADE + RESTRICT + - If the publisher is not reachable when the subscription is to be - dropped, then it is useful to specify NODROP SLOT. - But the replication slot on the publisher will then have to be removed - manually. + These key words are used to determine what to do with when there is a + replication slot associated with the subscription. The + RESTRICT will refuse to drop the subscription in + such case, while CASCADE will drop the associated + slot. RESTRICT is the default. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 22587a4..7dc21f1 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -82,8 +82,10 @@ GetSubscription(Oid subid, bool missing_ok) tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + sub->slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + sub->slotname = NULL; /* Get synccommit */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, @@ -147,7 +149,8 @@ FreeSubscription(Subscription *sub) { pfree(sub->name); pfree(sub->conninfo); - pfree(sub->slotname); + if (sub->slotname) + pfree(sub->slotname); list_free_deep(sub->publications); pfree(sub); } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index fde9e6e..ba0d4b8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,7 +60,8 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); */ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name, + bool *enabled, bool *create_slot, + bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit) { ListCell *lc; @@ -78,7 +79,10 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (create_slot) *create_slot = true; if (slot_name) + { + *slot_name_given = false; *slot_name = NULL; + } if (copy_data) *copy_data = true; if (synchronous_commit) @@ -141,12 +145,17 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, } else if (strcmp(defel->defname, "slot name") == 0 && slot_name) { - if (*slot_name) + if (*slot_name_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + *slot_name_given = true; *slot_name = defGetString(defel); + + /* Setting slot_name = NONE is treated as no slot name. */ + if (strcmp(*slot_name, "none") == 0) + *slot_name = NULL; } else if (strcmp(defel->defname, "copy data") == 0 && copy_data) { @@ -194,26 +203,43 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, if (connect && !*connect) { /* Check for incompatible options from the user. */ - if (*enabled_given && *enabled) + if (enabled && *enabled_given && *enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("noconnect and enabled are mutually exclusive options"))); + errmsg("connect = false and enabled are mutually exclusive options"))); - if (create_slot_given && *create_slot) + if (create_slot && create_slot_given && *create_slot) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("noconnect and create slot are mutually exclusive options"))); + errmsg("connect = false and create_slot are mutually exclusive options"))); - if (copy_data_given && *copy_data) + if (copy_data && copy_data_given && *copy_data) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("noconnect and copy data are mutually exclusive options"))); + errmsg("connect = false and copy_data are mutually exclusive options"))); /* Change the defaults of other options. */ *enabled = false; *create_slot = false; *copy_data = false; } + + /* + * Do additional checking for disallowed combination when + * slot_name = NONE was used. + */ + if (slot_name && *slot_name_given && !*slot_name) + { + if (enabled && *enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and enabled are mutually exclusive options"))); + + if (create_slot && create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name = NONE and create_slot are mutually exclusive options"))); + } } /* @@ -290,6 +316,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char *synchronous_commit; char *conninfo; char *slotname; + bool slotname_given; char originname[NAMEDATALEN]; bool create_slot; List *publications; @@ -299,8 +326,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, ©_data, - &synchronous_commit); + &enabled, &create_slot, &slotname_given, + &slotname, ©_data, &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -329,8 +356,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) stmt->subname))); } - if (slotname == NULL) + if (!slotname_given && slotname == NULL) slotname = stmt->subname; + /* The default for synchronous_commit of subscriptions is off. */ if (synchronous_commit == NULL) synchronous_commit = "off"; @@ -355,8 +383,11 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slotname)); + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; values[Anum_pg_subscription_subsynccommit - 1] = CStringGetTextDatum(synchronous_commit); values[Anum_pg_subscription_subpublications - 1] = @@ -426,6 +457,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ if (create_slot) { + Assert(slotname); + walrcv_create_slot(wrconn, slotname, false, CRS_NOEXPORT_SNAPSHOT, &lsn); ereport(NOTICE, @@ -578,6 +611,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) HeapTuple tup; Oid subid; bool update_tuple = false; + Subscription *sub; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -597,6 +631,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) stmt->subname); subid = HeapTupleGetOid(tup); + sub = GetSubscription(subid, false); /* Form a new tuple. */ memset(values, 0, sizeof(values)); @@ -607,19 +642,29 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; - char *synchronous_commit; + char *slotname; + bool slotname_given; + char *synchronous_commit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL, - &synchronous_commit); + NULL, &slotname_given, &slotname, + NULL, &synchronous_commit); - if (slot_name) + if (slotname_given) { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + if (sub->enabled && !slotname) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot set slot_name = NONE for enabled subscription"))); + + if (slotname) + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slotname)); + else + nulls[Anum_pg_subscription_subslotname - 1] = true; replaces[Anum_pg_subscription_subslotname - 1] = true; } + if (synchronous_commit) { values[Anum_pg_subscription_subsynccommit - 1] = @@ -638,9 +683,14 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); Assert(enabled_given); + if (!sub->slotname && enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot enable subscription which does not have a slot_name"))); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -668,10 +718,10 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -682,6 +732,11 @@ AlterSubscription(AlterSubscriptionStmt *stmt) /* Refresh if user asked us to. */ if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + /* Make sure refresh sees the new list of publications. */ sub->publications = stmt->publication; @@ -694,10 +749,15 @@ AlterSubscription(AlterSubscriptionStmt *stmt) case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; - Subscription *sub = GetSubscription(subid, false); + + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, ©_data, NULL); + NULL, NULL, NULL, ©_data, + NULL); AlterSubscription_refresh(sub, copy_data); @@ -756,8 +816,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * run DROP SUBSCRIPTION inside a transaction block if dropping the * replication slot. */ - if (stmt->drop_slot) - PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT"); + if (stmt->behavior == DROP_CASCADE) + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... CASCADE"); /* * Lock pg_subscription with AccessExclusiveLock to ensure @@ -817,8 +877,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subslotname, &isnull); - Assert(!isnull); - slotname = pstrdup(NameStr(*DatumGetName(datum))); + if (!isnull) + slotname = pstrdup(NameStr(*DatumGetName(datum))); + else + slotname = NULL; ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); @@ -843,43 +905,84 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) if (originid != InvalidRepOriginId) replorigin_drop(originid); - /* If the user asked to not drop the slot, we are done mow.*/ - if (!stmt->drop_slot) + /* If there is no slot associated with subscription we can finish here. */ + if (!slotname) { heap_close(rel, NoLock); return; } /* - * Otherwise drop the replication slot at the publisher node using + * Otherwise check for the replication slot at the publisher node using * the replication connection. */ load_file("libpqwalreceiver", false); - initStringInfo(&cmd); - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname); - wrconn = walrcv_connect(conninfo, true, subname, &err); if (wrconn == NULL) ereport(ERROR, - (errmsg("could not connect to publisher when attempting to " - "drop the replication slot \"%s\"", slotname), - errdetail("The error was: %s", err))); + (errmsg("could not connect to publisher when attempting to fetch " + "information about replication slot \"%s\"", slotname), + errdetail("The error was: %s", err), + errhint("Use ALTER SUBSCRIPTION ... WITH (slot_name = NONE) " + "to disassociate the subscription from slot."))); PG_TRY(); { WalRcvExecResult *res; - res = walrcv_exec(wrconn, cmd.data, 0, NULL); - - if (res->status != WALRCV_OK_COMMAND) + TupleTableSlot *tupslot; + Oid slot_row_desc[1] = {BOOLOID}; + bool found; + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT true FROM pg_catalog.pg_replication_slots WHERE slot_name = %s", + quote_literal_cstr(slotname)); + res = walrcv_exec(wrconn, cmd.data, 1, slot_row_desc); + if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, - (errmsg("could not drop the replication slot \"%s\" on publisher", - slotname), + (errmsg("could not fetch the replication slot info from publisher"), errdetail("The error was: %s", res->err))); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc); + found = tuplestore_gettupleslot(res->tuplestore, true, false, + tupslot); + ExecDropSingleTupleTableSlot(tupslot); + walrcv_clear_result(res); + + /* If slot was not found on publisher, we are done. */ + if (!found) + { + walrcv_disconnect(wrconn); + pfree(cmd.data); + heap_close(rel, NoLock); + return; + } + + /* Otherwise the next action depends on the drop_behavior. */ + if (stmt->behavior == DROP_CASCADE) + { + resetStringInfo(&cmd); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", + quote_identifier(slotname)); + res = walrcv_exec(wrconn, cmd.data, 0, NULL); + + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("could not drop the replication slot \"%s\" on publisher", + slotname), + errdetail("The error was: %s", res->err))); + else + ereport(NOTICE, + (errmsg("dropped replication slot \"%s\" on publisher", + slotname))); + } else - ereport(NOTICE, - (errmsg("dropped replication slot \"%s\" on publisher", - slotname))); + ereport(ERROR, + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop subscription \"%s\" because there is still replication slot associated with it", + subname), + errhint("Use DROP ... CASCADE to drop the slot too."))); walrcv_clear_result(res); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 35a237a..2d2a9d0 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4537,8 +4537,8 @@ _copyDropSubscriptionStmt(const DropSubscriptionStmt *from) DropSubscriptionStmt *newnode = makeNode(DropSubscriptionStmt); COPY_STRING_FIELD(subname); - COPY_SCALAR_FIELD(drop_slot); COPY_SCALAR_FIELD(missing_ok); + COPY_SCALAR_FIELD(behavior); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 21dfbb0..b5459cd 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2246,8 +2246,8 @@ _equalDropSubscriptionStmt(const DropSubscriptionStmt *a, const DropSubscriptionStmt *b) { COMPARE_STRING_FIELD(subname); - COMPARE_SCALAR_FIELD(drop_slot); COMPARE_SCALAR_FIELD(missing_ok); + COMPARE_SCALAR_FIELD(behavior); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 2cad8b2..2537f84 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -415,7 +415,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type arg_class %type func_return func_type -%type opt_trusted opt_restart_seqs opt_drop_slot +%type opt_trusted opt_restart_seqs %type OptTemp %type OptNoLog %type OnCommitOption @@ -467,7 +467,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr AexprConst indirection_el opt_slice_bound columnref in_expr having_clause func_table xmltable array_expr - ExclusionWhereClause + ExclusionWhereClause operator_def_arg %type rowsfrom_item rowsfrom_list opt_col_def_list %type opt_ordinality %type ExclusionConstraintList ExclusionConstraintElem @@ -5694,6 +5694,7 @@ def_arg: func_type { $$ = (Node *)$1; } | qual_all_Op { $$ = (Node *)$1; } | NumericOnly { $$ = (Node *)$1; } | Sconst { $$ = (Node *)makeString($1); } + | NONE { $$ = (Node *)makeString(pstrdup($1)); } ; old_aggr_definition: '(' old_aggr_list ')' { $$ = $2; } @@ -8933,8 +8934,16 @@ operator_def_list: operator_def_elem { $$ = list_make1($1); } operator_def_elem: ColLabel '=' NONE { $$ = makeDefElem($1, NULL, @1); } - | ColLabel '=' def_arg - { $$ = makeDefElem($1, (Node *) $3, @1); } + | ColLabel '='operator_def_arg + { $$ = makeDefElem($1, (Node *) $3, @1); } + ; + +operator_def_arg: + func_type { $$ = (Node *)$1; } + | reserved_keyword { $$ = (Node *)makeString(pstrdup($1)); } + | qual_all_Op { $$ = (Node *)$1; } + | NumericOnly { $$ = (Node *)$1; } + | Sconst { $$ = (Node *)makeString($1); } ; /***************************************************************************** @@ -9324,42 +9333,24 @@ AlterSubscriptionStmt: * *****************************************************************************/ -DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_slot +DropSubscriptionStmt: DROP SUBSCRIPTION name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $3; - n->drop_slot = $4; n->missing_ok = false; + n->behavior = $4; $$ = (Node *) n; } - | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_slot + | DROP SUBSCRIPTION IF_P EXISTS name opt_drop_behavior { DropSubscriptionStmt *n = makeNode(DropSubscriptionStmt); n->subname = $5; - n->drop_slot = $6; n->missing_ok = true; + n->behavior = $6; $$ = (Node *) n; } ; -opt_drop_slot: - DROP SLOT - { - $$ = TRUE; - } - | IDENT SLOT - { - if (strcmp($1, "nodrop") == 0) - $$ = FALSE; - else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized option \"%s\"", $1), - parser_errposition(@1))); - } - | /*EMPTY*/ { $$ = TRUE; } - ; - /***************************************************************************** * * QUERY: Define Rewrite Rule diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a61240c..362de12 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1329,6 +1329,22 @@ reread_subscription(void) } /* + * Exit if the subscription was disabled. + * This normally should not happen as the worker gets killed + * during ALTER SUBSCRIPTION ... DISABLE. + */ + if (!newsub->enabled) + { + ereport(LOG, + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + /* * Exit if connection string was changed. The launcher will start * new worker. */ @@ -1358,6 +1374,9 @@ reread_subscription(void) proc_exit(0); } + /* !slotname should never happen when enabled is true. */ + Assert(newsub->slotname); + /* * We need to make new connection to new slot if slot name has changed * so exit here as well if that's the case. @@ -1388,22 +1407,6 @@ reread_subscription(void) proc_exit(0); } - /* - * Exit if the subscription was disabled. - * This normally should not happen as the worker gets killed - * during ALTER SUBSCRIPTION ... DISABLE. - */ - if (!newsub->enabled) - { - ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); - - walrcv_disconnect(wrconn); - proc_exit(0); - } - /* Check for other changes that should never happen too. */ if (newsub->dbid != MySubscription->dbid) { diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 24e5c42..d4fa5a7 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -2303,6 +2303,9 @@ CreateCommandTag(Node *parsetree) case OBJECT_PUBLICATION: tag = "DROP PUBLICATION"; break; + case OBJECT_SUBSCRIPTION: + tag = "DROP SUBSCRIPTION"; + break; case OBJECT_STATISTIC_EXT: tag = "DROP STATISTICS"; break; diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 5550f19..d4f3979 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -45,7 +45,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE text subconninfo BKI_FORCE_NOT_NULL; /* Slot name on publisher */ - NameData subslotname BKI_FORCE_NOT_NULL; + NameData subslotname; /* Synchronous commit setting for worker */ text subsynccommit BKI_FORCE_NOT_NULL; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e1d454a..46c23c2 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3393,8 +3393,8 @@ typedef struct DropSubscriptionStmt { NodeTag type; char *subname; /* Name of of the subscription */ - bool drop_slot; /* Should we drop the slot on remote side? */ bool missing_ok; /* Skip error if missing? */ + DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; #endif /* PARSENODES_H */ diff --git a/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out b/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out index 27c8ec5..5f37681 100644 --- a/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out +++ b/src/test/modules/dummy_seclabel/expected/dummy_seclabel.out @@ -69,7 +69,7 @@ CREATE SCHEMA dummy_seclabel_test; SECURITY LABEL ON SCHEMA dummy_seclabel_test IS 'unclassified'; -- OK SET client_min_messages = error; CREATE PUBLICATION dummy_pub; -CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (NOCONNECT); +CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (NOCONNECT, SLOT NAME = NONE); RESET client_min_messages; SECURITY LABEL ON PUBLICATION dummy_pub IS 'classified'; SECURITY LABEL ON SUBSCRIPTION dummy_sub IS 'classified'; @@ -111,7 +111,7 @@ NOTICE: event ddl_command_end: SECURITY LABEL DROP EVENT TRIGGER always_start, always_end, always_drop, always_rewrite; DROP VIEW dummy_seclabel_view1; DROP TABLE dummy_seclabel_tbl1, dummy_seclabel_tbl2; -DROP SUBSCRIPTION dummy_sub NODROP SLOT; +DROP SUBSCRIPTION dummy_sub; DROP PUBLICATION dummy_pub; DROP ROLE regress_dummy_seclabel_user1; DROP ROLE regress_dummy_seclabel_user2; diff --git a/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql b/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql index 8d43244..97311c7 100644 --- a/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql +++ b/src/test/modules/dummy_seclabel/sql/dummy_seclabel.sql @@ -73,7 +73,7 @@ SECURITY LABEL ON SCHEMA dummy_seclabel_test IS 'unclassified'; -- OK SET client_min_messages = error; CREATE PUBLICATION dummy_pub; -CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (NOCONNECT); +CREATE SUBSCRIPTION dummy_sub CONNECTION '' PUBLICATION foo WITH (NOCONNECT, SLOT NAME = NONE); RESET client_min_messages; SECURITY LABEL ON PUBLICATION dummy_pub IS 'classified'; SECURITY LABEL ON SUBSCRIPTION dummy_sub IS 'classified'; @@ -108,7 +108,7 @@ DROP EVENT TRIGGER always_start, always_end, always_drop, always_rewrite; DROP VIEW dummy_seclabel_view1; DROP TABLE dummy_seclabel_tbl1, dummy_seclabel_tbl2; -DROP SUBSCRIPTION dummy_sub NODROP SLOT; +DROP SUBSCRIPTION dummy_sub; DROP PUBLICATION dummy_pub; DROP ROLE regress_dummy_seclabel_user1; diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out index 814e05e..40eeeed 100644 --- a/src/test/regress/expected/object_address.out +++ b/src/test/regress/expected/object_address.out @@ -37,7 +37,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( FROM SQL WITH FUNCTION varchar_transform(internal), TO SQL WITH FUNCTION int4recv(internal)); CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; -CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (DISABLED, NOCONNECT); +CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (NOCONNECT, SLOT NAME = NONE); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables CREATE STATISTICS addr_nsp.gentable_stat ON (a,b) FROM addr_nsp.gentable; -- test some error cases @@ -477,7 +477,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, SET client_min_messages TO 'warning'; DROP FOREIGN DATA WRAPPER addr_fdw CASCADE; DROP PUBLICATION addr_pub; -DROP SUBSCRIPTION addr_sub NODROP SLOT; +DROP SUBSCRIPTION addr_sub; DROP SCHEMA addr_nsp CASCADE; DROP OWNED BY regress_addr_user; DROP USER regress_addr_user; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index fd09f54..154e913 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -113,17 +113,18 @@ HINT: The owner of a subscription must be a superuser. ALTER ROLE regress_subscription_user2 SUPERUSER; -- now it works ALTER SUBSCRIPTION testsub OWNER TO regress_subscription_user2; --- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block +ALTER SUBSCRIPTION testsub WITH (SLOT NAME = NONE); +-- fail - cannot do DROP SUBSCRIPTION CASCADE inside transaction block BEGIN; -DROP SUBSCRIPTION testsub DROP SLOT; -ERROR: DROP SUBSCRIPTION ... DROP SLOT cannot run inside a transaction block +DROP SUBSCRIPTION testsub CASCADE; +ERROR: DROP SUBSCRIPTION ... CASCADE cannot run inside a transaction block COMMIT; BEGIN; -DROP SUBSCRIPTION testsub NODROP SLOT; +DROP SUBSCRIPTION testsub; COMMIT; -DROP SUBSCRIPTION IF EXISTS testsub NODROP SLOT; +DROP SUBSCRIPTION IF EXISTS testsub; NOTICE: subscription "testsub" does not exist, skipping -DROP SUBSCRIPTION testsub NODROP SLOT; -- fail +DROP SUBSCRIPTION testsub; -- fail ERROR: subscription "testsub" does not exist RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql index c9219e4..6940392 100644 --- a/src/test/regress/sql/object_address.sql +++ b/src/test/regress/sql/object_address.sql @@ -40,7 +40,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( FROM SQL WITH FUNCTION varchar_transform(internal), TO SQL WITH FUNCTION int4recv(internal)); CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; -CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (DISABLED, NOCONNECT); +CREATE SUBSCRIPTION addr_sub CONNECTION '' PUBLICATION bar WITH (NOCONNECT, SLOT NAME = NONE); CREATE STATISTICS addr_nsp.gentable_stat ON (a,b) FROM addr_nsp.gentable; -- test some error cases @@ -205,7 +205,7 @@ SET client_min_messages TO 'warning'; DROP FOREIGN DATA WRAPPER addr_fdw CASCADE; DROP PUBLICATION addr_pub; -DROP SUBSCRIPTION addr_sub NODROP SLOT; +DROP SUBSCRIPTION addr_sub; DROP SCHEMA addr_nsp CASCADE; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index db05f52..e38a33a 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -83,17 +83,19 @@ ALTER ROLE regress_subscription_user2 SUPERUSER; -- now it works ALTER SUBSCRIPTION testsub OWNER TO regress_subscription_user2; --- fail - cannot do DROP SUBSCRIPTION DROP SLOT inside transaction block +ALTER SUBSCRIPTION testsub WITH (SLOT NAME = NONE); + +-- fail - cannot do DROP SUBSCRIPTION CASCADE inside transaction block BEGIN; -DROP SUBSCRIPTION testsub DROP SLOT; +DROP SUBSCRIPTION testsub CASCADE; COMMIT; BEGIN; -DROP SUBSCRIPTION testsub NODROP SLOT; +DROP SUBSCRIPTION testsub; COMMIT; -DROP SUBSCRIPTION IF EXISTS testsub NODROP SLOT; -DROP SUBSCRIPTION testsub NODROP SLOT; -- fail +DROP SUBSCRIPTION IF EXISTS testsub; +DROP SUBSCRIPTION testsub; -- fail RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index d1817f5..aa150a9 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -191,7 +191,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart"; # check all the cleanup -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed DROP SLOT"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed CASCADE"); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index fa0bf7f..7f309ae 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -52,7 +52,7 @@ my $result = is($result, qq(10), 'initial data synced for first sub'); # drop subscription so that there is unreplicated data -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_rep SELECT generate_series(11,20)"); @@ -89,8 +89,8 @@ $node_subscriber->poll_query_until('postgres', "SELECT pid IS NOT NULL FROM pg_s or die "Timed out while waiting for subscriber to start"; # and drop both subscriptions -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2 CASCADE"); # check subscriptions are removed $result = @@ -154,7 +154,7 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); is($result, qq(20), 'changes for table added after subscription initialized replicated'); -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub CASCADE"); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.7.4