From a5f9714cb4b17ffd24fbbec0e449e0a01e86236f Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 28 Aug 2018 16:14:32 +0900 Subject: [PATCH v8] Copy function for logical and physical replication slots. --- contrib/test_decoding/expected/slot.out | 234 ++++++++++++++++++ contrib/test_decoding/sql/slot.sql | 94 ++++++++ doc/src/sgml/func.sgml | 41 ++++ src/backend/replication/logical/logical.c | 5 +- src/backend/replication/slot.c | 91 ++++--- src/backend/replication/slotfuncs.c | 380 ++++++++++++++++++++++++++---- src/backend/replication/walsender.c | 3 +- src/include/catalog/pg_proc.dat | 35 +++ src/include/replication/logical.h | 1 + src/include/replication/slot.h | 2 +- 10 files changed, 802 insertions(+), 84 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 523621a..40b200e 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -150,3 +150,237 @@ SELECT pg_drop_replication_slot('regression_slot3'); (1 row) +-- +-- Test copy functions for logical replication slots +-- +-- Create and copy logical slots +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', 'pgoutput', true); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | plugin | temporary | slot_name | plugin | temporary +------------+---------------+-----------+---------------------------------+---------------+----------- + orig_slot1 | test_decoding | f | copied_slot1_change_plugin | pgoutput | f + orig_slot1 | test_decoding | f | copied_slot1_change_plugin_temp | pgoutput | t + orig_slot1 | test_decoding | f | copied_slot1_no_change | test_decoding | f +(3 rows) + +-- Now we have maximum 4 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: all replication slots are in use +HINT: Free one or increase max_replication_slots. +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- Test based on the temporary logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', 'pgoutput', false); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | plugin | temporary | slot_name | plugin | temporary +------------+---------------+-----------+---------------------------------+---------------+----------- + orig_slot2 | test_decoding | t | copied_slot2_change_plugin | pgoutput | t + orig_slot2 | test_decoding | t | copied_slot2_change_plugin_temp | pgoutput | f + orig_slot2 | test_decoding | t | copied_slot2_no_change | test_decoding | t +(3 rows) + +-- Cannot copy a logical slot to a physical slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error +ERROR: cannot copy a logical replication slot to a physical replication slot +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- +-- Test copy functions for physical replication slots +-- +-- Create and copy physical slots +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + slot_name | slot_type | temporary +------------------------+-----------+----------- + orig_slot1 | physical | f + orig_slot2 | physical | f + copied_slot1_no_change | physical | f + copied_slot1_temp | physical | t +(4 rows) + +-- Cannot copy a physical slot to a logical slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error +ERROR: cannot copy a physical replication slot to a logical replication slot +-- Cannot copy a physical slot that doesn't reserve WAL +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error +ERROR: cannot copy a physical replication slot that doesn't reserve WAL +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('orig_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- Test based on the temporary physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); + ?column? +---------- + init +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); + ?column? +---------- + copy +(1 row) + +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); + ?column? +---------- + copy +(1 row) + +-- Check all copied slots status +SELECT + o.slot_name, o.temporary, c.slot_name, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + slot_name | temporary | slot_name | temporary +------------+-----------+------------------------+----------- + orig_slot2 | t | copied_slot2_no_change | t + orig_slot2 | t | copied_slot2_notemp | f +(2 rows) + +SELECT pg_drop_replication_slot('orig_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_no_change'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('copied_slot2_notemp'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index c8d08f8..c14937c 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -76,3 +76,97 @@ SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3'); SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error SELECT pg_drop_replication_slot('regression_slot3'); + +-- +-- Test copy functions for logical replication slots +-- + +-- Create and copy logical slots +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', 'pgoutput', true); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Now we have maximum 4 replication slots. Check slots are properly +-- released even when raise error during creating the target slot. +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); +SELECT pg_drop_replication_slot('copied_slot1_change_plugin'); + +-- Test based on the temporary logical slot +SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', 'pgoutput'); +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', 'pgoutput', false); + +-- Check all copied slots status +SELECT + o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +-- Cannot copy a logical slot to a physical slot +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp'); + +-- +-- Test copy functions for physical replication slots +-- + +-- Create and copy physical slots +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true); +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true); + +-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields. +SELECT slot_name, slot_type, temporary FROM pg_replication_slots; + +-- Cannot copy a physical slot to a logical slot +SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error + +-- Cannot copy a physical slot that doesn't reserve WAL +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'falied'); -- error + +-- temporary slots were dropped automatically +SELECT pg_drop_replication_slot('orig_slot1'); +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot1_no_change'); + +-- Test based on the temporary physical slot +SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change'); +SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false); + +-- Check all copied slots status +SELECT + o.slot_name, o.temporary, c.slot_name, c.temporary +FROM + (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o + LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn +WHERE + o.slot_name != c.slot_name +ORDER BY o.slot_name, c.slot_name; + +SELECT pg_drop_replication_slot('orig_slot2'); +SELECT pg_drop_replication_slot('copied_slot2_no_change'); +SELECT pg_drop_replication_slot('copied_slot2_notemp'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 4930ec1..9887553 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -19505,6 +19505,47 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); + pg_copy_physical_replication_slot + + pg_copy_physical_replication_slot(src_slot_name name, dst_slot_name , temporary bool) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing physical replication slot name src_slot_name + to a physical replication slot named dst_slot_name. + The copied physical slot starts to reserve WAL from the same LSN as the + source slot. + temporary is optional. If temporary + is omitted, the same value as the source slot is used. + + + + + + + pg_copy_logical_replication_slot + + pg_copy_logical_replication_slot(src_slot_name name, dst_slot_name , plugin name , temporary boolean) + + + (slot_name name, lsn pg_lsn) + + + Copies an existing logical replication slot name src_slot_name + to a logical replication slot named dst_slot_name + while changing the output plugin and persistence. The copied logical slot starts + from the same LSN as the source logical slot. Both plugin and + temporary are optional. If plugin + or temporary are omitted, the same values as + the source logical slot are used. + + + + + + pg_logical_slot_get_changes pg_logical_slot_get_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[]) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6e5bc12..ae15800 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -228,6 +228,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -271,7 +272,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + ReplicationSlotReserveWal(restart_lsn); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -316,7 +317,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 33b23b6..98d1525 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -990,11 +990,13 @@ CheckSlotRequirements(void) /* * Reserve WAL for the currently active slot. * - * Compute and set restart_lsn in a manner that's appropriate for the type of - * the slot and concurrency safe. + * If an lsn to reserve is not requested, compute and set restart_lsn + * in a manner that's appropriate for the type of the slot and concurrency safe. + * If the reseved WAL is requested, set restart_lsn and check if the corresponding + * wal segment is available. */ void -ReplicationSlotReserveWal(void) +ReplicationSlotReserveWal(XLogRecPtr requested_lsn) { ReplicationSlot *slot = MyReplicationSlot; @@ -1005,47 +1007,57 @@ ReplicationSlotReserveWal(void) * The replication slot mechanism is used to prevent removal of required * WAL. As there is no interlock between this routine and checkpoints, WAL * segments could concurrently be removed when a now stale return value of - * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that - * this happens we'll just retry. + * ReplicationSlotsComputeRequiredLSN() is used. If the lsn to reserve is + * not requested, in the unlikely case that this happens we'll just retry. */ while (true) { XLogSegNo segno; XLogRecPtr restart_lsn; - /* - * For logical slots log a standby snapshot and start logical decoding - * at exactly that position. That allows the slot to start up more - * quickly. - * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. - */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) + if (!XLogRecPtrIsInvalid(requested_lsn)) { - XLogRecPtr flushptr; - - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); + /* Set the requested lsn */ SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; + slot->data.restart_lsn = requested_lsn; SpinLockRelease(&slot->mutex); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); } else { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); + /* + * For logical slots log a standby snapshot and start logical decoding + * at exactly that position. That allows the slot to start up more + * quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return + * the location of the last redo LSN. While that slightly increases + * the chance that we have to retry, it's where a base backup has to + * start replay at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + restart_lsn = GetXLogInsertRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + else + { + restart_lsn = GetRedoRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } } /* prevent WAL removal as fast as possible */ @@ -1061,6 +1073,21 @@ ReplicationSlotReserveWal(void) XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); if (XLogGetLastRemovedSegno() < segno) break; + + /* + * The caller has requested a specific wal which we failed to reserve. + * We can't retry here as the requested wal is no longer available. + */ + if (!XLogRecPtrIsInvalid(requested_lsn)) + { + char filename[MAXFNAMELEN]; + + XLogFileName(filename, ThisTimeLineID, segno, wal_segment_size); + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FILE), + errmsg("requested WAL segment %s has already been removed", + filename))); + } } } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 224dd92..01897ff 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -17,10 +17,12 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "access/xlog_internal.h" #include "replication/decode.h" #include "replication/slot.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" +#include "storage/ipc.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -36,6 +38,66 @@ check_permissions(void) } /* + * Error cleanup callback for copy replication slot functions. Release + * both MyReplicationSlot and the saved replication slot. + */ +static void +copy_replication_slot_callback(int code, Datum arg) +{ + ReplicationSlot *savedslot = (ReplicationSlot *) DatumGetPointer(arg); + bool release_saved_slot = (savedslot && savedslot != MyReplicationSlot); + + if (MyReplicationSlot) + ReplicationSlotRelease(); + + /* Release the saved slot if exist while preventing double releasing */ + if (release_saved_slot) + { + Assert(MyReplicationSlot == NULL); + MyReplicationSlot = savedslot; + ReplicationSlotRelease(); + } +} + +/* + * Helper function for creating a new physical replication slot with + * given arguments. Return a restart_lsn of new replication slot or + * InvalidXLogRecPtr if WAL reservation is not required. + */ +static XLogRecPtr +create_physical_replication_slot(char *name, bool immediately_reserve, + bool temporary, XLogRecPtr restart_lsn) +{ + XLogRecPtr result = InvalidXLogRecPtr; + + Assert(!MyReplicationSlot); + + check_permissions(); + + CheckSlotRequirements(); + + /* acquire replication slot, this will check for conflicting names */ + ReplicationSlotCreate(name, false, + temporary ? RS_TEMPORARY : RS_PERSISTENT); + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + ReplicationSlotReserveWal(restart_lsn); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + result = MyReplicationSlot->data.restart_lsn; + } + + ReplicationSlotRelease(); + + return result; +} + +/* * SQL function for creating a new physical (streaming replication) * replication slot. */ @@ -47,75 +109,162 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); Datum values[2]; bool nulls[2]; + XLogRecPtr result_lsn; TupleDesc tupdesc; HeapTuple tuple; Datum result; - Assert(!MyReplicationSlot); + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + result_lsn = create_physical_replication_slot(NameStr(*name), + immediately_reserve, + temporary, + InvalidXLogRecPtr); + + values[0] = NameGetDatum(name); + nulls[0] = false; + + if (XLogRecPtrIsInvalid(result_lsn)) + nulls[1] = true; + else + { + values[1] = LSNGetDatum(result_lsn); + nulls[1] = false; + } + + tuple = heap_form_tuple(tupdesc, values, nulls); + result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} + +/* + * Copy physical replication slot (3 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_physical_replication_slot_no_temp(PG_FUNCTION_ARGS) +{ + return pg_copy_physical_replication_slot(fcinfo); +} + +/* + * SQL function for copying a physical replication slot. + */ +Datum +pg_copy_physical_replication_slot(PG_FUNCTION_ARGS) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + bool temporary; /* optional argument */ + bool immediately_reserve; + ReplicationSlot *saveslot = NULL; + XLogRecPtr restart_lsn; + XLogRecPtr result_lsn; + Datum values[2]; + bool nulls[2]; + TupleDesc tupdesc; + HeapTuple tuple; if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - check_permissions(); + /* Acquire the source slot so we own it */ + ReplicationSlotAcquire(NameStr(*src_name), true); - CheckSlotRequirements(); + /* Check type of replication slot */ + if (SlotIsLogical(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a logical replication slot to a physical replication slot")))); - /* acquire replication slot, this will check for conflicting names */ - ReplicationSlotCreate(NameStr(*name), false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + /* Copying non-reserved slot doesn't make sense */ + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a physical replication slot that doesn't reserve WAL")))); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; + /* Save values of the source slot */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY); + /* Reserve WAL at creation if the source slot already reserves */ + immediately_reserve = !XLogRecPtrIsInvalid(restart_lsn); + + /* check the optional argument */ + if (PG_NARGS() >= 3) + temporary = PG_GETARG_BOOL(2); + + /* + * To prevent the restart_lsn WAL of the source slot from removal + * during copying a new slot, we copy it while holding the source slot. + * Since we are not allowed to create a new one while holding another + * one, we temporarily save the acquired slot and restore it after + * creation. Set callback function to ensure we release replication + * slots if fail below. + */ if (immediately_reserve) + saveslot = MyReplicationSlot; + else + ReplicationSlotRelease(); + + PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); { - /* Reserve WAL as the user asked for it */ - ReplicationSlotReserveWal(); + if (immediately_reserve) + MyReplicationSlot = NULL; - /* Write this slot to disk */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); + result_lsn = create_physical_replication_slot(NameStr(*dst_name), + immediately_reserve, + temporary, + restart_lsn); + Assert(MyReplicationSlot == NULL); - values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); - nulls[1] = false; + /* + * Restore source slot, if saved. We must not change the saveslot + * to cancel the callback function. + */ + if (saveslot) + MyReplicationSlot = saveslot; } + PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + + /* Release the source slot, if not yet */ + if (immediately_reserve) + ReplicationSlotRelease(); + + values[0] = NameGetDatum(dst_name); + nulls[0] = false; + + if (XLogRecPtrIsInvalid(result_lsn)) + nulls[1] = true; else { - nulls[1] = true; + values[1] = LSNGetDatum(result_lsn); + nulls[1] = false; } tuple = heap_form_tuple(tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); - - PG_RETURN_DATUM(result); + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); } - /* - * SQL function for creating a new logical replication slot. + * Helper function for creating a new logical replication slot with + * given arguments. Return a confirmed_lsn of new replication slot. */ -Datum -pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +static XLogRecPtr +create_logical_replication_slot(char *name, char *plugin, + bool temporary, XLogRecPtr start_lsn) { - Name name = PG_GETARG_NAME(0); - Name plugin = PG_GETARG_NAME(1); - bool temporary = PG_GETARG_BOOL(2); - LogicalDecodingContext *ctx = NULL; - - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; - Datum values[2]; - bool nulls[2]; + XLogRecPtr result; Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - check_permissions(); CheckLogicalDecodingRequirements(); @@ -128,39 +277,174 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * slots can be created as temporary from beginning as they get dropped on * error as well. */ - ReplicationSlotCreate(NameStr(*name), true, + ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(plugin, NIL, false, /* do not build snapshot */ + start_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); - values[0] = NameGetDatum(&MyReplicationSlot->data.name); - values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); - /* don't need the decoding context anymore */ FreeDecodingContext(ctx); - memset(nulls, 0, sizeof(nulls)); - - tuple = heap_form_tuple(tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); + + result = MyReplicationSlot->data.confirmed_flush; + ReplicationSlotRelease(); - PG_RETURN_DATUM(result); + return result; } +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); + XLogRecPtr confirmed_flush; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + confirmed_flush = create_logical_replication_slot(NameStr(*name), + NameStr(*plugin), + temporary, + InvalidXLogRecPtr); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = NameGetDatum(name); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} + +/* + * Copy logical replication slot (2 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * Copy logical replication slot (3 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * SQL function for copying a logical replication slot. + */ +Datum +pg_copy_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + char *plugin; /* optional argument */ + bool temporary; /* optional argument */ + ReplicationSlot *saveslot = NULL; + XLogRecPtr confirmed_flush; + XLogRecPtr restart_lsn; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* Acquire the source slot so we own it */ + ReplicationSlotAcquire(NameStr(*src_name), true); + + /* Check type of replication slot */ + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + (errmsg("cannot copy a physical replication slot to a logical replication slot")))); + + /* Save values of the source slot */ + restart_lsn = MyReplicationSlot->data.restart_lsn; + plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin)); + temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY); + + /* Check the optional arguments */ + if (PG_NARGS() >= 3) + plugin = NameStr(*(PG_GETARG_NAME(2))); + if (PG_NARGS() >= 4) + temporary = PG_GETARG_BOOL(3); + + /* + * To prevent the restart_lsn WAL of the source slot from removal + * during copying a new slot, we copy it while holding the source slot. + * Since we are not allowed to create a new one while holding another + * one, we temporarily save the acquired slot and restore it after + * creation. Set callback function to ensure we release replication + * slots if fail below. + */ + saveslot = MyReplicationSlot; + PG_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + { + MyReplicationSlot = NULL; + + confirmed_flush = create_logical_replication_slot(NameStr(*dst_name), + plugin, + temporary, + restart_lsn); + Assert(MyReplicationSlot == NULL); + + /* + * Restore source slot. We must not change the saveslot to cancel the + * callback function. + */ + MyReplicationSlot = saveslot; + } + PG_END_ENSURE_ERROR_CLEANUP(copy_replication_slot_callback, (Datum) PointerGetDatum(saveslot)); + + /* Release the source slot */ + ReplicationSlotRelease(); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = NameGetDatum(dst_name); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} /* * SQL function for dropping a replication slot. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2d2eb23..63a9d03 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -930,6 +930,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); @@ -972,7 +973,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) { - ReplicationSlotReserveWal(); + ReplicationSlotReserveWal(InvalidXLogRecPtr); ReplicationSlotMarkDirty(); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3ecc2e1..622c917 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9656,6 +9656,20 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, +{ oid => '4008', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', + proallargtypes => '{name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,temporary,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot' }, +{ oid => '4009', descr => 'copy a physical replication slot', + proname => 'pg_copy_physical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{slot_name,dst_name,slot_name,lsn}', + prosrc => 'pg_copy_physical_replication_slot_no_temp' }, { oid => '3780', descr => 'drop a replication slot', proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', @@ -9676,6 +9690,27 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, +{ oid => '4005', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool', + proallargtypes => '{name,name,name,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot' }, +{ oid => '4006', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name', + proallargtypes => '{name,name,name,name,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin' }, +{ oid => '4007', descr => 'copy a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,name,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' }, { oid => '3782', descr => 'get changes from replication slot', proname => 'pg_logical_slot_get_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c8ffc4c..0a2a63a 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8f1d66..3b652d2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -193,7 +193,7 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern bool ReplicationSlotValidateName(const char *name, int elevel); -extern void ReplicationSlotReserveWal(void); +extern void ReplicationSlotReserveWal(XLogRecPtr requested_lsn); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); -- 2.10.5