From f7bc51504919a5f34265a0f02720f1b0b34fc480 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Wed, 23 Feb 2022 11:37:30 +0530 Subject: [PATCH v1] Skip replication of non local data. Add an option only_local which will subscribe only to the locally generated data in the publisher node. If subscriber is created with this option, publisher will skip publishing the data that was subscribed from other nodes. It can be created using following syntax: ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (only_local = on); --- contrib/test_decoding/test_decoding.c | 13 +++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 20 +++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 18 ++++++++-- src/backend/replication/logical/decode.c | 36 ++++++++++++++----- src/backend/replication/logical/logical.c | 35 ++++++++++++++++++ src/backend/replication/logical/tablesync.c | 2 +- src/backend/replication/logical/worker.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 25 +++++++++++++ src/backend/replication/slot.c | 4 ++- src/backend/replication/slotfuncs.c | 18 +++++++--- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 21 ++++++++--- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_proc.dat | 6 ++-- src/include/catalog/pg_subscription.h | 3 ++ src/include/replication/logical.h | 4 +++ src/include/replication/output_plugin.h | 7 ++++ src/include/replication/pgoutput.h | 1 + src/include/replication/slot.h | 5 ++- src/include/replication/walreceiver.h | 8 +++-- src/test/regress/expected/rules.out | 5 +-- src/tools/pgindent/typedefs.list | 1 + 24 files changed, 205 insertions(+), 36 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index ea22649e41..58bc5dbc1c 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pg_decode_filter_remotedata(LogicalDecodingContext *ctx, + RepOriginId origin_id); static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, @@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; + cb->filter_remotedata_cb = pg_decode_filter_remotedata; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; cb->sequence_cb = pg_decode_sequence; @@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx, return false; } +static bool +pg_decode_filter_remotedata(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; +} /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ca65a8bd20..94e096e5fb 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->binary = subform->subbinary; sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; + sub->onlylocaldata = subform->subonlylocaldata; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..931c549f7c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -958,7 +958,8 @@ CREATE VIEW pg_replication_slots AS L.confirmed_flush_lsn, L.wal_status, L.safe_wal_size, - L.two_phase + L.two_phase, + L.only_local FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3ef6607d24..3f5cbe2c20 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -61,6 +61,7 @@ #define SUBOPT_BINARY 0x00000080 #define SUBOPT_STREAMING 0x00000100 #define SUBOPT_TWOPHASE_COMMIT 0x00000200 +#define SUBOPT_ONLYLOCAL_DATA 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -82,6 +83,7 @@ typedef struct SubOpts bool binary; bool streaming; bool twophase; + bool onlylocal_data; } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); @@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->streaming = false; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; + if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA)) + opts->onlylocal_data = false; /* Parse options */ foreach(lc, stmt_options) @@ -228,6 +232,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA) && + strcmp(defel->defname, "only_local") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ONLYLOCAL_DATA)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ONLYLOCAL_DATA; + opts->onlylocal_data = defGetBoolean(defel); + } else if (strcmp(defel->defname, "two_phase") == 0) { /* @@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_ONLYLOCAL_DATA); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -460,6 +474,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_subonlylocaldata - 1] = BoolGetDatum(opts.onlylocal_data); values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : @@ -565,7 +580,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); + CRS_NOEXPORT_SNAPSHOT, NULL, + opts.onlylocal_data); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..326f60414e 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -75,7 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn); + XLogRecPtr *lsn, + bool onlylocal_data); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -453,6 +454,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + if (options->proto.logical.onlylocal_data && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfoString(&cmd, ", only_local 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) @@ -869,7 +874,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + XLogRecPtr *lsn, bool onlylocal_data) { PGresult *res; StringInfoData cmd; @@ -899,6 +904,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoChar(&cmd, ' '); } + if (onlylocal_data) + { + appendStringInfoString(&cmd, "ONLY_LOCAL"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } + if (use_new_options_syntax) { switch (snapshot_action) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 18cf931822..6305b93fc7 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -555,6 +555,15 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) return filter_by_origin_cb_wrapper(ctx, origin_id); } +static inline bool +FilterRemoteOriginData(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (ctx->callbacks.filter_remotedata_cb == NULL) + return false; + + return filter_remotedata_cb_wrapper(ctx, origin_id); +} + /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ @@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message = (xl_logical_message *) XLogRecGetData(r); if (message->dbId != ctx->slot->data.database || - FilterByOrigin(ctx, origin_id)) + FilterByOrigin(ctx, origin_id) || + FilterRemoteOriginData(ctx, origin_id)) return; if (message->transactional && @@ -864,7 +874,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); @@ -914,7 +925,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); @@ -980,7 +992,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); @@ -1032,7 +1045,8 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); @@ -1082,7 +1096,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; /* @@ -1175,7 +1190,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; change = ReorderBufferGetChange(ctx->reorder); @@ -1250,7 +1266,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || - ctx->fast_forward || FilterByOrigin(ctx, origin_id)); + ctx->fast_forward || FilterByOrigin(ctx, origin_id) || + FilterRemoteOriginData(ctx, origin_id)); } /* @@ -1335,7 +1352,8 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) || + FilterRemoteOriginData(ctx, XLogRecGetOrigin(r))) return; tupledata = XLogRecGetData(r); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 934aa13f2d..19584eaea7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); + ctx->onlylocal_data = ctx->callbacks.filter_remotedata_cb != NULL; + /* * streaming callbacks * @@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin, */ ctx->twophase &= slot->data.two_phase; + ctx->onlylocal_data &= slot->data.onlylocal_data; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; return ctx; @@ -1178,6 +1182,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) return ret; } +bool +filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + bool ret; + + Assert(!ctx->fast_forward); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "filter_remoteorigin"; + state.report_location = InvalidXLogRecPtr; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = false; + + /* do the actual work: call callback */ + ret = ctx->callbacks.filter_remotedata_cb(ctx, origin_id); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + return ret; +} + static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1659964571..f5093ce8c9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1224,7 +1224,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) HOLD_INTERRUPTS(); walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + CRS_USE_SNAPSHOT, origin_startpos, false /* only_local */); RESUME_INTERRUPTS(); /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5d9acc6173..15385fb614 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3575,6 +3575,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.onlylocal_data = MySubscription->onlylocaldata; if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ea57a0477f..0c9b60bd65 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx, Size sz, const char *message); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static bool pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, @@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_prepared_cb = pgoutput_commit_prepared_txn; cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; + cb->filter_remotedata_cb = pgoutput_remoteorigin_filter; cb->shutdown_cb = pgoutput_shutdown; /* transaction streaming */ @@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool onlylocal_data_given = false; data->binary = false; data->streaming = false; data->messages = false; data->two_phase = false; + data->onlylocal_data = false; foreach(lc, options) { @@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "only_local") == 0) + { + if (onlylocal_data_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + onlylocal_data_given = true; + + data->onlylocal_data = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx, return false; } +static bool +pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->onlylocal_data && origin_id != InvalidRepOriginId) + return true; + return false; +} /* * Shutdown the output plugin. * diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 3d39fddaae..429bc1328c 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -253,7 +253,8 @@ ReplicationSlotValidateName(const char *name, int elevel) */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, bool two_phase, + bool onlylocal_data) { ReplicationSlot *slot = NULL; int i; @@ -313,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.onlylocal_data = onlylocal_data; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 886899afd2..0e0bc1e940 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, + false, false); if (immediately_reserve) { @@ -118,7 +119,8 @@ static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, XLogRecPtr restart_lsn, - bool find_startpoint) + bool find_startpoint, + bool onlylocal_data) { LogicalDecodingContext *ctx = NULL; @@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + onlylocal_data); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool onlylocal_data = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -189,7 +193,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, InvalidXLogRecPtr, - true); + true, + onlylocal_data); values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); @@ -231,7 +236,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 14 +#define PG_GET_REPLICATION_SLOTS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -429,6 +434,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + values[i++] = BoolGetDatum(slot_contents.data.onlylocal_data); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -794,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, src_restart_lsn, + false, false); } else diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ceaff097b9..cfdefb1f22 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -374,7 +374,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, 0, NULL, false); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5a718b1fe9..b826326b98 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -963,12 +963,14 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, + bool *onlylocal_data) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool onlylocal_data_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1019,6 +1021,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "only_local") == 0) + { + if (onlylocal_data_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + onlylocal_data_given = true; + *onlylocal_data = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1035,6 +1046,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool onlylocal_data = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1044,13 +1056,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &onlylocal_data); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); } else { @@ -1065,7 +1078,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, onlylocal_data); } if (cmd->kind == REPLICATION_KIND_LOGICAL) diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 6957567264..d7a4e24167 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "enabled", "slot_name", "streaming", - "synchronous_commit", "two_phase"); + "synchronous_commit", "two_phase", "only_local"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7f1ee97f55..e002563a2a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10782,9 +10782,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,only_local}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 18c291289f..6e3119247c 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + bool subonlylocaldata; /* skip copying of remote origin data */ + char subtwophasestate; /* Stream two-phase transactions */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ @@ -102,6 +104,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + bool onlylocaldata; /* Skip copying of remote orging data */ char twophasestate; /* Allow streaming two-phase transactions */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 1097cc9799..82014fe252 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext */ bool twophase_opt_given; + bool onlylocal_data; + /* * State for writing output. */ @@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); +extern bool filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx, + RepOriginId origin_id); extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index a16bebf76c..52b5de3eb8 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id); +/* + * Filter remote origin changes. + */ +typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx, + RepOriginId origin_id); + /* * Called to shutdown an output plugin. */ @@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeMessageCB message_cb; LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; + LogicalDecodeFilterRemoteOriginCB filter_remotedata_cb; LogicalDecodeShutdownCB shutdown_cb; /* streaming of changes at prepare time */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a..e8fac6b3f8 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool onlylocal_data; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 24b30210c3..833d380b0f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -94,6 +94,8 @@ typedef struct ReplicationSlotPersistentData */ bool two_phase; + bool onlylocal_data; + /* plugin name */ NameData plugin; } ReplicationSlotPersistentData; @@ -195,7 +197,8 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency p, bool two_phase); + ReplicationSlotPersistency p, bool two_phase, + bool onlylocal_data); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 92f73a55b8..e62dca9b45 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,7 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + bool onlylocal_data; } logical; } proto; } WalRcvStreamOptions; @@ -351,7 +352,8 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn); + XLogRecPtr *lsn, + bool onlylocal_data); /* * walrcv_get_backend_pid_fn @@ -423,8 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1420288d67..dc677e5c67 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1456,8 +1456,9 @@ pg_replication_slots| SELECT l.slot_name, l.confirmed_flush_lsn, l.wal_status, l.safe_wal_size, - l.two_phase - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) + l.two_phase, + l.only_local + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, only_local) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c6b302c7b2..0bf093858b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB LogicalDecodeCommitPreparedCB LogicalDecodeFilterByOriginCB LogicalDecodeFilterPrepareCB +LogicalDecodeFilterRemoteOriginCB LogicalDecodeMessageCB LogicalDecodePrepareCB LogicalDecodeRollbackPreparedCB -- 2.30.2