diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 8140418..832a80d 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -167,6 +167,48 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
 			break;
 	}
 }
+
+/* Tuning knobs for bulk extension when the extension lock is contended. */
+#define EXTRA_BLOCKS_PER_WAITER	20	/* extra blocks added per lock waiter */
+#define MAX_EXTRA_BLOCKS		512 /* upper bound on extra blocks per call */
+
+/*
+ * RelationAddOneBlock
+ *
+ *	Extend the relation by one block and return the new buffer, exclusively
+ *	locked.  If otherBuffer is valid, it is exclusive-locked as well, before
+ *	the new page.
+ *
+ *	No relation extension lock is taken here; callers that need one must
+ *	hold it across this call.
+ */
+static Buffer
+RelationAddOneBlock(Relation relation, Buffer otherBuffer, BulkInsertState bistate)
+{
+	Buffer		buffer;
+
+	/*
+	 * XXX This does an lseek - rather expensive - but at the moment it is the
+	 * only way to accurately determine how many blocks are in a relation.  Is
+	 * it worth keeping an accurate file length in shared memory someplace,
+	 * rather than relying on the kernel to do it for us?
+	 */
+	buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+	/*
+	 * We can be certain that locking the otherBuffer first is OK, since it
+	 * must have a lower page number.
+	 */
+	if (otherBuffer != InvalidBuffer)
+		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
+
+	/*
+	 * Now acquire lock on the new page.
+	 */
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	return buffer;
+}
 
 /*
  * RelationGetBufferForTuple
@@ -236,7 +278,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	Size		pageFreeSpace,
 				saveFreeSpace;
 	BlockNumber targetBlock,
-				otherBlock;
+				otherBlock,
+				lastValidBlock = InvalidBlockNumber;
 	bool		needLock;
 
 	len = MAXALIGN(len);		/* be conservative */
@@ -308,6 +351,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
 		}
 	}
 
+loop:
 	while (targetBlock != InvalidBlockNumber)
 	{
 		/*
@@ -388,6 +432,9 @@ RelationGetBufferForTuple(Relation relation, Size len,
 								 otherBlock, targetBlock, vmbuffer_other,
 								 vmbuffer);
 
+		/* Remember the most recent block examined, in case we must extend. */
+		lastValidBlock = targetBlock;
+
 		/*
 		 * Now we can check to see if there's enough free space here. If so,
 		 * we're done.
@@ -441,36 +488,100 @@ RelationGetBufferForTuple(Relation relation, Size len,
 	needLock = !RELATION_IS_LOCAL(relation);
 
 	if (needLock)
-		LockRelationForExtension(relation, ExclusiveLock);
-
-	/*
-	 * XXX This does an lseek - rather expensive - but at the moment it is the
-	 * only way to accurately determine how many blocks are in a relation.  Is
-	 * it worth keeping an accurate file length in shared memory someplace,
-	 * rather than relying on the kernel to do it for us?
-	 */
-	buffer = ReadBufferBI(relation, P_NEW, bistate);
-
-	/*
-	 * We can be certain that locking the otherBuffer first is OK, since it
-	 * must have a lower page number.
-	 */
-	if (otherBuffer != InvalidBuffer)
-		LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
-
-	/*
-	 * Now acquire lock on the new page.
-	 */
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-	/*
-	 * Release the file-extension lock; it's now OK for someone else to extend
-	 * the relation some more.  Note that we cannot release this lock before
-	 * we have buffer lock on the new page, or we risk a race condition
-	 * against vacuumlazy.c --- see comments therein.
-	 */
-	if (needLock)
-		UnlockRelationForExtension(relation, ExclusiveLock);
+	{
+		if (TryLockRelationForExtension(relation, ExclusiveLock))
+		{
+			/* No contention: extend by a single block. */
+			buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+		}
+		else
+		{
+			/* Somebody else holds the extension lock; wait for it. */
+			LockRelationForExtension(relation, ExclusiveLock);
+
+			if (use_fsm)
+			{
+				int			extraBlocks;
+
+				/*
+				 * Another backend may have added blocks while we waited, so
+				 * consult the FSM again before extending.  If the search loop
+				 * examined a page, report its free space as well.  Otherwise
+				 * lastValidBlock is still invalid (and pageFreeSpace is
+				 * presumably unset too, as the loop never ran), so we can
+				 * only search.
+				 */
+				if (lastValidBlock != InvalidBlockNumber)
+					targetBlock = RecordAndGetPageWithFreeSpace(relation,
+																lastValidBlock,
+																pageFreeSpace,
+																len + saveFreeSpace);
+				else
+					targetBlock = GetPageWithFreeSpace(relation,
+													   len + saveFreeSpace);
+
+				/* Some other waiter has extended the relation for us. */
+				if (targetBlock != InvalidBlockNumber)
+				{
+					UnlockRelationForExtension(relation, ExclusiveLock);
+					goto loop;
+				}
+
+				buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+
+				/*
+				 * Add extra blocks in proportion to the waiter count reported
+				 * for the extension lock, and record them in the FSM for
+				 * those backends to find.  Cap the total so that the amount of
+				 * work done while holding the lock stays bounded.
+				 */
+				extraBlocks = RelationExtensionLockWaiter(relation) *
+					EXTRA_BLOCKS_PER_WAITER;
+				if (extraBlocks > MAX_EXTRA_BLOCKS)
+					extraBlocks = MAX_EXTRA_BLOCKS;
+
+				while (extraBlocks-- > 0)
+				{
+					Buffer		buf;
+					Page		page;
+					Size		freespace;
+					BlockNumber blockNum;
+
+					/* Extend by one more page and initialize it as empty. */
+					buf = ReadBufferBI(relation, P_NEW, bistate);
+					LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+					page = BufferGetPage(buf);
+					PageInit(page, BufferGetPageSize(buf), 0);
+					freespace = PageGetHeapFreeSpace(page);
+					MarkBufferDirty(buf);
+					blockNum = BufferGetBlockNumber(buf);
+					UnlockReleaseBuffer(buf);
+
+					/*
+					 * Record the page in the FSM once the buffer is released.
+					 *
+					 * NOTE(review): confirm that a per-page
+					 * RecordPageWithFreeSpace is enough for other backends'
+					 * FSM searches to find these pages.
+					 */
+					RecordPageWithFreeSpace(relation, blockNum, freespace);
+				}
+			}
+			else
+				buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
+		}
+
+		/*
+		 * Release the file-extension lock; it's now OK for someone else to
+		 * extend the relation some more.  Note that we cannot release this
+		 * lock before we have buffer lock on the new page, or we risk a race
+		 * condition against vacuumlazy.c --- see comments therein.
+		 */
+		UnlockRelationForExtension(relation, ExclusiveLock);
+	}
+	else
+		buffer = RelationAddOneBlock(relation, otherBuffer, bistate);
 
 	/*
 	 * We need to initialize the empty new page. 
Double-check that it really
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 9d16afb..a56b203 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -340,6 +340,42 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
 	(void) LockAcquire(&tag, lockmode, false, false);
 }
 
+/*
+ *		TryLockRelationForExtension
+ *
+ * Like LockRelationForExtension, but passes true instead of false as the
+ * last argument of LockAcquire (presumably "don't wait" --- confirm in
+ * lock.c) and returns LockAcquire's result to the caller.
+ */
+LockAcquireResult
+TryLockRelationForExtension(Relation relation, LOCKMODE lockmode)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION_EXTEND(tag,
+								relation->rd_lockInfo.lockRelId.dbId,
+								relation->rd_lockInfo.lockRelId.relId);
+
+	return LockAcquire(&tag, lockmode, false, true);
+}
+
+/*
+ *		RelationExtensionLockWaiter
+ *
+ * Return LockWaiterCount()'s result for this relation's extension lock.
+ */
+int
+RelationExtensionLockWaiter(Relation relation)
+{
+	LOCKTAG		tag;
+
+	SET_LOCKTAG_RELATION_EXTEND(tag,
+								relation->rd_lockInfo.lockRelId.dbId,
+								relation->rd_lockInfo.lockRelId.relId);
+
+	return LockWaiterCount(&tag);
+}
+
 /*
  *		UnlockRelationForExtension
  */
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index a458c68..8f49192 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -4380,3 +4380,51 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
 	LockRelease(&tag, ShareLock, false);
 	return true;
 }
