From ed0512f321bc98dc16a2bc2cc84a919325060685 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 21 Apr 2023 17:14:04 +0900 Subject: [PATCH] PoC: initial table structure synchronization in logical replication. The key idea is to postpone creation of the pg_subscription_rel entry to the point where the tablesync worker synchronizes and creates the table on the subscriber. This is a PoC patch, so it mixes the following changes: - Add an oid column to pg_subscription_rel. - use it as the primary key. - use it in the names of the origin and slot that the tablesync workers use. - Add a copy_schema = on/off option to CREATE SUBSCRIPTION. - not yet supported for ALTER SUBSCRIPTION. - Add a new CRS_EXPORT_USE_SNAPSHOT action. - to let the walsender and other processes (e.g. pg_dump) use the same snapshot. - the snapshot is exported for pg_dump and is used for COPY. --- src/backend/catalog/heap.c | 2 +- src/backend/catalog/pg_subscription.c | 270 +++++++++++++++--- src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 107 +++++-- .../libpqwalreceiver/libpqwalreceiver.c | 6 + .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/launcher.c | 34 +-- src/backend/replication/logical/relation.c | 6 +- src/backend/replication/logical/snapbuild.c | 25 +- src/backend/replication/logical/tablesync.c | 246 ++++++++++++---- src/backend/replication/logical/worker.c | 18 +- src/backend/replication/walsender.c | 11 +- src/backend/utils/cache/syscache.c | 7 +- src/include/catalog/pg_proc.dat | 2 +- src/include/catalog/pg_subscription_rel.h | 39 ++- src/include/replication/snapbuild.h | 2 +- src/include/replication/walsender.h | 3 +- src/include/replication/worker_internal.h | 10 +- src/include/utils/syscache.h | 2 +- 19 files changed, 629 insertions(+), 165 deletions(-) diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 2a0d82aedd..32414c30cf 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1865,7 
+1865,7 @@ heap_drop_with_catalog(Oid relid) /* * Remove any associated relation synchronization states. */ - RemoveSubscriptionRel(InvalidOid, relid); + RemoveSubscriptionRel(InvalidOid, relid, InvalidOid); /* * Forget any ON COMMIT action for the rel diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d07f88ce28..b52168ce9b 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -35,6 +35,7 @@ #include "utils/syscache.h" static List *textarray_to_stringlist(ArrayType *textarray); +static SubscriptionRelState *deconstruct_subrelstate(HeapTuple tup); /* * Fetch the subscription from the syscache. @@ -227,8 +228,8 @@ textarray_to_stringlist(ArrayType *textarray) * Add new state record for a subscription table. */ void -AddSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) +AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema, bool syncdata, + XLogRecPtr sublsn, char *nspname, char *relname) { Relation rel; HeapTuple tup; @@ -239,25 +240,40 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); - /* Try finding existing mapping. */ - tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, - ObjectIdGetDatum(relid), - ObjectIdGetDatum(subid)); - if (HeapTupleIsValid(tup)) - elog(ERROR, "subscription table %u in subscription %u already exists", - relid, subid); + /* XXX: existence check */ /* Form the tuple. 
*/ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); + values[Anum_pg_subscription_rel_oid - 1] = + ObjectIdGetDatum(GetNewOidWithIndex(rel, SubscriptionRelObjectIdIndexId, + Anum_pg_subscription_rel_oid)); values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); - values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + + if (OidIsValid(relid)) + values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + else + nulls[Anum_pg_subscription_rel_srrelid - 1] = true; + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); if (sublsn != InvalidXLogRecPtr) values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); else nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (nspname) + values[Anum_pg_subscription_rel_srnspname - 1] = DirectFunctionCall1(namein, CStringGetDatum(nspname)); + else + nulls[Anum_pg_subscription_rel_srnspname - 1] = true; + + if (relname) + values[Anum_pg_subscription_rel_srrelname - 1] = DirectFunctionCall1(namein, CStringGetDatum(relname)); + else + nulls[Anum_pg_subscription_rel_srrelname - 1] = true; + + values[Anum_pg_subscription_rel_srsyncschema - 1] = BoolGetDatum(syncschema); + values[Anum_pg_subscription_rel_srsyncdata - 1] = BoolGetDatum(syncdata); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. 
*/ @@ -269,11 +285,49 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, table_close(rel, NoLock); } +/* Update srrelid to the given relid */ +void +UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid) +{ + Relation rel; + HeapTuple tup; + bool nulls[Natts_pg_subscription_rel]; + Datum values[Natts_pg_subscription_rel]; + bool replaces[Natts_pg_subscription_rel]; + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + + tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid)); + + /* XXX: need to distinguish from message in UpdateSubscriptionRelState() */ + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u does not exist", + subrelid, subid); + + /* Update the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_subscription_rel_srrelid - 1] = true; + values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + table_close(rel, NoLock); +} + /* * Update the state of a subscription table. */ void -UpdateSubscriptionRelState(Oid subid, Oid relid, char state, +UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state, XLogRecPtr sublsn) { Relation rel; @@ -287,12 +341,10 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); /* Try finding existing mapping. 
*/ - tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, - ObjectIdGetDatum(relid), - ObjectIdGetDatum(subid)); + tup = SearchSysCacheCopy1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "subscription table %u in subscription %u does not exist", - relid, subid); + subrelid, subid); /* Update the tuple. */ memset(values, 0, sizeof(values)); @@ -318,13 +370,76 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, table_close(rel, NoLock); } +/* + * Similar to GetSubscriptionRelState, but search the entry by the relid. + * + * XXX: Note that we cannot use syscache for searching the pg_subscription_rel entry + * by (srsubid, srrelid) because cache key columns should always be not NULL (see + * CatalogCacheInitializeCache() for details). + * + * XXX: duplicated with GetSubscriptionRelState(). + */ +char +GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn) +{ + Relation rel; + ScanKeyData skey[2]; + SysScanDesc scan; + HeapTuple tup; + char substate; + Datum d; + bool isnull; + + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[0], + Anum_pg_subscription_rel_srrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relid)); + ScanKeyInit(&skey[1], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + scan = systable_beginscan(rel, SubscriptionRelSrrelidSrsubidIndexId, true, + NULL, 2, skey); + + tup = systable_getnext(scan); + + + if (!HeapTupleIsValid(tup)) + { + systable_endscan(scan); + table_close(rel, AccessShareLock); + *sublsn = InvalidXLogRecPtr; + return SUBREL_STATE_UNKNOWN; + } + + /* Get the state. 
*/ + substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate; + + /* Get the LSN */ + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, + Anum_pg_subscription_rel_srsublsn, &isnull); + if (isnull) + *sublsn = InvalidXLogRecPtr; + else + *sublsn = DatumGetLSN(d); + + /* Cleanup */ + systable_endscan(scan); + table_close(rel, AccessShareLock); + + return substate; +} + /* * Get state of subscription table. * * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription. */ char -GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) +GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn) { HeapTuple tup; char substate; @@ -339,9 +454,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) rel = table_open(SubscriptionRelRelationId, AccessShareLock); /* Try finding the mapping. */ - tup = SearchSysCache2(SUBSCRIPTIONRELMAP, - ObjectIdGetDatum(relid), - ObjectIdGetDatum(subid)); + tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid)); if (!HeapTupleIsValid(tup)) { @@ -354,7 +467,7 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate; /* Get the LSN */ - d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, Anum_pg_subscription_rel_srsublsn, &isnull); if (isnull) *sublsn = InvalidXLogRecPtr; @@ -369,16 +482,105 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) return substate; } +/* Get palloc'ed SubscriptionRelState of the given subrelid */ +SubscriptionRelState * +GetSubscriptionRelByOid(Oid subrelid) +{ + Relation rel; + HeapTuple tup; + SubscriptionRelState *relstate; + + /* + * This is to avoid the race condition with AlterSubscription which tries + * to remove this relstate. 
+ */ + rel = table_open(SubscriptionRelRelationId, AccessShareLock); + + tup = SearchSysCache1(SUBSCRIPTIONRELOID, ObjectIdGetDatum(subrelid)); + + if (!HeapTupleIsValid(tup)) + { + table_close(rel, AccessShareLock); + return NULL; + } + + relstate = deconstruct_subrelstate(tup); + + /* Cleanup */ + ReleaseSysCache(tup); + table_close(rel, AccessShareLock); + + return relstate; +} + +/* + * Extract subscription relation state information from the heap tuple and return palloc'ed + * SubscriptionRelState. + */ +static SubscriptionRelState * +deconstruct_subrelstate(HeapTuple tup) +{ + Form_pg_subscription_rel subrel_form; + SubscriptionRelState *relstate; + Datum d; + bool isnull; + + subrel_form = (Form_pg_subscription_rel) GETSTRUCT(tup); + + relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); + relstate->oid = subrel_form->oid; + + /* Get the relid; a NULL srrelid is reported as InvalidOid */ + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, + Anum_pg_subscription_rel_srrelid, &isnull); + if (isnull) + relstate->relid = InvalidOid; + else + relstate->relid = DatumGetObjectId(d); + + /* Get the LSN */ + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, + Anum_pg_subscription_rel_srsublsn, &isnull); + if (isnull) + relstate->lsn = InvalidXLogRecPtr; + else + relstate->lsn = DatumGetLSN(d); + + relstate->state = subrel_form->srsubstate; + + /* Get srnspname as a palloc'ed copy, or NULL if the column is NULL */ + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, + Anum_pg_subscription_rel_srnspname, &isnull); + if (isnull) + relstate->nspname = NULL; + else + relstate->nspname = pstrdup(NameStr(*DatumGetName(d))); + + /* Get srrelname as a palloc'ed copy, or NULL if the column is NULL */ + d = SysCacheGetAttr(SUBSCRIPTIONRELOID, tup, + Anum_pg_subscription_rel_srrelname, &isnull); + if (isnull) + relstate->relname = NULL; + else + relstate->relname = pstrdup(NameStr(*DatumGetName(d))); + + /* Combine srsyncschema/srsyncdata into the syncflags bitmask */ + relstate->syncflags = + (((subrel_form->srsyncschema) ? SUBREL_SYNC_KIND_SCHEMA : 0) | + ((subrel_form->srsyncdata) ? 
SUBREL_SYNC_KIND_DATA : 0)); + + return relstate; +} /* * Drop subscription relation mapping. These can be for a particular * subscription, or for a particular relation, or both. */ void -RemoveSubscriptionRel(Oid subid, Oid relid) +RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid) { Relation rel; TableScanDesc scan; - ScanKeyData skey[2]; + ScanKeyData skey[3]; HeapTuple tup; int nkeys = 0; @@ -402,6 +604,15 @@ RemoveSubscriptionRel(Oid subid, Oid relid) ObjectIdGetDatum(relid)); } + if (OidIsValid(subrelid)) + { + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_oid, + BTEqualStrategyNumber, + F_OIDEQ, + ObjectIdGetDatum(subrelid)); + } + /* Do the search and delete what we found. */ scan = table_beginscan_catalog(rel, nkeys, skey); while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) @@ -511,22 +722,9 @@ GetSubscriptionRelations(Oid subid, bool not_ready) while (HeapTupleIsValid(tup = systable_getnext(scan))) { - Form_pg_subscription_rel subrel; SubscriptionRelState *relstate; - Datum d; - bool isnull; - - subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); - relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState)); - relstate->relid = subrel->srrelid; - relstate->state = subrel->srsubstate; - d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, - Anum_pg_subscription_rel_srsublsn, &isnull); - if (isnull) - relstate->lsn = InvalidXLogRecPtr; - else - relstate->lsn = DatumGetLSN(d); + relstate = deconstruct_subrelstate(tup); res = lappend(res, relstate); } diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 48aacf66ee..de9988e72e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -951,7 +951,7 @@ CREATE VIEW pg_stat_subscription AS su.subname, st.pid, st.leader_pid, - st.relid, + st.subrelid, st.received_lsn, st.last_msg_send_time, st.last_msg_receipt_time, diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c index 56eafbff10..a08412fb11 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_COPY_SCHEMA 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -88,6 +89,8 @@ typedef struct SubOpts bool enabled; bool create_slot; bool copy_data; + /* XXX: want to choose synchronizing only tables or all objects? */ + bool copy_schema; bool refresh; bool binary; char streaming; @@ -141,6 +144,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->create_slot = true; if (IsSet(supported_opts, SUBOPT_COPY_DATA)) opts->copy_data = true; + if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA)) + opts->copy_schema = true; if (IsSet(supported_opts, SUBOPT_REFRESH)) opts->refresh = true; if (IsSet(supported_opts, SUBOPT_BINARY)) @@ -214,6 +219,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_COPY_DATA; opts->copy_data = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_COPY_SCHEMA) && + strcmp(defel->defname, "copy_schema") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_COPY_SCHEMA; + opts->copy_schema = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) && strcmp(defel->defname, "synchronous_commit") == 0) { @@ -388,10 +402,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("%s and %s are mutually exclusive options", "connect = false", "copy_data = true"))); + if (opts->copy_schema && + IsSet(opts->specified_opts, SUBOPT_COPY_SCHEMA)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s are mutually exclusive options", + "connect = false", "copy_schema = 
true"))); + /* Change the defaults of other options. */ opts->enabled = false; opts->create_slot = false; opts->copy_data = false; + opts->copy_schema = false; } /* @@ -587,7 +609,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Connection and publication should not be specified here. */ supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT | - SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | + SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_COPY_SCHEMA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | @@ -752,7 +774,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Set sync state based on if we were asked to do data copy or * not. */ - table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + if (opts.copy_data || opts.copy_schema) + table_state = SUBREL_STATE_INIT; + else + table_state = SUBREL_STATE_READY; /* * Get the table list from publisher and build local table status @@ -762,16 +787,27 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); - Oid relid; + Oid relid = InvalidOid; + char *nspname = NULL, *relname = NULL; - relid = RangeVarGetRelid(rv, AccessShareLock, false); + if (opts.copy_schema) + { + nspname = rv->schemaname; + relname = rv->relname; + } + else + { + /* The relation should already be present on the subscriber */ + relid = RangeVarGetRelid(rv, AccessShareLock, false); - /* Check for supported relkind. 
*/ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + } - AddSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + AddSubscriptionRelState(subid, relid, table_state, opts.copy_schema, + opts.copy_data, InvalidXLogRecPtr, nspname, + relname); } /* @@ -898,7 +934,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); - subrel_local_oids[off++] = relstate->relid; + subrel_local_oids[off++] = relstate->oid; } qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); @@ -939,9 +975,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (!bsearch(&relid, subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp)) { + /* XXX: support sync schema */ AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); + false, copy_data, + InvalidXLogRecPtr, NULL, NULL); ereport(DEBUG1, (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"", rv->schemaname, rv->relname, sub->name))); @@ -958,13 +996,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, remove_rel_len = 0; for (off = 0; off < subrel_count; off++) { - Oid relid = subrel_local_oids[off]; + Oid subrelid = subrel_local_oids[off]; - if (!bsearch(&relid, pubrel_local_oids, + if (!bsearch(&subrelid, pubrel_local_oids, list_length(pubrel_names), sizeof(Oid), oid_cmp)) { + SubscriptionRelState *relstate; char state; - XLogRecPtr statelsn; /* * Lock pg_subscription_rel with AccessExclusiveLock to @@ -984,14 +1022,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); /* Last known rel state. 
*/ - state = GetSubscriptionRelState(sub->oid, relid, &statelsn); + relstate = GetSubscriptionRelByOid(subrelid); - sub_remove_rels[remove_rel_len].relid = relid; + state = relstate->state; + + sub_remove_rels[remove_rel_len].relid = subrelid; sub_remove_rels[remove_rel_len++].state = state; - RemoveSubscriptionRel(sub->oid, relid); + RemoveSubscriptionRel(InvalidOid, InvalidOid, subrelid); - logicalrep_worker_stop(sub->oid, relid); + logicalrep_worker_stop(sub->oid, subrelid); /* * For READY state, we would have already dropped the @@ -1011,16 +1051,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * origin and by this time the origin might be already * removed. For these reasons, passing missing_ok = true. */ - ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, + ReplicationOriginNameForLogicalRep(sub->oid, subrelid, originname, sizeof(originname)); replorigin_drop_by_name(originname, true, false); } - ereport(DEBUG1, - (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", - get_namespace_name(get_rel_namespace(relid)), - get_rel_name(relid), - sub->name))); + if (OidIsValid(relstate->relid)) + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", + get_namespace_name(get_rel_namespace(relstate->relid)), + get_rel_name(relstate->relid), + sub->name))); + else + ereport(DEBUG1, + (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"", + relstate->nspname, relstate->relname, + sub->name))); } } @@ -1250,6 +1296,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, case ALTER_SUBSCRIPTION_SET_PUBLICATION: { + /* + * XXX support SET PUBLICATION WITH (copy_schema = xx) + */ supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1297,6 +1346,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, List *publist; bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION; 
+ /* + * XXX support ADD/DROP PUBLICATION WITH (copy_schema = xx) + */ supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA; parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1357,6 +1409,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + /* + * XXX support REFRESH PUBLICATION WITH (copy_schema = xx) + */ parse_subscription_options(pstate, stmt->options, SUBOPT_COPY_DATA, &opts); @@ -1589,7 +1644,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); - logicalrep_worker_stop(w->subid, w->relid); + logicalrep_worker_stop(w->subid, w->subrelid); } list_free(subworkers); @@ -1638,7 +1693,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ - RemoveSubscriptionRel(subid, InvalidOid); + RemoveSubscriptionRel(subid, InvalidOid, InvalidOid); /* Remove the origin tracking if exists. 
*/ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 052505e46f..8bc5bbe935 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -927,6 +927,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, case CRS_USE_SNAPSHOT: appendStringInfoString(&cmd, "SNAPSHOT 'use'"); break; + case CRS_EXPORT_USE_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'export-use'"); + break; } } else @@ -942,6 +945,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, case CRS_USE_SNAPSHOT: appendStringInfoString(&cmd, "USE_SNAPSHOT"); break; + case CRS_EXPORT_USE_SNAPSHOT: + elog(ERROR, "XXX CREATE_REPLICATION_SLOT ... EXPORT_USE_SNAPSHOT is not supported yet"); + break; } } diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 4518683779..489205436d 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -958,7 +958,7 @@ ParallelApplyWorkerMain(Datum main_arg) * Setup callback for syscache so that we know when something changes in * the subscription relation state. */ - CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID, invalidate_syncing_table_states, (Datum) 0); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 970d170e73..9e0291d9d9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -241,12 +241,12 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * subscription id and relid. 
+ * subscription id and subrelid. * * We are only interested in the leader apply worker or table sync worker. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(Oid subid, Oid subrelid, bool only_running) { int i; LogicalRepWorker *res = NULL; @@ -262,7 +262,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && + if (w->in_use && w->subid == subid && w->subrelid == subrelid && (!only_running || w->proc)) { res = w; @@ -304,7 +304,7 @@ logicalrep_workers_find(Oid subid, bool only_running) */ bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid subrelid, dsm_handle subworker_dsm) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -318,7 +318,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID); /* Sanity check - tablesync worker cannot be a subworker */ - Assert(!(is_parallel_apply_worker && OidIsValid(relid))); + Assert(!(is_parallel_apply_worker && OidIsValid(subrelid))); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -393,7 +393,7 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. 
*/ - if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) + if (OidIsValid(subrelid) && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -434,7 +434,8 @@ retry: worker->dbid = dbid; worker->userid = userid; worker->subid = subid; - worker->relid = relid; + worker->subrelid = subrelid; + worker->relid = InvalidOid; /* will be filled by the tablesync worker */ worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; worker->stream_fileset = NULL; @@ -463,9 +464,9 @@ retry: else snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); - if (OidIsValid(relid)) + if (OidIsValid(subrelid)) snprintf(bgw.bgw_name, BGW_MAXLEN, - "logical replication worker for subscription %u sync %u", subid, relid); + "logical replication worker for subscription %u sync %u", subid, subrelid); else if (is_parallel_apply_worker) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication parallel apply worker for subscription %u", subid); @@ -591,13 +592,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(Oid subid, Oid subrelid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, false); + worker = logicalrep_worker_find(subid, subrelid, false); if (worker) { @@ -640,13 +641,13 @@ logicalrep_pa_worker_stop(int slot_no, uint16 generation) * Wake up (using latch) any logical replication worker for specified sub/rel. 
*/ void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(Oid subid, Oid subrelid) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, true); + worker = logicalrep_worker_find(subid, subrelid, true); if (worker) logicalrep_worker_wakeup_ptr(worker); @@ -760,6 +761,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) worker->userid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; + worker->subrelid = InvalidOid; worker->leader_pid = InvalidPid; worker->parallel_apply = false; } @@ -820,7 +822,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && OidIsValid(w->relid)) + if (w->subid == subid && OidIsValid(w->subrelid)) res++; } @@ -1263,8 +1265,8 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (OidIsValid(worker.relid)) - values[1] = ObjectIdGetDatum(worker.relid); + if (OidIsValid(worker.subrelid)) + values[1] = ObjectIdGetDatum(worker.subrelid); else nulls[1] = true; values[2] = Int32GetDatum(worker_pid); diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 57ad22b48a..78c91f6f36 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -457,9 +457,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) } if (entry->state != SUBREL_STATE_READY) - entry->state = GetSubscriptionRelState(MySubscription->oid, - entry->localreloid, - &entry->statelsn); + entry->state = GetSubscriptoinRelStateByRelid(MySubscription->oid, + entry->localreloid, + &entry->statelsn); return entry; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 62542827e4..d4e1738df6 100644 --- a/src/backend/replication/logical/snapbuild.c +++ 
b/src/backend/replication/logical/snapbuild.c @@ -277,6 +277,7 @@ struct SnapBuild */ static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; +static bool UsingExportedSnapshot = false; /* ->committed and ->catchange manipulation */ static void SnapBuildPurgeOlderTxn(SnapBuild *builder); @@ -661,12 +662,12 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * sure the xmin horizon hasn't advanced since then. */ const char * -SnapBuildExportSnapshot(SnapBuild *builder) +SnapBuildExportSnapshot(SnapBuild *builder, bool use_it) { Snapshot snap; char *snapname; - if (IsTransactionOrTransactionBlock()) + if (!use_it && IsTransactionOrTransactionBlock()) elog(ERROR, "cannot export a snapshot from within a transaction"); if (SavedResourceOwnerDuringExport) @@ -674,15 +675,24 @@ SnapBuildExportSnapshot(SnapBuild *builder) SavedResourceOwnerDuringExport = CurrentResourceOwner; ExportInProgress = true; + UsingExportedSnapshot = use_it; - StartTransactionCommand(); + if (!use_it) + { + StartTransactionCommand(); - /* There doesn't seem to a nice API to set these */ - XactIsoLevel = XACT_REPEATABLE_READ; - XactReadOnly = true; + /* There doesn't seem to a nice API to set these */ + XactIsoLevel = XACT_REPEATABLE_READ; + XactReadOnly = true; + } + else + Assert(IsTransactionBlock()); snap = SnapBuildInitialSnapshot(builder); + if (use_it) + RestoreTransactionSnapshot(snap, MyProc); + /* * now that we've built a plain snapshot, make it active and use the * normal mechanisms for exporting it @@ -727,7 +737,7 @@ SnapBuildClearExportedSnapshot(void) ResourceOwner tmpResOwner; /* nothing exported, that is the usual case */ - if (!ExportInProgress) + if (!ExportInProgress || UsingExportedSnapshot) return; if (!IsTransactionState()) @@ -753,6 +763,7 @@ SnapBuildResetExportedSnapshotState(void) { SavedResourceOwnerDuringExport = NULL; ExportInProgress = false; + UsingExportedSnapshot = false; } /* diff --git 
a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 0c71ae9ba7..6710d4bbcc 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -97,9 +97,12 @@ #include "access/table.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "executor/spi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -127,6 +130,8 @@ static bool FetchTableStates(bool *started_tx); static StringInfo copybuf = NULL; +static Oid synchronize_table_schema(char *nspname, char *relname, char *snapshot_name); + /* * Exit routine for synchronization worker. */ @@ -306,7 +311,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); @@ -324,7 +329,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * able to rollback dropped slot. */ ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, syncslotname, sizeof(syncslotname)); @@ -495,7 +500,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Update the state to READY only after the origin cleanup. 
*/ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, + rstate->oid, rstate->state, rstate->lsn); } } @@ -509,7 +514,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, - rstate->relid, false); + rstate->oid, false); if (syncworker) { @@ -578,7 +583,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) struct tablesync_start_time_mapping *hentry; bool found; - hentry = hash_search(last_start_times, &rstate->relid, + hentry = hash_search(last_start_times, &rstate->oid, HASH_ENTER, &found); if (!found || @@ -589,7 +594,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, - rstate->relid, + rstate->oid, DSM_HANDLE_INVALID); hentry->last_start_time = now; } @@ -1225,11 +1230,11 @@ copy_table(Relation rel) * had changed. */ void -ReplicationSlotNameForTablesync(Oid suboid, Oid relid, +ReplicationSlotNameForTablesync(Oid suboid, Oid subrelid, char *syncslotname, Size szslot) { snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid, - relid, GetSystemIdentifier()); + subrelid, GetSystemIdentifier()); } /* @@ -1245,32 +1250,50 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) { char *slotname; char *err; - char relstate; - XLogRecPtr relstate_lsn; + char *snapshot; Relation rel; AclResult aclresult; + SubscriptionRelState *relstate; + CRSSnapshotAction action; WalRcvExecResult *res; char originname[NAMEDATALEN]; + char nspname[NAMEDATALEN]; + char relname[NAMEDATALEN]; + uint32 syncflags; RepOriginId originid; bool must_use_password; + bool set_snapshot = false; - /* Check the state of the table synchronization. */ + /* + * Fetch the state of the table synchronization. + * + * XXX: Currently we copy some relation state data from relstate + * before freeing the memory at the commit. 
Probably we need to find + * a better way. + */ StartTransactionCommand(); - relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - &relstate_lsn); - CommitTransactionCommand(); + relstate = GetSubscriptionRelByOid(MyLogicalRepWorker->subrelid); SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->relstate = relstate; - MyLogicalRepWorker->relstate_lsn = relstate_lsn; + MyLogicalRepWorker->relstate = relstate->state; + MyLogicalRepWorker->relstate_lsn = relstate->lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); + if (relstate->nspname) + strlcpy(nspname, relstate->nspname, NAMEDATALEN); + + if (relstate->relname) + strlcpy(relname, relstate->relname, NAMEDATALEN); + + syncflags = relstate->syncflags; + + CommitTransactionCommand(); + /* * If synchronization is already done or no longer necessary, exit now * that we've updated shared memory state. */ - switch (relstate) + switch (MyLogicalRepWorker->relstate) { case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: @@ -1281,7 +1304,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Calculate the name of the tablesync slot. */ slotname = (char *) palloc(NAMEDATALEN); ReplicationSlotNameForTablesync(MySubscription->oid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, slotname, NAMEDATALEN); @@ -1309,7 +1332,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Assign the origin tracking record name. */ ReplicationOriginNameForLogicalRep(MySubscription->oid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, originname, sizeof(originname)); @@ -1358,7 +1381,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. 
*/ StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); @@ -1366,6 +1389,62 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) StartTransactionCommand(); + /* + * Start a transaction in the remote node in REPEATABLE READ mode. This + * ensures that both the replication slot we create (see below) and the + * COPY are consistent with each other. + */ + res = walrcv_exec(LogRepWorkerWalRcvConn, + "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", + 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not start transaction on publisher: %s", + res->err))); + walrcv_clear_result(res); + + if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0) + action = CRS_EXPORT_USE_SNAPSHOT; + else + action = CRS_USE_SNAPSHOT; + + /* + * Create a new permanent logical decoding slot. This slot will be used + * for the catchup phase after COPY is done, so tell it to use the + * snapshot to make the final data consistent. + */ + snapshot = walrcv_create_slot(LogRepWorkerWalRcvConn, + slotname, false /* permanent */ , false /* two_phase */ , + action, origin_startpos); + + if ((syncflags & SUBREL_SYNC_KIND_SCHEMA) != 0) + { + Oid newrelid; + + Assert(snapshot); + + set_snapshot = true; + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Create the empty table */ + newrelid = synchronize_table_schema(nspname, relname, snapshot); + + /* Update the srrelid of the catalog */ + UpdateSubscriptionRelRelid(MyLogicalRepWorker->subid, MyLogicalRepWorker->subrelid, + newrelid); + MyLogicalRepWorker->relid = newrelid; + + /* + * XXX: Currently, schema sync has to be performed in the same transaction + * as we do COPY (i.e. initial data sync). 
It might be preferable that we + * do schema sync in a separate transaction so that we can continue from + * the initial data sync even in case where we failed between them. To do + * that, probably we need a new relstate (say) SUBREL_STATE_SCHEMASYNC. + */ + CommandCounterIncrement(); + } + /* * Use a standard write lock here. It might be better to disallow access * to the table while it's being synchronized. But we don't want to block @@ -1399,30 +1478,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) GetUserNameFromId(GetUserId(), true), RelationGetRelationName(rel)))); - /* - * Start a transaction in the remote node in REPEATABLE READ mode. This - * ensures that both the replication slot we create (see below) and the - * COPY are consistent with each other. - */ - res = walrcv_exec(LogRepWorkerWalRcvConn, - "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", - 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("table copy could not start transaction on publisher: %s", - res->err))); - walrcv_clear_result(res); - - /* - * Create a new permanent logical decoding slot. This slot will be used - * for the catchup phase after COPY is done, so tell it to use the - * snapshot to make the final data consistent. - */ - walrcv_create_slot(LogRepWorkerWalRcvConn, - slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); - /* * Setup replication origin tracking. 
The purpose of doing this before the * copy is to avoid doing the copy again due to any error in setting up @@ -1457,8 +1512,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) } /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); + if ((syncflags & SUBREL_SYNC_KIND_DATA) != 0) + { + if (!set_snapshot) + PushActiveSnapshot(GetTransactionSnapshot()); + + copy_table(rel); + } + PopActiveSnapshot(); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); @@ -1479,7 +1540,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * visible to others. */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, + MyLogicalRepWorker->subrelid, SUBREL_STATE_FINISHEDCOPY, MyLogicalRepWorker->relstate_lsn); @@ -1637,3 +1698,92 @@ UpdateTwoPhaseState(Oid suboid, char new_state) heap_freetuple(tup); table_close(rel, RowExclusiveLock); } + +/* + * Fetch the definition of table nspname.relname from the publisher with pg_dump (schema only, read through the exported snapshot snapshot_name) and replay the dumped statements locally via SPI, creating the namespace first if it does not exist. + * Returns the OID that relname resolves to in that namespace afterwards; its validity is only checked with an Assert. Raises elog(ERROR) if pg_dump cannot be found, SPI setup/teardown fails, the pipe cannot be opened, or a replayed statement returns an unexpected SPI code. + * NOTE(review): the arguments and the subscription conninfo are interpolated into the command string without escaping -- confirm callers can never pass names containing quotes or shell metacharacters. + */ +static Oid +synchronize_table_schema(char *nspname, char *relname, char *snapshot_name) +{ + FILE *handle; + Oid relid; + Oid nspoid; + StringInfoData command; + StringInfoData querybuf; + char full_path[MAXPGPATH]; + char buf[1024]; + int ret; + + if (find_my_exec("pg_dump", full_path) < 0) + elog(ERROR, "\"%s\" was not found", "pg_dump"); + + /* Connect to SPI; the dumped statements are run through SPI_exec() below. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* Create the namespace if it does not exist yet */ + nspoid = get_namespace_oid(nspname, true); + if (!OidIsValid(nspoid)) + { + /* XXX who should be the owner of the new schema? 
*/ + nspoid = NamespaceCreate(nspname, GetUserId(), false); + CommandCounterIncrement(); + } + + /* Build the pg_dump command line: plain format, schema only, restricted to nspname.relname, read through the exported snapshot. NOTE(review): conninfo is only wrapped in double quotes and the other values are inserted bare, so nothing is escaped before the string is handed to OpenPipeStream(); the full command (conninfo included) is also logged at LOG level just below. */ + initStringInfo(&command); + appendStringInfo(&command, "%s -Fp --schema-only -U %s -d \"%s\" --snapshot=%s -f - -t %s.%s", + full_path, GetUserNameFromId(GetUserId(), true), + MySubscription->conninfo, snapshot_name, nspname, relname); + elog(LOG, "XXX pg_dump command \"%s\"", command.data); + + /* + * Run pg_dump and execute its output one statement at a time. + * + * XXX what if the table already doesn't exist? + */ + PG_TRY(); + { + /* + * XXX: pg_dump is read through a pipe, so a pg_dump failure cannot be detected here (the ClosePipeStream() result is discarded as well); dumping the schema to a file and performing the DDL while reading that file would probably be better. + * NOTE(review): the loop below treats a line as the end of a statement when its second-to-last character is ';'. buf[strlen(buf) - 2] indexes before the start of buf for a line shorter than two characters (e.g. a bare newline), and text whose ';' never lands at that position (last line without a newline, line split at the sizeof(buf) fgets limit) is left unexecuted in querybuf. + * NOTE(review): handle is not initialized before PG_TRY, and the PG_FINALLY block passes it to ClosePipeStream() even when OpenPipeStream() returned NULL. + */ + handle = OpenPipeStream(command.data, "r"); + if (handle == NULL) + elog(ERROR, "command \"%s\" failed", command.data); + + initStringInfo(&querybuf); + while (fgets(buf, sizeof(buf), handle)) + { + appendStringInfoString(&querybuf, buf); + + if (buf[strlen(buf) - 2] != ';') + continue; + + ret = SPI_exec(querybuf.data, 0); + if (ret != SPI_OK_UTILITY && ret != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed %d: %s", ret, querybuf.data); + + resetStringInfo(&querybuf); + } + } + PG_FINALLY(); + { + ClosePipeStream(handle); + } + PG_END_TRY(); + + CommandCounterIncrement(); + + /* Disconnect from SPI */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + relid = get_relname_relid(relname, nspoid); + Assert(OidIsValid(relid)); + + return relid; +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 37bb884127..ff81c65aa2 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4502,10 +4502,18 @@ InitializeApplyWorker(void) (Datum) 0); if (am_tablesync_worker()) - ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + { + if (OidIsValid(MyLogicalRepWorker->relid)) + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + 
MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + { + if (OidIsValid(MyLogicalRepWorker->relid)) + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); + else + ereport(LOG, + (errmsg("logical replication table synchronization worker for subscription \"%s\", relid %u has started", + MySubscription->name, + MyLogicalRepWorker->subrelid))); + } else ereport(LOG, /* translator: first %s is the name of logical replication worker */ @@ -4621,7 +4629,7 @@ ApplyWorkerMain(Datum main_arg) * Setup callback for syscache so that we know when something changes in * the subscription relation state. */ - CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID, invalidate_syncing_table_states, (Datum) 0); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 45b8b3684f..7f959d2765 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1004,6 +1004,8 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, *snapshot_action = CRS_NOEXPORT_SNAPSHOT; else if (strcmp(action, "use") == 0) *snapshot_action = CRS_USE_SNAPSHOT; + else if (strcmp(action, "export-use") == 0) + *snapshot_action = CRS_EXPORT_USE_SNAPSHOT; else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1097,7 +1099,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) need_full_snapshot = true; } - else if (snapshot_action == CRS_USE_SNAPSHOT) + else if (snapshot_action == CRS_USE_SNAPSHOT || + snapshot_action == CRS_EXPORT_USE_SNAPSHOT) { if (!IsTransactionBlock()) ereport(ERROR, @@ -1158,9 +1161,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * snapshot when doing this. 
*/ if (snapshot_action == CRS_EXPORT_SNAPSHOT) - { - snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); - } + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, false); + else if (snapshot_action == CRS_EXPORT_USE_SNAPSHOT) + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder, true); else if (snapshot_action == CRS_USE_SNAPSHOT) { Snapshot snap; diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index 4e4a34bde8..a975d169fe 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -565,11 +565,10 @@ static const struct cachedesc cacheinfo[] = { KEY(Anum_pg_subscription_oid), 4 }, - [SUBSCRIPTIONRELMAP] = { + [SUBSCRIPTIONRELOID] = { SubscriptionRelRelationId, - SubscriptionRelSrrelidSrsubidIndexId, - KEY(Anum_pg_subscription_rel_srrelid, - Anum_pg_subscription_rel_srsubid), + SubscriptionRelObjectIdIndexId, + KEY(Anum_pg_subscription_rel_oid), 64 }, [TABLESPACEOID] = { diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b2bc81b15f..3600b4d7d5 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5493,7 +5493,7 @@ prorettype => 'record', proargtypes => 'oid', proallargtypes => '{oid,oid,oid,int4,int4,pg_lsn,timestamptz,timestamptz,pg_lsn,timestamptz}', proargmodes => '{i,o,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', + proargnames => '{subid,subid,subrelid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', prosrc => 'pg_stat_get_subscription' }, { oid => '2026', descr => 'statistics: current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 60a2bcca23..530c12f148 100644 --- 
a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -30,16 +30,28 @@ */ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId) { + Oid oid; /* Oid */ Oid srsubid BKI_LOOKUP(pg_subscription); /* Oid of subscription */ - Oid srrelid BKI_LOOKUP(pg_class); /* Oid of relation */ char srsubstate; /* state of the relation in subscription */ + /* + * schema name and table name used only when the table is not created + * on the subscriber yet. + */ + NameData srnspname BKI_FORCE_NULL; + NameData srrelname BKI_FORCE_NULL; + + /* What part do we need to synchronize? */ + bool srsyncschema; + bool srsyncdata; + /* * Although srsublsn is a fixed-width type, it is allowed to be NULL, so * we prevent direct C code access to it just as for a varlena field. */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ + Oid srrelid BKI_FORCE_NULL; XLogRecPtr srsublsn BKI_FORCE_NULL; /* remote LSN of the state change * used for synchronization * coordination, or NULL if not @@ -49,7 +61,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId) typedef FormData_pg_subscription_rel *Form_pg_subscription_rel; -DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops)); +DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_oid_index, 9161, SubscriptionRelObjectIdIndexId, on pg_subscription_rel using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, SubscriptionRelSrrelidSrsubidIndexId, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops)); #ifdef EXPOSE_TO_CLIENT_CODE @@ -73,19 +86,31 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_subscription_rel_srrelid_srsubid_index, 6117, Subsc #endif /* EXPOSE_TO_CLIENT_CODE */ +#define SUBREL_SYNC_KIND_SCHEMA 0x01 +#define SUBREL_SYNC_KIND_DATA 0x02 + typedef struct SubscriptionRelState { + Oid oid; Oid relid; XLogRecPtr lsn; char 
state; + + char *nspname; + char *relname; + uint32 syncflags; /* OR of SUBREL_SYNC_KIND_XXX */ } SubscriptionRelState; -extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); -extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, +extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, bool syncschema, + bool syncdata, XLogRecPtr sublsn, char *nspname, + char *relname); +extern void UpdateSubscriptionRelRelid(Oid subid, Oid subrelid, Oid relid); +extern void UpdateSubscriptionRelState(Oid subid, Oid subrelid, char state, XLogRecPtr sublsn); -extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); -extern void RemoveSubscriptionRel(Oid subid, Oid relid); +extern char GetSubscriptionRelState(Oid subid, Oid subrelid, XLogRecPtr *sublsn); +extern char GetSubscriptoinRelStateByRelid(Oid subid, Oid relid, XLogRecPtr *sublsn); +extern SubscriptionRelState *GetSubscriptionRelByOid(Oid subrelid); +extern void RemoveSubscriptionRel(Oid subid, Oid relid, Oid subrelid); extern bool HasSubscriptionRelations(Oid subid); extern List *GetSubscriptionRelations(Oid subid, bool not_ready); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index f49b941b53..de43703443 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -68,7 +68,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder); extern void SnapBuildSnapDecRefcount(Snapshot snap); extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder); -extern const char *SnapBuildExportSnapshot(SnapBuild *builder); +extern const char *SnapBuildExportSnapshot(SnapBuild *builder, bool use_it); extern void SnapBuildClearExportedSnapshot(void); extern void SnapBuildResetExportedSnapshotState(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 9df7e50f94..b48f1c0d2a 100644 --- a/src/include/replication/walsender.h +++ 
b/src/include/replication/walsender.h @@ -21,7 +21,8 @@ typedef enum { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, - CRS_USE_SNAPSHOT + CRS_USE_SNAPSHOT, + CRS_EXPORT_USE_SNAPSHOT } CRSSnapshotAction; /* global state */ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index dce71d2c50..8f982d4a0d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -50,7 +50,13 @@ typedef struct LogicalRepWorker /* Subscription id for the worker. */ Oid subid; - /* Used for initial table synchronization. */ + /* + * Used for initial table synchronization. + * + * relid is an invalid oid if the table is not created on the subscriber + * yet. + */ + Oid subrelid; Oid relid; char relstate; XLogRecPtr relstate_lsn; @@ -308,7 +314,7 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, static inline bool am_tablesync_worker(void) { - return OidIsValid(MyLogicalRepWorker->relid); + return OidIsValid(MyLogicalRepWorker->subrelid); } static inline bool diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index 67ea6e4945..1c08ff3c92 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -97,7 +97,7 @@ enum SysCacheIdentifier STATRELATTINH, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, - SUBSCRIPTIONRELMAP, + SUBSCRIPTIONRELOID, TABLESPACEOID, TRFOID, TRFTYPELANG, -- 2.31.1