From 9434805e5d357b48c43d0d91d33a0a58e42d13ce Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 26 Jun 2018 21:51:50 +0900 Subject: [PATCH v2] Copy logical replication slot. --- doc/src/sgml/func.sgml | 21 ++++ src/backend/replication/logical/logical.c | 13 ++- src/backend/replication/slotfuncs.c | 152 +++++++++++++++++++++++----- src/backend/replication/walsender.c | 1 + src/include/catalog/pg_proc.dat | 21 ++++ src/include/replication/logical.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 13 ++- 7 files changed, 191 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 5dce8ef..253f0e3 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -19252,6 +19252,27 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); + pg_copy_logical_replication_slot + + pg_copy_logical_replication_slot(src_slot_name name, dst_slot_name , plugin name , temporary boolean) + + + (slot_name name, lsn pg_lsn) + + + Copy a existing src_slot_name logical (decoding) slot + to dst_slot_name while changing the output plugin + and persistence. Copied logical slot starts from the same LSN as the + source logical slot. Both plugin and + temporary are optional. If plugin + or temporary are omitted, the same values of + the source logical slot are set. + + + + + + pg_logical_slot_get_changes pg_logical_slot_get_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[]) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 61588d6..7e7d54a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -223,6 +223,7 @@ LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr start_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -266,7 +267,15 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - ReplicationSlotReserveWal(); + /* Find start location to read WAL if not specified */ + if (XLogRecPtrIsInvalid(start_lsn)) + ReplicationSlotReserveWal(); + else + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = start_lsn; + SpinLockRelease(&slot->mutex); + } /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start @@ -311,7 +320,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, + ctx = StartupDecodingContext(NIL, start_lsn, xmin_horizon, need_full_snapshot, true, read_page, prepare_write, do_write, update_progress); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2806e10..6872588 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -92,30 +92,19 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } - /* - * SQL function for creating a new logical replication slot. + * Helper function for creating a new logical replication slot with + * given arguments. Return a confirmed_lsn of new replication slot. */ -Datum -pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +static XLogRecPtr +create_logical_replication_slot(char *name, char *plugin, + bool temporary, XLogRecPtr start_lsn) { - Name name = PG_GETARG_NAME(0); - Name plugin = PG_GETARG_NAME(1); - bool temporary = PG_GETARG_BOOL(2); - LogicalDecodingContext *ctx = NULL; - - TupleDesc tupdesc; - HeapTuple tuple; - Datum result; - Datum values[2]; - bool nulls[2]; + XLogRecPtr result; Assert(!MyReplicationSlot); - if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - check_permissions(); CheckLogicalDecodingRequirements(); @@ -128,39 +117,146 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * slots can be created as temporary from beginning as they get dropped on * error as well. */ - ReplicationSlotCreate(NameStr(*name), true, + ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL); /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(plugin, NIL, false, /* do not build snapshot */ + start_lsn, logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); - values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); - values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush); - /* don't need the decoding context anymore */ FreeDecodingContext(ctx); - memset(nulls, 0, sizeof(nulls)); - - tuple = heap_form_tuple(tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); - /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); + + result = MyReplicationSlot->data.confirmed_flush; + ReplicationSlotRelease(); - PG_RETURN_DATUM(result); + return result; } +/* + * SQL function for creating a new logical replication slot. + */ +Datum +pg_create_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + Name plugin = PG_GETARG_NAME(1); + bool temporary = PG_GETARG_BOOL(2); + XLogRecPtr confirmed_flush; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + confirmed_flush = create_logical_replication_slot(NameStr(*name), + NameStr(*plugin), + temporary, + InvalidXLogRecPtr); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = CStringGetTextDatum(NameStr(*name)); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} + +/* + * Copy logical replication slot (2 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin_temp(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * Copy logical replication slot (3 arguments) + * + * note: this wrapper is necessary to pass the sanity check in opr_sanity, + * which checks that all built-in functions that share the implementing C + * function take the same number of arguments + */ +Datum +pg_copy_logical_replication_slot_no_plugin(PG_FUNCTION_ARGS) +{ + return pg_copy_logical_replication_slot(fcinfo); +} + +/* + * SQL function for copying a logical replication slot. + */ +Datum +pg_copy_logical_replication_slot(PG_FUNCTION_ARGS) +{ + Name src_name = PG_GETARG_NAME(0); + Name dst_name = PG_GETARG_NAME(1); + char *plugin; /* optional argment */ + bool temporary; /* optional argment */ + XLogRecPtr confirmed_flush; + XLogRecPtr start_lsn; + TupleDesc tupdesc; + HeapTuple tuple; + Datum values[2]; + bool nulls[2]; + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + /* Acquire the source slot so we own it */ + ReplicationSlotAcquire(NameStr(*src_name), true); + + /* Save some fields before releasing */ + start_lsn = MyReplicationSlot->data.restart_lsn; + plugin = pstrdup(NameStr(MyReplicationSlot->data.plugin)); + temporary = (MyReplicationSlot->data.persistency == RS_TEMPORARY); + + /* Release it */ + ReplicationSlotRelease(); + + /* Check the optional arguments */ + if (PG_NARGS() >= 3) + plugin = NameStr(*(PG_GETARG_NAME(2))); + if (PG_NARGS() >= 4) + temporary = PG_GETARG_BOOL(3); + + confirmed_flush = create_logical_replication_slot(NameStr(*dst_name), + plugin, + temporary, + start_lsn); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = CStringGetTextDatum(NameStr(*dst_name)); + values[1] = LSNGetDatum(confirmed_flush); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_POINTER(HeapTupleGetDatum(tuple)); +} /* * SQL function for dropping a replication slot. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e47ddca..c49f2d2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -918,6 +918,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, + InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 40d54ed..862b89d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -9807,6 +9807,27 @@ proargmodes => '{i,i,i,o,o}', proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, +{ oid => '4005', descr => 'copy up a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name bool', + proallargtypes => '{name,name,name,bool,text,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,plugin,temporary,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot' }, +{ oid => '4006', descr => 'copy up a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name name', + proallargtypes => '{name,name,name,text,pg_lsn}', + proargmodes => '{i,i,i,o,o}', + proargnames => '{src_slot_name,plugin,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin' }, +{ oid => '4007', descr => 'copy up a logical replication slot', + proname => 'pg_copy_logical_replication_slot', provolatile => 'v', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name', + proallargtypes => '{name,name,text,pg_lsn}', + proargmodes => '{i,i,o,o}', + proargnames => '{src_slot_name,dst_slot_name,slot_name,lsn}', + prosrc => 'pg_copy_logical_replication_slot_no_plugin_temp' }, { oid => '3782', descr => 'get changes from replication slot', proname => 'pg_logical_slot_get_changes', procost => '1000', prorows => '1000', provariadic => 'text', proisstrict => 'f', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c25ac1f..d1643e5 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -97,6 +97,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, + XLogRecPtr start_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index e3a5fe9..76e91f5 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -7,7 +7,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 16; +use Test::More tests => 17; use Config; # Initialize master node @@ -27,6 +27,11 @@ $node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');] ); +# Copy logical slot +$node_master->safe_psql('postgres', + qq[SELECT pg_copy_logical_replication_slot('test_slot', 'copy_slot', 'test_decoding', false);] +); + $node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;] ); @@ -37,6 +42,12 @@ my ($result) = $node_master->safe_psql('postgres', is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT'); +# Basic decoding works with copied slot +$result = $node_master->safe_psql('postgres', + qq[SELECT pg_logical_slot_get_changes('copy_slot', NULL, NULL);]); +is(scalar(@foobar = split /^/m, $result), + 12, 'Decoding produced 12 rows inc BEGIN/COMMIT'); + # If we immediately crash the server we might lose the progress we just made # and replay the same changes again. But a clean shutdown should never repeat # the same changes when we use the SQL decoding interface. -- 2.10.5