From 7bbf7bc3278b3ad9e6071dca9eb78c8c6b80f4b0 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 4 Mar 2025 16:51:19 +0900 Subject: [PATCH v9 1/2] Introduce a new invalidation message to invalidate caches in output plugins A new invalidation message is generated when either ALTER PUBLICATION RENAME TO or is executed. The primal use-case of the message is to invalidate caches on the logical decoding output plugin. Plugins can register callback functions for the message via CacheRegisterRelSyncCallback(), and the function can invalidate the cache for the specified relation. A new invalidation message is transactional, and decoder processes should recognize the message and invalidate specified caches. Thus, the messages are stored in InvalMessageArray and serialized at the end of the transaction. --- src/backend/access/rmgrdesc/standbydesc.c | 2 + src/backend/utils/cache/inval.c | 127 ++++++++++++++++++++++ src/include/pg_config_manual.h | 8 +- src/include/storage/sinval.h | 11 ++ src/include/utils/inval.h | 10 ++ 5 files changed, 154 insertions(+), 4 deletions(-) diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c index d849f8e54b..81eff5f31c 100644 --- a/src/backend/access/rmgrdesc/standbydesc.c +++ b/src/backend/access/rmgrdesc/standbydesc.c @@ -132,6 +132,8 @@ standby_desc_invalidations(StringInfo buf, appendStringInfo(buf, " relmap db %u", msg->rm.dbId); else if (msg->id == SHAREDINVALSNAPSHOT_ID) appendStringInfo(buf, " snapshot %u", msg->sn.relId); + else if (msg->id == SHAREDINVALRELSYNC_ID) + appendStringInfo(buf, " relsync %u", msg->rs.relid); else appendStringInfo(buf, " unrecognized id %d", msg->id); } diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 700ccb6df9..35df9be5e5 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -271,6 +271,7 @@ int debug_discard_caches = 0; #define MAX_SYSCACHE_CALLBACKS 64 #define MAX_RELCACHE_CALLBACKS 10 +#define MAX_RELSYNC_CALLBACKS 10 static struct SYSCACHECALLBACK { @@ -292,6 +293,15 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; +static struct RELSYNCCALLBACK +{ + RelSyncCallbackFunction function; + Datum arg; +} relsync_callback_list[MAX_RELSYNC_CALLBACKS]; + +static int relsync_callback_count = 0; + + /* ---------------------------------------------------------------- * Invalidation subgroup support functions * ---------------------------------------------------------------- @@ -484,6 +494,34 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group, AddInvalidationMessage(group, RelCacheMsgs, &msg); } +/* + * Add a relsync inval entry + * + * We put these into the relcache subgroup for simplicity. + */ +static void +AddRelsyncInvalidationMessage(InvalidationMsgsGroup *group, + Oid dbId, Oid relId) +{ + SharedInvalidationMessage msg; + + /* Don't add a duplicate item */ + /* We assume dbId need not be checked because it will never change */ + ProcessMessageSubGroup(group, RelCacheMsgs, + if (msg->rc.id == SHAREDINVALRELSYNC_ID && + msg->rc.relId == relId) + return); + + /* OK, add the item */ + msg.rc.id = SHAREDINVALRELSYNC_ID; + msg.rc.dbId = dbId; + msg.rc.relId = relId; + /* check AddCatcacheInvalidationMessage() for an explanation */ + VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); + + AddInvalidationMessage(group, RelCacheMsgs, &msg); +} + /* * Add a snapshot inval entry * @@ -611,6 +649,18 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) info->RelcacheInitFileInval = true; } +/* + * RegisterRelcacheInvalidation + * + * As above, but register a relsync invalidation event. + */ +static void +RegisterRelsyncInvalidation(InvalidationInfo *info, Oid dbId, Oid relId) +{ + AddRelsyncInvalidationMessage(&info->CurrentCmdInvalidMsgs, dbId, relId); +} + + /* * RegisterSnapshotInvalidation * @@ -751,6 +801,13 @@ InvalidateSystemCachesExtended(bool debug_discard) ccitem->function(ccitem->arg, InvalidOid); } + + for (i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, InvalidOid); + } } /* @@ -832,6 +889,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) else if (msg->sn.dbId == MyDatabaseId) InvalidateCatalogSnapshot(); } + else if (msg->id == SHAREDINVALRELSYNC_ID) + { + /* We only care about our own database */ + if (msg->rs.dbId == MyDatabaseId) + CallRelSyncCallbacks(msg->rs.relid); + } else elog(FATAL, "unrecognized SI message ID: %d", msg->id); } @@ -1622,6 +1685,35 @@ CacheInvalidateRelcacheByRelid(Oid relid) } +/* + * RelationCacheInvalidate + * Register invalidation of the cache in logical decoding output plugin + * for a database. + * + * This type of invalidation message is used for the specific purpose of output + * plugins. Processes which do not decode WALs would do nothing even when it + * receives the message. + */ +void +CacheInvalidateRelSync(Oid relid) +{ + RegisterRelsyncInvalidation(PrepareInvalidationState(), + MyDatabaseId, relid); +} + + +/* + * CacheInvalidateRelSyncAll + * Register invalidation of the whole cache in logical decoding output + * plugin. + */ +void +CacheInvalidateRelSyncAll(void) +{ + CacheInvalidateRelSync(InvalidOid); +} + + /* * CacheInvalidateSmgr * Register invalidation of smgr references to a physical relation. @@ -1763,6 +1855,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, ++relcache_callback_count; } +/* + * CacheRegisterRelSyncCallback + * Register the specified function to be called for all future + * decoding-cache invalidation events. + * + * This function is intended to be call from the logical decoding output + * plugins. + */ +void +CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg) +{ + if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS) + elog(FATAL, "out of relsync_callback_list slots"); + + relsync_callback_list[relsync_callback_count].function = func; + relsync_callback_list[relsync_callback_count].arg = arg; + + ++relsync_callback_count; +} + /* * CallSyscacheCallbacks * @@ -1788,6 +1901,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) } } +/* + * CallSyscacheCallbacks + */ +void +CallRelSyncCallbacks(Oid relid) +{ + for (int i = 0; i < relsync_callback_count; i++) + { + struct RELSYNCCALLBACK *ccitem = relsync_callback_list + i; + + ccitem->function(ccitem->arg, relid); + } +} + /* * LogLogicalInvalidations * diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h index 449e50bd78..23308f1de1 100644 --- a/src/include/pg_config_manual.h +++ b/src/include/pg_config_manual.h @@ -282,10 +282,10 @@ /* * For cache-invalidation debugging, define DISCARD_CACHES_ENABLED to enable - * use of the debug_discard_caches GUC to aggressively flush syscache/relcache - * entries whenever it's possible to deliver invalidations. See - * AcceptInvalidationMessages() in src/backend/utils/cache/inval.c for - * details. + * use of the debug_discard_caches GUC to aggressively flush + * syscache/relcache/relsync cache entries whenever it's possible to deliver + * invalidations. See AcceptInvalidationMessages() in + * src/backend/utils/cache/inval.c for details. * * USE_ASSERT_CHECKING builds default to enabling this. It's possible to use * DISCARD_CACHES_ENABLED without a cassert build and the implied diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 2463c0f9fa..90a5af4ed8 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -27,6 +27,7 @@ * * invalidate an smgr cache entry for a specific physical relation * * invalidate the mapped-relation mapping for a given database * * invalidate any saved snapshot that might be used to scan a given relation + * * invalidate a specific entry for specific output plugin * More types could be added if needed. The message type is identified by * the first "int8" field of the message struct. Zero or positive means a * specific-catcache inval message (and also serves as the catcache ID field). @@ -110,6 +111,15 @@ typedef struct Oid relId; /* relation ID */ } SharedInvalSnapshotMsg; +#define SHAREDINVALRELSYNC_ID (-6) + +typedef struct +{ + int8 id; /* type field --- must be first */ + Oid dbId; /* database ID */ + Oid relid; /* relation ID, or 0 if whole relcache */ +} SharedInvalRelSyncMsg; + typedef union { int8 id; /* type field --- must be first */ @@ -119,6 +129,7 @@ typedef union SharedInvalSmgrMsg sm; SharedInvalRelmapMsg rm; SharedInvalSnapshotMsg sn; + SharedInvalRelSyncMsg rs; } SharedInvalidationMessage; diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 40658ba2ff..9b871caef6 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -22,6 +22,7 @@ extern PGDLLIMPORT int debug_discard_caches; typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue); typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid); +typedef void (*RelSyncCallbackFunction) (Datum arg, Oid relid); extern void AcceptInvalidationMessages(void); @@ -55,6 +56,10 @@ extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple); extern void CacheInvalidateRelcacheByRelid(Oid relid); +extern void CacheInvalidateRelSync(Oid relid); + +extern void CacheInvalidateRelSyncAll(void); + extern void CacheInvalidateSmgr(RelFileLocatorBackend rlocator); extern void CacheInvalidateRelmap(Oid databaseId); @@ -66,8 +71,13 @@ extern void CacheRegisterSyscacheCallback(int cacheid, extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg); +extern void CacheRegisterRelSyncCallback(RelSyncCallbackFunction func, + Datum arg); + extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); +extern void CallRelSyncCallbacks(Oid relid); + extern void InvalidateSystemCaches(void); extern void InvalidateSystemCachesExtended(bool debug_discard); -- 2.43.5