From 0392f3e3f5c371e0b11c853a419f48b5e402660b Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Thu, 13 Oct 2022 17:05:45 +0300 Subject: [PATCH 1/2] Add replication protocol cmd to create a snapshot Introduced REPLICATION_SLOT_SNAPSHOT to be able to create and use a snapshot without creating a new replication slot, but by using an existing slot. REPLICATION_SLOT_SNAPSHOT simply does what CREATE_REPLICATION_SLOT does without creating a new replication slot. REPLICATION_SLOT_SNAPSHOT command imports the snapshot into the current transaction and returns consistent_point. The changes earlier than the consistent_point will be applied by importing the snapshot. All changes later than the consistent_point will be available to be consumed from the replication slot. This is useful for reusing replication slots in logical replication. Otherwise, tablesync workers cannot start from a consistent point to copy a relation and then apply changes by consuming from replication slot. --- doc/src/sgml/protocol.sgml | 32 ++++++ .../libpqwalreceiver/libpqwalreceiver.c | 69 ++++++++++++- src/backend/replication/logical/logical.c | 39 +++++++- .../replication/logical/logicalfuncs.c | 1 + src/backend/replication/repl_gram.y | 18 +++- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slotfuncs.c | 1 + src/backend/replication/walsender.c | 97 ++++++++++++++++++- src/include/nodes/replnodes.h | 11 +++ src/include/replication/logical.h | 1 + src/include/replication/walreceiver.h | 13 +++ src/tools/pgindent/typedefs.list | 1 + 12 files changed, 281 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 93fc7167d4..93a3867996 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2613,6 +2613,38 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + REPLICATION_SLOT_SNAPSHOT slot_name [ ( option [, ...] ) ] + REPLICATION_SLOT_SNAPSHOT + + + + Creates a snapshot including all the changes from the replication slot until + the point at which the replication slot becomes consistent. Then the snapshot + is used in the currenct transaction. This command is currently only supported + for logical replication. + slots. + + + + In response to this command, the server will return a one-row result set, + containing the following field: + + + consistent_point (text) + + + The WAL location at which the slot became consistent. This is the + earliest location from which streaming can start on this replication + slot. + + + + + + + + BASE_BACKUP [ ( option [, ...] ) ] BASE_BACKUP diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index c40c6220db..9213fd2b1b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,6 +80,8 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const int nRetTypes, const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); +static void libpqrcv_slot_snapshot(WalReceiverConn *conn, char *slotname, + const WalRcvStreamOptions *options, XLogRecPtr *lsn); static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_connect = libpqrcv_connect, @@ -96,7 +98,8 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_create_slot = libpqrcv_create_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, - .walrcv_disconnect = libpqrcv_disconnect + .walrcv_disconnect = libpqrcv_disconnect, + .walrcv_slot_snapshot = libpqrcv_slot_snapshot }; /* Prototypes for private functions */ @@ -968,6 +971,70 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, return snapshot; } +/* + * TODO + */ +static void +libpqrcv_slot_snapshot(WalReceiverConn *conn, + char *slotname, + const WalRcvStreamOptions *options, + XLogRecPtr *lsn) +{ + StringInfoData cmd; + PGresult *res; + char *pubnames_str; + List *pubnames; + char *pubnames_literal; + + initStringInfo(&cmd); + + /* Build the command. */ + appendStringInfo(&cmd, "REPLICATION_SLOT_SNAPSHOT \"%s\"", slotname); + appendStringInfoString(&cmd, " ("); + appendStringInfo(&cmd, " proto_version '%u'", + options->proto.logical.proto_version); + + /* Add publication names. */ + pubnames = options->proto.logical.publication_names; + pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); + if (!pubnames_str) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str, + strlen(pubnames_str)); + if (!pubnames_literal) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */ + errmsg("could not start WAL streaming: %s", + pchomp(PQerrorMessage(conn->streamConn))))); + appendStringInfo(&cmd, ", publication_names %s", pubnames_literal); + PQfreemem(pubnames_literal); + pfree(pubnames_str); + + appendStringInfoString(&cmd, " )"); + + /* Execute the command. */ + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Could not create a snapshot by replication slot \"%s\": %s", + slotname, pchomp(PQerrorMessage(conn->streamConn))))); + } + + if (lsn) + *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, + CStringGetDatum(PQgetvalue(res, 0, 0)))); + + PQclear(res); +} + /* * Return PID of remote backend process. */ diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 52d1fe6269..b62babe359 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -461,6 +461,10 @@ CreateInitDecodingContext(const char *plugin, * fast_forward * bypass the generation of logical changes. * + * need_full_snapshot + * if true, create a snapshot able to read all tables, + * otherwise do not create any snapshot. + * * xl_routine * XLogReaderRoutine used by underlying xlogreader * @@ -479,6 +483,7 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, + bool need_full_snapshot, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -487,6 +492,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, LogicalDecodingContext *ctx; ReplicationSlot *slot; MemoryContext old_context; + TransactionId xmin_horizon = InvalidTransactionId; /* shorter lines... */ slot = MyReplicationSlot; @@ -533,8 +539,39 @@ CreateDecodingContext(XLogRecPtr start_lsn, start_lsn = slot->data.confirmed_flush; } + + /* + * We need to determine a safe xmin horizon to start decoding from if we + * want to create a snapshot too. Otherwise we would end up with a + * snapshot that cannot be imported since xmin value from the snapshot may + * be less than the oldest safe xmin. To avoid this call + * GetOldestSafeDecodingTransactionId() to return a safe xmin value, which + * can be used while exporting/importing the snapshot. + * + * So we have to acquire the ProcArrayLock to prevent computation of new + * xmin horizons by other backends, get the safe decoding xid, and inform + * the slot machinery about the new limit. Once that's done the + * ProcArrayLock can be released as the slot machinery now is protecting + * against vacuum. + */ + if (need_full_snapshot) + { + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + slot->effective_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + + xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + } + ctx = StartupDecodingContext(output_plugin_options, - start_lsn, InvalidTransactionId, false, + start_lsn, xmin_horizon, need_full_snapshot, fast_forward, xl_routine, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..1191c70eb0 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -208,6 +208,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin ctx = CreateDecodingContext(InvalidXLogRecPtr, options, false, + false, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, .segment_close = wal_segment_close), diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..e5f0235d1e 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -65,6 +65,7 @@ Node *replication_parse_result; %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT %token K_TIMELINE_HISTORY +%token K_REPLICATION_SLOT_SNAPSHOT %token K_WAIT %token K_TIMELINE %token K_PHYSICAL @@ -80,7 +81,7 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - read_replication_slot timeline_history show + read_replication_slot timeline_history show replication_slot_snapshot %type generic_option_list %type generic_option %type opt_timeline @@ -114,6 +115,7 @@ command: | read_replication_slot | timeline_history | show + | replication_slot_snapshot ; /* @@ -307,6 +309,19 @@ timeline_history: } ; +/* + * REPLICATION_SLOT_SNAPSHOT %s options + */ +replication_slot_snapshot: + K_REPLICATION_SLOT_SNAPSHOT var_name plugin_options + { + ReplicationSlotSnapshotCmd *n = makeNode(ReplicationSlotSnapshotCmd); + n->slotname = $2; + n->options = $3; + $$ = (Node *) n; + } + ; + opt_physical: K_PHYSICAL | /* EMPTY */ @@ -400,6 +415,7 @@ ident_or_keyword: | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } + | K_REPLICATION_SLOT_SNAPSHOT { $$ = "replication_slot_snapshot"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } | K_PHYSICAL { $$ = "physical"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index cb467ca46f..1988a6203b 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -126,6 +126,7 @@ START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } +REPLICATION_SLOT_SNAPSHOT { return K_REPLICATION_SLOT_SNAPSHOT; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } LOGICAL { return K_LOGICAL; } @@ -303,6 +304,7 @@ replication_scanner_is_replication_command(void) case K_DROP_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: + case K_REPLICATION_SLOT_SNAPSHOT: case K_SHOW: /* Yes; push back the first token so we can parse later. */ repl_pushed_back_token = first_token; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c964824..b3ae11b2c8 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -478,6 +478,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) ctx = CreateDecodingContext(InvalidXLogRecPtr, NIL, true, /* fast_forward */ + false, XL_ROUTINE(.page_read = read_local_xlog_page, .segment_open = wal_segment_open, .segment_close = wal_segment_close), diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 015ae2995d..70d926f4f0 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -238,6 +238,7 @@ static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); static void StartLogicalReplication(StartReplicationCmd *cmd); +static void ReplicationSlotSnapshot(ReplicationSlotSnapshotCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); @@ -1280,7 +1281,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) * are reported early. */ logical_decoding_ctx = - CreateDecodingContext(cmd->startpoint, cmd->options, false, + CreateDecodingContext(cmd->startpoint, cmd->options, false, false, XL_ROUTINE(.page_read = logical_read_xlog_page, .segment_open = WalSndSegmentOpen, .segment_close = wal_segment_close), @@ -1332,6 +1333,91 @@ StartLogicalReplication(StartReplicationCmd *cmd) EndCommand(&qc, DestRemote, false); } +/* + * Create a snapshot from an existing replication slot. + */ +static void +ReplicationSlotSnapshot(ReplicationSlotSnapshotCmd *cmd) +{ + Snapshot snap; + LogicalDecodingContext *ctx; + char xloc[MAXFNAMELEN]; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[1]; + bool nulls[1] = {0}; + + Assert(!MyReplicationSlot); + + if (!IsTransactionBlock()) + ereport(ERROR, + (errmsg("%s must be called inside a transaction", + "REPLICATION_SLOT_SNAPSHOT ..."))); + + if (XactIsoLevel != XACT_REPEATABLE_READ) + ereport(ERROR, + (errmsg("%s must be called in REPEATABLE READ isolation mode transaction", + "REPLICATION_SLOT_SNAPSHOT ..."))); + + if (FirstSnapshotSet) + ereport(ERROR, + (errmsg("%s must be called before any query", + "REPLICATION_SLOT_SNAPSHOT ..."))); + + if (IsSubTransaction()) + ereport(ERROR, + (errmsg("%s must not be called in a subtransaction", + "REPLICATION_SLOT_SNAPSHOT ..."))); + + ReplicationSlotAcquire(cmd->slotname, false); + + ctx = CreateDecodingContext(MyReplicationSlot->data.restart_lsn, + cmd->options, + false, + true, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); + + /* + * Signal that we don't need the timeout mechanism. We're just creating + * the replication slot and don't yet accept feedback messages or send + * keepalives. As we possibly need to wait for further WAL the walsender + * would otherwise possibly be killed too soon. + */ + last_reply_timestamp = 0; + + /* build initial snapshot, might take a while */ + DecodingContextFindStartpoint(ctx); + + snap = SnapBuildInitialSnapshot(ctx->snapshot_builder); + RestoreTransactionSnapshot(snap, MyProc); + + /* Don't need the decoding context anymore */ + FreeDecodingContext(ctx); + + /* Create a tuple to send consisten WAL location */ + snprintf(xloc, sizeof(xloc), "%X/%X", + LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush)); + + dest = CreateDestReceiver(DestRemoteSimple); + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "consistent_point", + TEXTOID, -1, 0); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* consistent wal location */ + values[0] = CStringGetTextDatum(xloc); + + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); + + ReplicationSlotRelease(); +} + /* * LogicalDecodingContext 'prepare_write' callback. * @@ -1860,6 +1946,15 @@ exec_replication_command(const char *cmd_string) } break; + case T_ReplicationSlotSnapshotCmd: + { + cmdtag = "REPLICATION_SLOT_SNAPSHOT"; + set_ps_display(cmdtag); + ReplicationSlotSnapshot((ReplicationSlotSnapshotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + } + default: elog(ERROR, "unrecognized replication command node tag: %u", cmd_node->type); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 4321ba8f86..44a4580671 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -108,4 +108,15 @@ typedef struct TimeLineHistoryCmd TimeLineID timeline; } TimeLineHistoryCmd; +/* ---------------------- + * REPLICATION_SLOT_SNAPSHOT command + * ---------------------- + */ +typedef struct ReplicationSlotSnapshotCmd +{ + NodeTag type; + char *slotname; + List *options; +} ReplicationSlotSnapshotCmd; + #endif /* REPLNODES_H */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea0..6535786a0e 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -125,6 +125,7 @@ extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin, extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, + bool need_full_snapshot, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index decffe352d..bd11f9f31e 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -384,6 +384,16 @@ typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); +/* + * walrcv_slot_snapshot_fn + * + * Create a snapshot by an existing replication slot + */ +typedef void (*walrcv_slot_snapshot_fn) (WalReceiverConn *conn, + char *slotname, + const WalRcvStreamOptions *options, + XLogRecPtr *lsn); + typedef struct WalReceiverFunctionsType { walrcv_connect_fn walrcv_connect; @@ -401,6 +411,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; + walrcv_slot_snapshot_fn walrcv_slot_snapshot; } WalReceiverFunctionsType; extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; @@ -435,6 +446,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_exec(conn, exec, nRetTypes, retTypes) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) +#define walrcv_slot_snapshot(conn, slotname, options, lsn) \ + WalReceiverFunctions->walrcv_slot_snapshot(conn, slotname, options, lsn) static inline void walrcv_clear_result(WalRcvExecResult *walres) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 23bafec5f7..209918c380 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2322,6 +2322,7 @@ ReplicationSlotCtlData ReplicationSlotOnDisk ReplicationSlotPersistency ReplicationSlotPersistentData +ReplicationSlotSnapshotCmd ReplicationState ReplicationStateCtl ReplicationStateOnDisk -- 2.25.1