From f7bc02999302c5c714672c8b904a21982e597483 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Thu, 20 Jul 2023 14:24:25 +0530 Subject: [PATCH v1] Remove list_slots command Now list_sots has been replaced by a select query. --- .../libpqwalreceiver/libpqwalreceiver.c | 20 +- src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/repl_gram.y | 32 +-- src/backend/replication/repl_scanner.l | 2 - src/backend/replication/walsender.c | 195 ------------------ src/include/nodes/replnodes.h | 10 - 6 files changed, 16 insertions(+), 245 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0e13cc2417..6580b3ef5b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -425,7 +425,11 @@ libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names) WalRecvReplicationSlotData *slot_data; initStringInfo(&s); - appendStringInfoString(&s, "LIST_SLOTS"); + appendStringInfo(&s, + "SELECT slot_name, plugin, slot_type, " + "datoid, database, temporary, xmin, " + "catalog_xmin, restart_lsn, confirmed_flush_lsn " + "FROM pg_catalog.pg_replication_slots"); if (strcmp(slot_names, "") != 0 && strcmp(slot_names, "*") != 0) { @@ -433,16 +437,18 @@ libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names) List *namelist; ListCell *lc; - appendStringInfoChar(&s, ' '); rawname = pstrdup(slot_names); SplitIdentifierString(rawname, ',', &namelist); - foreach (lc, namelist) + + appendStringInfoString(&s, " AND slot_name IN ("); + foreach(lc, namelist) { if (lc != list_head(namelist)) appendStringInfoChar(&s, ','); appendStringInfo(&s, "%s", - quote_identifier(lfirst(lc))); + quote_literal_cstr(lfirst(lc))); } + appendStringInfoChar(&s, ')'); } res = libpqrcv_PQexec(conn->streamConn, s.data); @@ -482,10 +488,12 @@ libpqrcv_list_slots(WalReceiverConn *conn, const char *slot_names) if (OidIsValid(slot_data->persistent_data.database)) elog(ERROR, "unexpected physical replication slot with database set"); } - if (pg_strtoint32(PQgetvalue(res, i, 5)) == 1) + + if (strcmp(PQgetvalue(res, i, 5), "t") == 0) slot_data->persistent_data.persistency = RS_TEMPORARY; - else + else if (strcmp(PQgetvalue(res, i, 5), "f") == 0) slot_data->persistent_data.persistency = RS_PERSISTENT; + if (!PQgetisnull(res, i, 6)) slot_data->persistent_data.xmin = atooid(PQgetvalue(res, i, 6)); if (!PQgetisnull(res, i, 7)) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 640f7647cc..0bf9ed8504 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1117,7 +1117,7 @@ ApplyLauncherStartSlotSync(long *wait_time) if (strcmp(synchronize_slot_names, "") == 0) return; - wrconn = walrcv_connect(PrimaryConnInfo, false, false, + wrconn = walrcv_connect(PrimaryConnInfo, true, false, "Logical Replication Launcher", &err); if (!wrconn) ereport(ERROR, diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 12a4b74368..0c874e33cf 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -76,12 +76,11 @@ Node *replication_parse_result; %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT -%token K_LIST_SLOTS %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show list_slots + read_replication_slot timeline_history show %type generic_option_list %type generic_option %type opt_timeline @@ -92,7 +91,6 @@ Node *replication_parse_result; %type opt_temporary %type create_slot_options create_slot_legacy_opt_list %type create_slot_legacy_opt -%type slot_name_list slot_name_list_opt %% @@ -116,7 +114,6 @@ command: | read_replication_slot | timeline_history | show - | list_slots ; /* @@ -129,33 +126,6 @@ identify_system: } ; -slot_name_list: - IDENT - { - $$ = list_make1($1); - } - | slot_name_list ',' IDENT - { - $$ = lappend($1, $3); - } - -slot_name_list_opt: - slot_name_list { $$ = $1; } - | /* EMPTY */ { $$ = NIL; } - ; - -/* - * LIST_SLOTS - */ -list_slots: - K_LIST_SLOTS slot_name_list_opt - { - ListSlotsCmd *cmd = makeNode(ListSlotsCmd); - cmd->slot_names = $2; - $$ = (Node *) cmd; - } - ; - /* * READ_REPLICATION_SLOT %s */ diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 11064feb86..1cc7fb858c 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -128,7 +128,6 @@ DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } -LIST_SLOTS { return K_LIST_SLOTS; } LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } TEMPORARY { return K_TEMPORARY; } @@ -305,7 +304,6 @@ replication_scanner_is_replication_command(void) case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: - case K_LIST_SLOTS: /* Yes; push back the first token so we can parse later. */ repl_pushed_back_token = first_token; return true; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 26d07ae549..d27ef2985d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -473,194 +473,6 @@ IdentifySystem(void) end_tup_output(tstate); } -static int -pg_qsort_namecmp(const void *a, const void *b) -{ - return strncmp(NameStr(*(Name) a), NameStr(*(Name) b), NAMEDATALEN); -} - -/* - * Handle the LIST_SLOTS command. - */ -static void -ListSlots(ListSlotsCmd *cmd) -{ - DestReceiver *dest; - TupOutputState *tstate; - TupleDesc tupdesc; - NameData *slot_names; - int numslot_names; - - numslot_names = list_length(cmd->slot_names); - if (numslot_names) - { - ListCell *lc; - int i = 0; - - slot_names = palloc(numslot_names * sizeof(NameData)); - foreach(lc, cmd->slot_names) - { - char *slot_name = lfirst(lc); - - ReplicationSlotValidateName(slot_name, ERROR); - namestrcpy(&slot_names[i++], slot_name); - } - - qsort(slot_names, numslot_names, sizeof(NameData), pg_qsort_namecmp); - } - - dest = CreateDestReceiver(DestRemoteSimple); - - /* need a tuple descriptor representing four columns */ - tupdesc = CreateTemplateTupleDesc(10); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", - TEXTOID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "plugin", - TEXTOID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "slot_type", - TEXTOID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "datoid", - INT8OID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 5, "database", - TEXTOID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 6, "temporary", - INT4OID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 7, "xmin", - INT8OID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 8, "catalog_xmin", - INT8OID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 9, "restart_lsn", - TEXTOID, -1, 0); - TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 10, "confirmed_flush", - TEXTOID, -1, 0); - - /* prepare for projection of tuples */ - tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int slotno = 0; slotno < max_replication_slots; slotno++) - { - ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; - char restart_lsn_str[MAXFNAMELEN]; - char confirmed_flush_lsn_str[MAXFNAMELEN]; - Datum values[10]; - bool nulls[10]; - - ReplicationSlotPersistency persistency; - TransactionId xmin; - TransactionId catalog_xmin; - XLogRecPtr restart_lsn; - XLogRecPtr confirmed_flush_lsn; - Oid datoid; - NameData slot_name; - NameData plugin; - int i; - int64 tmpbigint; - - if (!slot->in_use) - continue; - - SpinLockAcquire(&slot->mutex); - - xmin = slot->data.xmin; - catalog_xmin = slot->data.catalog_xmin; - datoid = slot->data.database; - restart_lsn = slot->data.restart_lsn; - confirmed_flush_lsn = slot->data.confirmed_flush; - namestrcpy(&slot_name, NameStr(slot->data.name)); - namestrcpy(&plugin, NameStr(slot->data.plugin)); - persistency = slot->data.persistency; - - SpinLockRelease(&slot->mutex); - - if (numslot_names && - !bsearch((void *) &slot_name, (void *) slot_names, - numslot_names, sizeof(NameData), pg_qsort_namecmp)) - continue; - - memset(nulls, 0, sizeof(nulls)); - - i = 0; - values[i++] = CStringGetTextDatum(NameStr(slot_name)); - - if (datoid == InvalidOid) - nulls[i++] = true; - else - values[i++] = CStringGetTextDatum(NameStr(plugin)); - - if (datoid == InvalidOid) - values[i++] = CStringGetTextDatum("physical"); - else - values[i++] = CStringGetTextDatum("logical"); - - if (datoid == InvalidOid) - nulls[i++] = true; - else - { - tmpbigint = datoid; - values[i++] = Int64GetDatum(tmpbigint); - } - - if (datoid == InvalidOid) - nulls[i++] = true; - else - { - MemoryContext cur = CurrentMemoryContext; - - /* syscache access needs a transaction env. */ - StartTransactionCommand(); - /* make dbname live outside TX context */ - MemoryContextSwitchTo(cur); - values[i++] = CStringGetTextDatum(get_database_name(datoid)); - CommitTransactionCommand(); - /* CommitTransactionCommand switches to TopMemoryContext */ - MemoryContextSwitchTo(cur); - } - - values[i++] = Int32GetDatum(persistency == RS_TEMPORARY ? 1 : 0); - - if (xmin != InvalidTransactionId) - { - tmpbigint = xmin; - values[i++] = Int64GetDatum(tmpbigint); - } - else - nulls[i++] = true; - - if (catalog_xmin != InvalidTransactionId) - { - tmpbigint = catalog_xmin; - values[i++] = Int64GetDatum(tmpbigint); - } - else - nulls[i++] = true; - - if (restart_lsn != InvalidXLogRecPtr) - { - snprintf(restart_lsn_str, sizeof(restart_lsn_str), "%X/%X", - LSN_FORMAT_ARGS(restart_lsn)); - values[i++] = CStringGetTextDatum(restart_lsn_str); - } - else - nulls[i++] = true; - - if (confirmed_flush_lsn != InvalidXLogRecPtr) - { - snprintf(confirmed_flush_lsn_str, sizeof(confirmed_flush_lsn_str), - "%X/%X", LSN_FORMAT_ARGS(confirmed_flush_lsn)); - values[i++] = CStringGetTextDatum(confirmed_flush_lsn_str); - } - else - nulls[i++] = true; - - /* send it to dest */ - do_tup_output(tstate, values, nulls); - } - LWLockRelease(ReplicationSlotControlLock); - - end_tup_output(tstate); -} - /* Handle READ_REPLICATION_SLOT command */ static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd) @@ -2007,13 +1819,6 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; - case T_ListSlotsCmd: - cmdtag = "LIST_SLOTS"; - set_ps_display(cmdtag); - ListSlots((ListSlotsCmd *) cmd_node); - EndReplicationCommand(cmdtag); - break; - case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 980e0b2ee2..b9c7ed61c6 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,16 +33,6 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; -/* ---------------------- - * LIST_SLOTS command - * ---------------------- - */ -typedef struct ListSlotsCmd -{ - NodeTag type; - List *slot_names; -} ListSlotsCmd; - /* ---------------------- * BASE_BACKUP command * ---------------------- -- 2.34.1