From 9011698ae800e0f45f960e91f6b16eab15634fac Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Tue, 13 Jun 2017 19:26:51 +0200 Subject: [PATCH 1/2] Improve the pg_subscription_rel handling Split the SetSubscriptionRelState into separate Add and Update functions, removing the unsafe upsert logic as callers are supposed to know whether they are updating or adding new row. Reorder the code in the above mentioned functions to avoid "tuple updated concurrently" warnings. --- src/backend/catalog/pg_subscription.c | 131 +++++++++++++++------------- src/backend/commands/subscriptioncmds.c | 14 +-- src/backend/replication/logical/tablesync.c | 33 ++++--- src/include/catalog/pg_subscription_rel.h | 6 +- 4 files changed, 98 insertions(+), 86 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index c5b2541..fd19675 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -225,24 +225,15 @@ textarray_to_stringlist(ArrayType *textarray) } /* - * Set the state of a subscription table. - * - * If update_only is true and the record for given table doesn't exist, do - * nothing. This can be used to avoid inserting a new record that was deleted - * by someone else. Generally, subscription DDL commands should use false, - * workers should use true. - * - * The insert-or-update logic in this function is not concurrency safe so it - * might raise an error in rare circumstances. But if we took a stronger lock - * such as ShareRowExclusiveLock, we would risk more deadlocks. + * Add new state record for a subscription table. */ Oid -SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool update_only) +AddSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) { Relation rel; HeapTuple tup; - Oid subrelid = InvalidOid; + Oid subrelid; bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; @@ -252,57 +243,79 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); + if (HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u already exists", + relid, subid); - /* - * If the record for given table does not exist yet create new record, - * otherwise update the existing one. - */ - if (!HeapTupleIsValid(tup) && !update_only) - { - /* Form the tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); - values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); - values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); - if (sublsn != InvalidXLogRecPtr) - values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - else - nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; - - tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); - - /* Insert tuple into catalog. */ - subrelid = CatalogTupleInsert(rel, tup); - - heap_freetuple(tup); - } - else if (HeapTupleIsValid(tup)) - { - bool replaces[Natts_pg_subscription_rel]; + /* Form the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); + values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + if (sublsn != InvalidXLogRecPtr) + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; - /* Update the tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - memset(replaces, false, sizeof(replaces)); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); - replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; - values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + /* Insert tuple into catalog. */ + subrelid = CatalogTupleInsert(rel, tup); - replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; - if (sublsn != InvalidXLogRecPtr) - values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - else - nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + heap_freetuple(tup); - tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, - replaces); + /* Cleanup. */ + heap_close(rel, NoLock); - /* Update the catalog. */ - CatalogTupleUpdate(rel, &tup->t_self, tup); + return subrelid; +} - subrelid = HeapTupleGetOid(tup); - } +/* + * Update the state of a subscription table. + */ +Oid +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + Relation rel; + HeapTuple tup; + Oid subrelid; + bool nulls[Natts_pg_subscription_rel]; + Datum values[Natts_pg_subscription_rel]; + bool replaces[Natts_pg_subscription_rel]; + + rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock); + + /* Try finding existing mapping. */ + tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u does not exist", + relid, subid); + + /* Update the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + + replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (sublsn != InvalidXLogRecPtr) + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + subrelid = HeapTupleGetOid(tup); /* Cleanup. */ heap_close(rel, NoLock); @@ -377,6 +390,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid) HeapTuple tup; int nkeys = 0; + Assert(OidIsValid(subid) || OidIsValid(relid)); + rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock); if (OidIsValid(subid)) @@ -400,9 +415,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid) /* Do the search and delete what we found. */ scan = heap_beginscan_catalog(rel, nkeys, skey); while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) - { simple_heap_delete(rel, &tup->t_self); - } heap_endscan(scan); heap_close(rel, RowExclusiveLock); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5aae7b6..14c8f3f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - SetSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr, false); + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); } /* @@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) if (!bsearch(&relid, subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp)) { - SetSubscriptionRelState(sub->oid, relid, + AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr, false); + InvalidXLogRecPtr); ereport(NOTICE, (errmsg("added subscription for table %s.%s", quote_identifier(rv->schemaname), @@ -906,15 +906,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ReleaseSysCache(tup); + /* Kill the apply worker associated with the subscription. */ + logicalrep_worker_stop(subid, InvalidOid); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); - /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid, InvalidOid); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3ff08bf..28accda 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -285,11 +285,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SpinLockRelease(&MyLogicalRepWorker->relmutex); - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); @@ -414,9 +413,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; } - SetSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn, true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); } } else @@ -844,11 +843,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. */ StartTransactionCommand(); - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); pgstat_report_stat(false); @@ -933,11 +931,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Update the new state in catalog. No need to bother * with the shmem state as we are exiting for good. */ - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - SUBREL_STATE_SYNCDONE, - *origin_startpos, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_SYNCDONE, + *origin_startpos); finish_sync_worker(); } break; diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index f5f6191..e6a2dd5 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -70,8 +70,10 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; -extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool update_only); +extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); +extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 2.7.4