From 94fd46c9ef30ba5e8ac1a8873fce577a4be425f4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 29 Jan 2024 22:57:49 +0200 Subject: [PATCH v8 3/5] Replace 'lastBackend' with an array of in-use slots Now that the procState array is indexed by pgprocno, the 'lastBackend' optimization is useless, because aux processes are assigned PGPROC slots and hence have numbers higher than max_connection. So 'lastBackend' was always set to almost the end of the array. To replace that optimization, mantain a dense array of in-use indexes. This's redundant with ProgGlobal->procarray, but I was afraid of adding any more contention to ProcArrayLock, and this keeps the code isolated to sinvaladt.c too. It's not clear if we need that optimization at all. I was able to write a test case that become slower without this: set max_connections to a very high number (> 5000), and create+truncate a table in the same transaction thousands of times to send invalidation messages, with fsync=off. That became about 20% slower on my laptop. Arguably that's so unrealistic that it doesn't matter, but nevertheless, this commit restores the performance of that. --- src/backend/storage/ipc/sinvaladt.c | 58 +++++++++++++++++------------ 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 8105717c578..4d44b90e7b8 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -171,7 +171,6 @@ typedef struct SISeg int minMsgNum; /* oldest message still needed */ int maxMsgNum; /* next message number to be assigned */ int nextThreshold; /* # of messages to call SICleanupQueue */ - int lastBackend; /* index of last active procState entry, +1 */ slock_t msgnumLock; /* spinlock protecting maxMsgNum */ @@ -181,8 +180,14 @@ typedef struct SISeg SharedInvalidationMessage buffer[MAXNUMMESSAGES]; /* - * Per-backend invalidation state info (has NumProcStateSlots entries). + * Per-backend invalidation state info. + * + * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno. + * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is + * a dense array of their indexes, to speed up scanning all in-use slots. */ + int numProcs; + int *pgprocnos; ProcState procState[FLEXIBLE_ARRAY_MEMBER]; } SISeg; @@ -210,7 +215,8 @@ SInvalShmemSize(void) Size size; size = offsetof(SISeg, procState); - size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); + size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */ + size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */ return size; } @@ -235,7 +241,6 @@ CreateSharedInvalidationState(void) shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->nextThreshold = CLEANUP_MIN; - shmInvalBuffer->lastBackend = 0; SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ @@ -250,6 +255,8 @@ CreateSharedInvalidationState(void) shmInvalBuffer->procState[i].hasMessages = false; shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; } + shmInvalBuffer->numProcs = 0; + shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i]; } /* @@ -262,13 +269,15 @@ SharedInvalBackendInit(bool sendOnly) ProcState *stateP; pid_t oldPid; SISeg *segP = shmInvalBuffer; + int pgprocno; if (MyBackendId <= 0) elog(ERROR, "MyBackendId not set"); if (MyBackendId > NumProcStateSlots) elog(PANIC, "unexpected MyBackendId %d in SharedInvalBackendInit (max %d)", MyBackendId, NumProcStateSlots); - stateP = &segP->procState[MyBackendId - 1]; + pgprocno = MyBackendId - 1; + stateP = &segP->procState[pgprocno]; /* * This can run in parallel with read operations, but not with write @@ -285,8 +294,7 @@ SharedInvalBackendInit(bool sendOnly) MyBackendId, oldPid); } - if (MyBackendId > segP->lastBackend) - segP->lastBackend = MyBackendId; + shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = pgprocno; /* Fetch next local transaction ID into local memory */ nextLocalTransactionId = stateP->nextLXID; @@ -318,13 +326,14 @@ CleanupInvalidationState(int status, Datum arg) { SISeg *segP = (SISeg *) DatumGetPointer(arg); ProcState *stateP; + int pgprocno = MyBackendId - 1; int i; Assert(PointerIsValid(segP)); LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); - stateP = &segP->procState[MyBackendId - 1]; + stateP = &segP->procState[pgprocno]; /* Update next local transaction ID for next holder of this backendID */ stateP->nextLXID = nextLocalTransactionId; @@ -335,13 +344,18 @@ CleanupInvalidationState(int status, Datum arg) stateP->resetState = false; stateP->signaled = false; - /* Recompute index of last active backend */ - for (i = segP->lastBackend; i > 0; i--) + for (i = segP->numProcs - 1; i >= 0; i--) { - if (segP->procState[i - 1].procPid != 0) + if (segP->pgprocnos[i] == pgprocno) + { + if (i != segP->numProcs - 1) + segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1]; break; + } } - segP->lastBackend = i; + if (i < 0) + elog(PANIC, "could not find entry in sinval array"); + segP->numProcs--; LWLockRelease(SInvalWriteLock); } @@ -414,9 +428,9 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) * these (unlocked) changes will be committed to memory before we exit * the function. */ - for (i = 0; i < segP->lastBackend; i++) + for (i = 0; i < segP->numProcs; i++) { - ProcState *stateP = &segP->procState[i]; + ProcState *stateP = &segP->procState[segP->pgprocnos[i]]; stateP->hasMessages = true; } @@ -584,13 +598,14 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) minsig = min - SIG_THRESHOLD; lowbound = min - MAXNUMMESSAGES + minFree; - for (i = 0; i < segP->lastBackend; i++) + for (i = 0; i < segP->numProcs; i++) { - ProcState *stateP = &segP->procState[i]; + ProcState *stateP = &segP->procState[segP->pgprocnos[i]]; int n = stateP->nextMsgNum; - /* Ignore if inactive or already in reset state */ - if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) + /* Ignore if already in reset state */ + Assert(stateP->procPid != 0); + if (stateP->resetState || stateP->sendOnly) continue; /* @@ -626,11 +641,8 @@ SICleanupQueue(bool callerHasWriteLock, int minFree) { segP->minMsgNum -= MSGNUMWRAPAROUND; segP->maxMsgNum -= MSGNUMWRAPAROUND; - for (i = 0; i < segP->lastBackend; i++) - { - /* we don't bother skipping inactive entries here */ - segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; - } + for (i = 0; i < segP->numProcs; i++) + segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND; } /* -- 2.39.2