+
+/*
+ * LockWaiterCount
+ *
+ * Return the number of requests (nRequested) recorded for the given lock in
+ * the shared lock table, or 0 if the lock has no entry there.
+ *
+ * NOTE(review): nRequested presumably counts granted holders as well as
+ * waiters, so a caller holding the lock would be included --- confirm.
+ */
+int
+LockWaiterCount(const LOCKTAG *locktag)
+{
+	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+	LOCK	   *lock;
+	bool		found;
+	uint32		hashcode;
+	LWLock	   *partitionLock;
+	int			waiters = 0;
+
+	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+
+	/*
+	 * Look the lock up in the shared hash table while holding its partition
+	 * lock, so the LOCK entry cannot change or go away while we read it.
+	 * Going through the backend-local LOCALLOCK instead would read shared
+	 * state unlocked, and its lock pointer may not be set.
+	 */
+	hashcode = LockTagHashCode(locktag);
+	partitionLock = LockHashPartitionLock(hashcode);
+	LWLockAcquire(partitionLock, LW_SHARED);
+
+	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
+												(const void *) locktag,
+												hashcode,
+												HASH_FIND,
+												&found);
+	if (found)
+	{
+		Assert(lock != NULL);
+		waiters = lock->nRequested;
+	}
+
+	LWLockRelease(partitionLock);
+
+	return waiters;
+}
diff --git a/src/include/storage/lmgr.h 
b/src/include/storage/lmgr.h index e9d41bf..a492bb1 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -101,4 +101,7 @@ extern void UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid /* Describe a locktag for error messages */ extern void DescribeLockTag(StringInfo buf, const LOCKTAG *tag); +extern LockAcquireResult TryLockRelationForExtension(Relation relation, LOCKMODE lockmode); +extern int RelationExtensionLockWaiter(Relation relation); + #endif /* LMGR_H */ diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 788d50a..3fd74fb 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -572,6 +572,8 @@ extern void RememberSimpleDeadLock(PGPROC *proc1, PGPROC *proc2); extern void InitDeadLockChecking(void); +extern int LockWaiterCount(const LOCKTAG *locktag); + #ifdef LOCK_DEBUG extern void DumpLocks(PGPROC *proc); extern void DumpAllLocks(void);