From 632505be60af48a8d9514e58d536f3acaff48550 Mon Sep 17 00:00:00 2001 From: Japin Li Date: Sun, 7 Mar 2021 12:56:55 +0000 Subject: [PATCH v8 1/4] Introduce a new syntax to add/drop publications At present, if we want to update publications in subscription, we can use SET PUBLICATION, however, it requires supply all publications that exists and the new publications if we want to add new publications, it's inconvenient. The new syntax only supply the new publications. When the refresh is true, it only refresh the new publications. --- src/backend/commands/subscriptioncmds.c | 153 ++++++++++++++++++++++++ src/backend/parser/gram.y | 20 ++++ src/include/nodes/parsenodes.h | 2 + 3 files changed, 175 insertions(+) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index bfd3514546..368ee36961 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -47,6 +47,7 @@ #include "utils/syscache.h" static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *merge_publications(List *oldpublist, List *newpublist, bool addpub); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -964,6 +965,53 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) break; } + case ALTER_SUBSCRIPTION_ADD_PUBLICATION: + case ALTER_SUBSCRIPTION_DROP_PUBLICATION: + { + bool copy_data = false; + bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; + bool refresh; + List *publist = NIL; + + publist = merge_publications(sub->publications, stmt->publication, isadd); + + parse_subscription_options(stmt->options, + NULL, /* no "connect" */ + NULL, NULL, /* no "enabled" */ + NULL, /* no "create_slot" */ + NULL, NULL, /* no "slot_name" */ + isadd ? ©_data : NULL, /* for drop, no "copy_data" */ + NULL, /* no "synchronous_commit" */ + &refresh, + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ + + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(publist); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Refresh if user asked us to. */ + if (refresh) + { + if (!sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); + + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); + + /* Only refresh the added/dropped list of publications. */ + sub->publications = stmt->publication; + + AlterSubscription_refresh(sub, copy_data); + } + + break; + } + case ALTER_SUBSCRIPTION_REFRESH: { bool copy_data; @@ -1551,3 +1599,108 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err) errhint("Use %s to disassociate the subscription from the slot.", "ALTER SUBSCRIPTION ... SET (slot_name = NONE)"))); } + +/* + * Merge current subscription's publications and user specified publications + * by ADD/DROP PUBLICATIONS. + * + * If addpub is true, we will add the list of publications into oldpublist. + * Otherwise, we will delete the list of publications from oldpublist. + */ +static List * +merge_publications(List *oldpublist, List *newpublist, bool addpub) +{ + StringInfoData errstr; + int errstrcnt = 0; + ListCell *lc; + + foreach(lc, newpublist) + { + char *name = strVal(lfirst(lc)); + ListCell *plc; + + /* Check for duplicates. */ + foreach(plc, newpublist) + { + char *pname = strVal(lfirst(plc)); + + if (plc == lc) + break; + + if (strcmp(name, pname) == 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("publication name \"%s\" used more than once", + pname))); + } + } + + initStringInfo(&errstr); + + foreach(lc, newpublist) + { + char *name = strVal(lfirst(lc)); + ListCell *cell = NULL; + + foreach(cell, oldpublist) + { + char *pubname = strVal(lfirst(cell)); + + if (strcmp(name, pubname) == 0) + { + if (addpub) + { + errstrcnt++; + + if (errstrcnt == 1) + appendStringInfo(&errstr, _("\"%s\""), name); + else + appendStringInfo(&errstr, _(", \"%s\""), name); + } + else + oldpublist = list_delete_cell(oldpublist, cell); + + break; + } + } + + if (addpub && cell == NULL) + oldpublist = lappend(oldpublist, makeString(name)); + else if (!addpub && cell == NULL) + { + errstrcnt++; + + if (errstrcnt == 1) + appendStringInfo(&errstr, _("\"%s\""), name); + else + appendStringInfo(&errstr, _(", \"%s\""), name); + } + } + + if (errstrcnt >= 1) + { + if (addpub) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg_plural("publication %s is already present in the subscription", + "publications %s are already present in the subscription", + errstrcnt, errstr.data))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg_plural("publication %s doesn't exist in the subscription", + "publications %s do not exist in the subscription", + errstrcnt, errstr.data))); + } + } + + if (oldpublist == NIL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("subscription must contain at least one publication"))); + + return oldpublist; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index bc43641ffe..ab0468520c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9640,6 +9640,26 @@ AlterSubscriptionStmt: n->options = $6; $$ = (Node *)n; } + | ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; + $$ = (Node *)n; + } | ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition { AlterSubscriptionStmt *n = diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 68425eb2c0..83cb7d9fe4 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3593,6 +3593,8 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_PUBLICATION, + ALTER_SUBSCRIPTION_ADD_PUBLICATION, + ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_ENABLED } AlterSubscriptionType; -- 2.25.1