/*-------------------------------------------------------------------------
 *
 * multixact.c
 *		PostgreSQL multi-transaction-log manager
 *
 * The pg_multixact manager is a pg_clog-like manager that stores an array
 * of TransactionIds for each MultiXactId.  It is a fundamental part of the
 * shared-row lock implementation.  A share-locked tuple stores a
 * MultiXactId in its Xmax, and a transaction that needs to wait for the
 * tuple to be unlocked can sleep on the potentially-several TransactionIds
 * that compose the MultiXactId.
 *
 * We use two SLRU areas, one for storing the offsets on which the data
 * starts for each MultiXactId in the other one.  This trick allows us to
 * store variable length arrays of TransactionIds.
 *
 * This code is based on clog.c, but the robustness requirements are
 * completely different from pg_clog, because we only need to remember
 * pg_multixact information for currently-open transactions.  Thus, there is
 * no need to preserve data over a crash and restart.
 *
 * The only XLOG interaction we need to take care of is that sequential
 * MultiXactId generation must not overlap across a system crash.  Thus we
 * log groups of MultiXactIds acquisition in the same fashion we do for Oids
 * (see XLogPutNextMultiXactId.)
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/multixact.h"
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#include "storage/backendid.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/sinval.h"


/*
 * Defines for MultiXactOffset page sizes.  A page is the same BLCKSZ as is
 * used everywhere else in Postgres.
 *
 * Note: because both uint32 and TransactionIds are 32 bits and wrap around at
 * 0xFFFFFFFF, MultiXact page numbering also wraps around at
 * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE, and segment numbering at
 * 0xFFFFFFFF/MULTIXACT_*_PER_PAGE/SLRU_SEGMENTS_PER_PAGE.  We need take no
 * explicit notice of that fact in this module, except when comparing segment
 * and page numbers in TruncateMultiXactOffset and TruncateMultiXactMember
 * (see MultiXact{Offset,Member}PagePrecedes).
 */

/* We need four bytes per offset and another four bytes per member */
#define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(uint32))
#define MULTIXACT_MEMBERS_PER_PAGE (BLCKSZ / sizeof(TransactionId))

#define MultiXactIdToOffsetPage(xid) \
	((xid) / (uint32) MULTIXACT_OFFSETS_PER_PAGE)
#define MultiXactIdToOffsetEntry(xid) \
	((xid) % (uint32) MULTIXACT_OFFSETS_PER_PAGE)

#define MultiXactIdToMemberPage(xid) \
	((xid) / (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)
#define MultiXactIdToMemberEntry(xid) \
	((xid) % (TransactionId) MULTIXACT_MEMBERS_PER_PAGE)

/* Arbitrary number of MultiXactId to allocate at each XLog call */
#define MXACT_PREFETCH	1024

/*
 * Links to shared-memory data structures for MultiXact control
 */
static SlruCtlData MultiXactOffsetCtlData;
static SlruCtlData MultiXactMemberCtlData;

#define MultiXactOffsetCtl  (&MultiXactOffsetCtlData)
#define MultiXactMemberCtl  (&MultiXactMemberCtlData)

/*
 * MultiXact state shared across all backends.
 */
typedef struct MultiXactStateData
{
	/* global minimum assigned MultiXactId */
	MultiXactId		GlobalMinMXactId;

	/* next to-be-assigned MultiXactId */
	MultiXactId		nextMXact;

	/* next to-be-assigned offset */
	uint32			nextOffset;

	/* MultiXactIds we have left before logging */
	uint32			mXactCount;

	/* MultiXact at which we can truncate the Offset SLRU area */
	uint32		offsetTruncationMulti;

	/* the Offset SLRU area was last truncated at this MultiXactId */
	uint32		lastTruncationMulti;

	/*
	 * Per-backend minimum assigned MultiXactId.
	 *
	 * We declare it with length 1 because C wants a fixed size array,
	 * but actually it is MaxBackends entries long.
	 */
	MultiXactId	   minMXactId[1];
} MultiXactStateData;

static MultiXactStateData	*MultiXactState;


/*
 * Definitions for the backend-local MultiXactId cache.
 *
 * We use this cache to store known MultiXacts, so we don't need to go to
 * SLRU areas everytime.  We also use this when we need to create MultiXacts
 * for a TransactionId set that was already used.
 *
 * The cache lasts for the duration of a single transaction, the rationale
 * for this being that most entries will contain our own TransactionId and
 * so they will be invalid by the time our next transaction starts.
 *
 * We allocate the cache entries on a memory context that is reset at
 * transaction end, so we don't need retail freeing of entries.
 */
typedef struct mXactCacheEnt
{
	MultiXactId		multi;
	int				nxids;
	TransactionId  *xids;
	struct mXactCacheEnt *next;
} mXactCacheEnt;

static mXactCacheEnt   *MXactCache = NULL;
static MemoryContext	MXactContext = NULL;


#ifdef MULTIXACT_DEBUG
#define debug_elog2(a,b) elog(a,b)
#define debug_elog3(a,b,c) elog(a,b,c)
#define debug_elog4(a,b,c,d) elog(a,b,c,d)
#define debug_elog5(a,b,c,d,e) elog(a,b,c,d,e)
#else
#define debug_elog2(a,b)
#define debug_elog3(a,b,c)
#define debug_elog4(a,b,c,d)
#define debug_elog5(a,b,c,d,e)
#endif

/* internal MultiXactId management */
static MultiXactId CreateMultiXactId(int nxids, TransactionId *xids);
static int GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids);
static MultiXactId GetNewMultiXactId(int nxids, uint32 *offset);
static int xidComparator(const void *arg1, const void *arg2);
#ifdef MULTIXACT_DEBUG
static char *mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids);
#endif

/* MultiXact cache management */
static MultiXactId mXactCacheGetBySet(int nxids, TransactionId *xids);
static int mXactCacheGetByMulti(MultiXactId multi, TransactionId **xids);
static void mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids);
static void mXactCacheClear(void);

/* management of SLRU infrastructure */
static int	ZeroMultiXactOffsetPage(int pageno);
static int	ZeroMultiXactMemberPage(int pageno);
static bool MultiXactOffsetPagePrecedes(int page1, int page2);
static bool MultiXactMemberPagePrecedes(int page1, int page2);
static bool MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2);
static void ExtendMultiXactOffset(MultiXactId multi);
static void ExtendMultiXactMember(uint32 offset);
static void TruncateMultiXactOffset(void);
static void TruncateMultiXactMember(uint32 offset);

/*
 * MultiXactIdExpand
 * 		Add a TransactionId to a possibly-already-existing MultiXactId.
 *
 * We abuse the notation for the first argument: if "isMulti" is true, then
 * it's really a MultiXactId; else it's a TransactionId.  We are already
 * storing MultiXactId in HeapTupleHeader's xmax so it's not really wrong.
 *
 * If isMulti is true, then get the members of the passed MultiXactId, add
 * the passed TransactionId, and create a new MultiXactId.  If isMulti is
 * false, then take the two TransactionIds and create a new MultiXactId with
 * them.  The caller should make sure that the multi and xid are different
 * in the latter case.
 *
 * If the TransactionId is already a member of the passed MultiXactId, return
 * it.
 *
 * NB - we don't worry about our local MultiXactId cache here, because that
 * is handled by the lower-level routines.
 */
MultiXactId
MultiXactIdExpand(MultiXactId multi, bool isMulti, TransactionId xid)
{
	MultiXactId		newMulti;
	TransactionId  *members;
	TransactionId  *newMembers;
	int				nmembers;
	int				i;
	int				j;

	AssertArg(MultiXactIdIsValid(multi));
	AssertArg(TransactionIdIsValid(xid));

	debug_elog5(DEBUG1, "Expand: received %s %u, xid %u",
			isMulti ? "MultiXactId" : "TransactionId",
			multi,
			xid);

	/*
	 * The first argument is a TransactionId, not a MultiXactId.
	 */
	if (!isMulti)
	{
		TransactionId	xids[2];

		Assert(!TransactionIdEquals(multi, xid));

		xids[0] = multi;
		xids[1] = xid;

		newMulti = CreateMultiXactId(2, xids);

		debug_elog5(DEBUG1, "Expand: returning %u two-elem %u/%u",
				newMulti, multi, xid);

		return newMulti;
	}

	nmembers = GetMultiXactIdMembers(multi, &members);

	/*
	 * The MultiXactId is obsolete.  This can only happen if the MultiXactId
	 * members stop running between the caller checking and passing it to
	 * us.  It would be better to return that fact to the caller, but it would
	 * complicate the API and it's unlikely to happen too often, so just deal
	 * with it.
	 */
	if (nmembers == 0)
	{
		newMulti = CreateMultiXactId(1, &xid);

		debug_elog4(DEBUG1, "Expand: %u has no members, returning singleton %u",
				multi, newMulti);
		return newMulti;
	}

	/*
	 * If the TransactionId is already of a member of the MultiXactId,
	 * return it.
	 */
	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdEquals(members[i], xid))
		{
			pfree(members);
			debug_elog4(DEBUG1, "Expand: %u is already a member of %u", xid, multi);
			return multi;
		}
	}

	/*
	 * Determine which of the members of the MultiXactId are still running,
	 * and use them to create a new one.
	 */
	newMembers = (TransactionId *)
		palloc(sizeof(TransactionId) * (nmembers + 1));

	for (i = 0, j = 0; i < nmembers; i++)
	{
		if (TransactionIdIsInProgress(members[i]))
			newMembers[j++] = members[i];
	}

	newMembers[j++] = xid;
	newMulti = CreateMultiXactId(j, newMembers);

	pfree(members);
	pfree(newMembers);

	debug_elog3(DEBUG1, "Expand: returning new multi %u", newMulti);

	return newMulti;
}

/*
 * MultiXactIdIsRunning
 * 		Returns whether a MultiXactId is "running".
 *
 * We return true if at least one member of the given MultiXactId is still
 * running.
 */
bool
MultiXactIdIsRunning(MultiXactId multi)
{
	TransactionId *members;
	TransactionId  myXid;
	int		nmembers;
	int		i;

	debug_elog3(DEBUG1, "IsRunning %u?", multi);

	nmembers = GetMultiXactIdMembers(multi, &members);

	if (nmembers == 0)
	{
		debug_elog2(DEBUG1, "IsRunning: no members");
		return false;
	}

	/* checking for myself is cheap */
	myXid = GetTopTransactionId();

	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdEquals(members[i], myXid))
		{
			pfree(members);
			debug_elog3(DEBUG1, "IsRunning: I (%d) am running!", i);
			return true;
		}
	}

	/*
	 * FIXME --- this could be made better by having a special entry point in
	 * sinval.c, walking the PGPROC array only once for the whole array.
	 */
	for (i = 0; i < nmembers; i++)
	{
		if (TransactionIdIsInProgress(members[i]))
		{
			pfree(members);
			debug_elog4(DEBUG1, "IsRunning: member %d (%u) is running",
					i, members[i]);
			return true;
		}
	}

	pfree(members);
	debug_elog3(DEBUG1, "IsRunning: %u is not running", multi);

	return false;
}

/*
 * MultiXactIdSetMin
 * 		Set the minimum MultiXactId for this transaction.
 *
 * We set the minimum known MultiXactId for a given transaction the first
 * time it's going to acquire a shared lock.  We need to do this even if we
 * end up using a TransactionId instead of a MultiXactId, because there
 * is a chance that another transaction would use us to create a MultiXactId.
 *
 * When calculating the global minimum in AtEOXact_MultiXact, we need to
 * account for said hypothetical future MultiXactId, or it will fail to
 * detect us if the second transaction finishes before we do.
 *
 * The value to set is the next to-be-assigned MultiXact, so this is meant
 * to be called just before acquiring a shared lock.
 */
void
MultiXactIdSetMin(void)
{
	if (!MultiXactIdIsValid(MultiXactState->minMXactId[MyBackendId]))
	{
		debug_elog2(DEBUG1, "MultiXact: setting min for this xact");
		MultiXactState->minMXactId[MyBackendId] =
			MultiXactState->nextMXact;
	}
}

/*
 * MultiXactIdWait
 *		Sleep on a MultiXactId.
 *
 * We do this by sleeping on each member using XactLockTableWait.  Note that
 * by the time we finish sleeping, someone else may have changed the Xmax of
 * the containing tuple, so the caller needs to iterate on us somehow.
 */
void
MultiXactIdWait(MultiXactId multi)
{
	TransactionId *members;
	int		nmembers;

	nmembers = GetMultiXactIdMembers(multi, &members);

	if (nmembers > 0)
	{
		int		i;

		for (i = 0; i < nmembers; i++)
		{
			debug_elog4(DEBUG1, "Wait: waiting for %d (%u)", i, members[i]);
			XactLockTableWait(members[i]);
		}

		pfree(members);
	}

	return;
}

/*
 * CreateMultiXactId
 * 		Make a new MultiXactId
 *
 * Make SLRU and cache entries for a new MultiXactId, recording the given
 * TransactionIds as members.  Returns the newly created MultiXactId.
 */
static MultiXactId
CreateMultiXactId(int nxids, TransactionId *xids)
{
	MultiXactId	multi;
	int			pageno;
	int			entryno;
	int			slotno;
	uint32	   *offptr;
	uint32		offset;
	int			i;

	debug_elog3(DEBUG1, "Create: %s", mxid_to_string(InvalidMultiXactId, nxids, xids));

	/* See if the Xid set is already on the cache */
	if (MultiXactIdIsValid(multi = mXactCacheGetBySet(nxids, xids)))
	{	
		debug_elog2(DEBUG1, "Create: in cache!");

		/*
		 * The cached multi can't precede the global min because we reset
		 * the cache at transaction end (which is where the global min is
		 * recalculated.)
		 */
		Assert(MultiXactIdPrecedes(MultiXactState->GlobalMinMXactId, multi));

		return multi;
	}

	multi = GetNewMultiXactId(nxids, &offset);

	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	ExtendMultiXactOffset(multi);

	pageno = MultiXactIdToOffsetPage(multi);
	entryno = MultiXactIdToOffsetEntry(multi);

	/*
	 * FIXME -- we could enhance error reporting on SimpleLruReadPage;
	 * clearly InvalidTransactionId is inappropiate here.  Maybe pass
	 * a function pointer?
	 */
	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offptr += entryno;
	*offptr = offset;

	MultiXactOffsetCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;

	/* Exchange our lock */
	LWLockRelease(MultiXactOffsetControlLock);

	debug_elog3(DEBUG1, "Create: got offset %u", offset);

	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	for (i = 0; i < nxids; i++, offset++)
	{
		TransactionId *memberptr;

		ExtendMultiXactMember(offset);

		pageno = MultiXactIdToMemberPage(offset);
		entryno = MultiXactIdToMemberEntry(offset);

		slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno,
								   InvalidTransactionId);

		memberptr = (TransactionId *)
			MultiXactMemberCtl->shared->page_buffer[slotno];
		memberptr += entryno;

		*memberptr = xids[i];
		MultiXactMemberCtl->shared->page_status[slotno] = SLRU_PAGE_DIRTY;
	}

	LWLockRelease(MultiXactMemberControlLock);

	/* Store the new MultiXactId in the cache */
	mXactCachePut(multi, nxids, xids);
	debug_elog2(DEBUG1, "Create: all done");

	return multi;
}

/*
 * GetNewMultiXactId
 *		Get the next MultiXactId.
 *
 * Get the next MultiXactId, XLogging if needed.  Also, reserve space
 * in the "members" area right away so no concurrent transaction can take it
 * away from us.  The offset of the reserved space is returned in the offset
 * parameter.
 */
static MultiXactId
GetNewMultiXactId(int nxids, uint32 *offset)
{
	MultiXactId		result;

	debug_elog3(DEBUG1, "GetNew: for %d xids", nxids);
	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);

	if (MultiXactState->nextMXact < FirstMultiXactId)
	{
		debug_elog2(DEBUG1, "GetNew: wraparound!");
		MultiXactState->nextMXact = FirstMultiXactId;
		MultiXactState->mXactCount = 0;
	}

	/* If we run out of logged for use multixacts then we must log more */
	if (MultiXactState->mXactCount == 0)
	{
		debug_elog2(DEBUG1, "GetNew: xlogging more");
		XLogPutNextMultiXactId(MultiXactState->nextMXact +
							   MXACT_PREFETCH);
		MultiXactState->mXactCount = MXACT_PREFETCH;
	}

	result = MultiXactState->nextMXact;
	*offset = MultiXactState->nextOffset;

	/*
	 * We don't care about MultiXactId wraparound; it will be handled by
	 * the next iteration.
	 */
	(MultiXactState->nextMXact)++;
	(MultiXactState->mXactCount)--;
	MultiXactState->nextOffset += nxids;

	/* Our own minimum should be set already by MultiXactIdSetMin() */
	Assert(MultiXactIdIsValid(MultiXactState->minMXactId[MyBackendId]));

	/* Set the shared "smallest MultiXactId assigned" if not done already */
	if (!MultiXactIdIsValid(MultiXactState->GlobalMinMXactId))
	{
		debug_elog3(DEBUG1, "GetNew: global min is %u", result);
		MultiXactState->GlobalMinMXactId = result;
		MultiXactState->offsetTruncationMulti = result;
	}

	LWLockRelease(MultiXactGenLock);

	debug_elog3(DEBUG1, "GetNew: all done, returning %u", result);
	return result;
}

/*
 * GetMultiXactIdMembers
 * 		Returns the set of TransactionIds that make up a MultiXactId
 */
static int
GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids)
{
	int			pageno;
	int			entryno;
	int			slotno;
	uint32	   *offptr;
	uint32		offset;
	int			length;
	int			i;
	MultiXactId	nextMXact;
	MultiXactId	tmp;
	uint32		nextOffset;
	TransactionId *ptr;

	debug_elog3(DEBUG1, "GetMembers: asked for %u", multi);

	Assert(MultiXactIdIsValid(multi));

	/*
	 * We check known limits on MultiXact before resorting to the SLRU area.
	 * Rationale is:
	 *
	 * a) if MultiXactState->GlobalMinMXactId is Invalid, then no
	 * transaction has used MultiXactIds yet, so we don't need to check.
	 *
	 * b) If the MultiXactId precedes MultiXactState->GlobalMinMXactId
	 * then all transactions that cared about that MultiXactId are gone, so
	 * no need to check.
	 */
	LWLockAcquire(MultiXactGenLock, LW_SHARED);

	/* Any valid MultiXactId must precede the next-to-assign MultiXactId */
	Assert(MultiXactIdPrecedes(multi, MultiXactState->nextMXact));

	if (!MultiXactIdIsValid(MultiXactState->GlobalMinMXactId) ||
		MultiXactIdPrecedes(multi, MultiXactState->GlobalMinMXactId))
	{
		debug_elog2(DEBUG1, "GetMembers: it's too small");
		xids = NULL;
		LWLockRelease(MultiXactGenLock);
		return 0;
	}

	/*
	 * Before releasing the lock, save the shared-var-cached values, because
	 * they may contain the next MultiXactId to be given out.  We will need
	 * to know the offset if that's the case.
	 */
	nextMXact = MultiXactState->nextMXact;
	nextOffset = MultiXactState->nextOffset;

	LWLockRelease(MultiXactGenLock);

	/* See if the MultiXactId is in the local cache */
	length = mXactCacheGetByMulti(multi, xids);
	if (length > 0)
	{
		debug_elog3(DEBUG1, "GetMembers: found %s in the cache",
				mxid_to_string(multi, length, *xids));
		return length;
	}

	/* Get the offset at which we need to start reading MultiXactOffset */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	pageno = MultiXactIdToOffsetPage(multi);
	entryno = MultiXactIdToOffsetEntry(multi);

	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offptr += entryno;
	offset = *offptr;

	/*
	 * How many members do we need to read?  If we are at the edge of the
	 * assigned MultiXactIds, consult ShmemVariableCache.  Else we
	 * need to check the MultiXactId following ours.
	 *
	 * Use the same increment rule as GetNewMultiXactId(), that is, don't
	 * handle wraparound explicitly until needed.
	 */
	tmp = multi + 1;

	if (nextMXact == tmp)
		length = nextOffset - offset;
	else
	{
		/* handle wraparound if needed */
		if (tmp == InvalidMultiXactId)
			tmp = FirstMultiXactId;

		pageno = MultiXactIdToOffsetPage(tmp);
		entryno = MultiXactIdToOffsetEntry(tmp);

		slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno,
								   InvalidTransactionId);
		offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
		offptr += entryno;
		length = *offptr - offset;
	}

	/* Exchange our lock */
	LWLockRelease(MultiXactOffsetControlLock);
	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	ptr = (TransactionId *) palloc(length * sizeof(TransactionId));
	*xids = ptr;

	/* Now get the members themselves. */
	for (i = 0; i < length ; i++, offset++)
	{
		TransactionId *xactptr;

		pageno = MultiXactIdToMemberPage(offset);
		entryno = MultiXactIdToMemberEntry(offset);

		slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno,
								   InvalidTransactionId);
		xactptr = (TransactionId *)
			MultiXactMemberCtl->shared->page_buffer[slotno];
		xactptr += entryno;

		*ptr++ = *xactptr;
	}

	LWLockRelease(MultiXactMemberControlLock);

	/*
	 * Enter the value in the cache.
	 */
	mXactCachePut(multi, length, ptr);

	debug_elog3(DEBUG1, "GetMembers: no cache for %s",
				mxid_to_string(multi, length, ptr));
	return length;
}

/*
 * mXactCacheGetBySet
 * 		returns a MultiXactId from the cache based on the set of
 * 		TransactionIds that compose it, or InvalidMultiXactId if
 * 		none matches.
 *
 * This is helpful, for example, if two transactions want to lock a huge
 * table.  By using the cache, the second will use the same MultiXactId
 * for the majority of tuples, thus keeping MultiXactId usage low (saving
 * both I/O and wraparound issues.)
 */
static MultiXactId
mXactCacheGetBySet(int nxids, TransactionId *xids)
{
	mXactCacheEnt	*entry;

	debug_elog3(DEBUG1, "CacheGet: looking for %s",
				mxid_to_string(InvalidMultiXactId, nxids, xids));

	/* sort the array so comparison is easy */
	qsort(xids, nxids, sizeof(TransactionId), xidComparator);

	for (entry = MXactCache; entry != NULL; entry = entry->next)
	{
		int i;

		if (entry->nxids != nxids)
			continue;

		/* We assume the entries are sorted */
		for (i = 0; i < nxids; i++)
		{
			if (entry->xids[i] != xids[i])
				goto next;
		}
		debug_elog3(DEBUG1, "CacheGet: found %u", entry->multi);
		return entry->multi;
next:
		;
	}

	debug_elog2(DEBUG1, "CacheGet: not found :-(");
	return InvalidMultiXactId;
}

/*
 * mXactCacheGetByMulti
 * 		returns the composing TransactionId set from the cache for a
 * 		given MultiXactId, if present.
 */
static int
mXactCacheGetByMulti(MultiXactId multi, TransactionId **xids)
{
	mXactCacheEnt	*entry;
	debug_elog3(DEBUG1, "CacheGet: looking for %u", multi);

	for (entry = MXactCache; entry != NULL; entry = entry->next)
	{
		if (entry->multi == multi)
		{
			TransactionId	*ptr;
			int		size;

			size = sizeof(TransactionId) * entry->nxids;
			ptr = (TransactionId *) palloc(size);
			*xids = ptr;

			memcpy(ptr, entry->xids, size);

			debug_elog3(DEBUG1, "CacheGet: found %s",
					mxid_to_string(multi, entry->nxids, entry->xids));
			return entry->nxids;
		}
	}

	debug_elog2(DEBUG1, "CacheGet: not found");
	return 0;
}

/*
 * mXactCachePut
 * 		Add a new MultiXactId and its composing set into the local cache.
 */
static void
mXactCachePut(MultiXactId multi, int nxids, TransactionId *xids)
{
	mXactCacheEnt  *entry;
	MemoryContext	old_cxt;

	debug_elog3(DEBUG1, "CachePut: storing %s", mxid_to_string(multi, nxids, xids));

	if (MXactContext == NULL)
	{
		debug_elog2(DEBUG1, "CachePut: initializing memory context");
		MXactContext = AllocSetContextCreate(TopTransactionContext,
											 "MultiXact Cache Context",
											 ALLOCSET_SMALL_MINSIZE,
											 ALLOCSET_SMALL_INITSIZE,
											 ALLOCSET_SMALL_MAXSIZE);
	}

	old_cxt = MemoryContextSwitchTo(MXactContext);

	entry = (mXactCacheEnt *) palloc(sizeof(mXactCacheEnt));

	entry->multi = multi;
	entry->nxids = nxids;
	entry->xids = (TransactionId *) palloc(sizeof(TransactionId) * nxids);
	memcpy(entry->xids, xids, sizeof(TransactionId) * nxids);

	/* mXactCacheGetBySet assumes the entries are sorted, so sort them */
	qsort(entry->xids, nxids, sizeof(TransactionId), xidComparator);
	entry->next = MXactCache;
	MXactCache = entry;

	MemoryContextSwitchTo(old_cxt);
}

/*
 * mXactCacheClear
 * 		Clear the local MultiXactId cache.
 */
static void
mXactCacheClear(void)
{
	debug_elog2(DEBUG1, "CacheClear");
	if (MXactContext != NULL)
	{
		MemoryContextReset(MXactContext);
		MXactContext = NULL;
		MXactCache = NULL;
	}
}

#ifdef MULTIXACT_DEBUG
static char *
mxid_to_string(MultiXactId multi, int nxids, TransactionId *xids)
{
	char *str = palloc(15 * (nxids + 1) + 4);
	int i;
	snprintf(str, 47, "%u %d[%u", multi, nxids, xids[0]);

	for (i = 1; i < nxids; i++)
		snprintf(str + strlen(str), 17, ", %u", xids[i]);

	strcat(str, "]");
	return str;
}
#endif

/* xidComparator
 * 		qsort comparison function for Xids
 *
 * FIXME -- I'm dubious about this comparator ... test it more.
 */
static int
xidComparator(const void *arg1, const void *arg2)
{
	const TransactionId *xid1 = (const TransactionId *) arg1;
	const TransactionId *xid2 = (const TransactionId *) arg2;

	if (TransactionIdEquals(*xid1, *xid2))
		return 0;

	return TransactionIdFollows(*xid1, *xid2);
}

static MultiXactId
FindGlobalMinMXact(void)
{
	MultiXactId	newMin = InvalidMultiXactId;
	int i;

	for (i = 0; i < MaxBackends; i++)
	{
		if (!MultiXactIdIsValid(MultiXactState->minMXactId[i]))
			continue;
		if (!MultiXactIdIsValid(newMin))
		{
			newMin = MultiXactState->minMXactId[i];
			continue;
		}
		if (MultiXactIdPrecedes(MultiXactState->minMXactId[i], newMin))
			newMin = MultiXactState->minMXactId[i];
	}

	return newMin;
}

/*
 * AtEOXact_MultiXact
 *		Handle transaction end for MultiXact
 *
 * Detect whether we are the transaction with the oldest known-valid
 * MultiXactId, and update the shared state if so.
 */
void
AtEOXact_MultiXact(void)
{
	/* return early if we didn't use MultiXacts */
	if (!MultiXactIdIsValid(MultiXactState->minMXactId[MyBackendId]))
		return;

	debug_elog4(DEBUG1, "atEOXact: my min is %u, global min is %u",
			MultiXactState->minMXactId[MyBackendId],
			MultiXactState->GlobalMinMXactId);

	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);

	if (MultiXactState->GlobalMinMXactId ==
		MultiXactState->minMXactId[MyBackendId])
	{
		MultiXactId		newGlobal;

		MultiXactState->minMXactId[MyBackendId] = InvalidMultiXactId;

		newGlobal = FindGlobalMinMXact();
		MultiXactState->GlobalMinMXactId = newGlobal;

		/*
		 * If we are really advancing the global minimum (i.e. if there
		 * still are transactions using MultiXactIds), advance the truncation
		 * point too.
		 */
		if (MultiXactIdIsValid(newGlobal))
			MultiXactState->offsetTruncationMulti = newGlobal;
	}
	else
		MultiXactState->minMXactId[MyBackendId] = InvalidMultiXactId;

	/*
	 * Invalidate our PGPROC's min MultiXactId before releasing
	 * the lock.
	 */
	MultiXactState->minMXactId[MyBackendId] = InvalidMultiXactId;
	debug_elog3(DEBUG1, "atEOXact: global min is now %u",
			MultiXactState->GlobalMinMXactId);

	LWLockRelease(MultiXactGenLock);

	/* Clear the local MultiXactId cache */
	mXactCacheClear();
}

/*
 * Initialization of shared memory for MultiXact.  We use two SLRU areas,
 * thus double memory.  Also, reserve space for the shared MultiXactState
 * struct. We reserve additional space for the MaxBackends-sized array in
 * that struct, noting that the first element is considered in
 * sizeof(MultiXactStateData).
 */
int
MultiXactShmemSize(void)
{

#define SHARED_MULTIXACT_STATE_SIZE \
			(sizeof(MultiXactStateData) + \
			 sizeof(MultiXactId) * (MaxBackends - 1))

	return (SimpleLruShmemSize() * 2 + SHARED_MULTIXACT_STATE_SIZE);
}

void
MultiXactShmemInit(void)
{
	bool	found;

	debug_elog2(DEBUG1, "Shared Memory Init for MultiXact");
	MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
	MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;

	SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset Ctl",
				  MultiXactOffsetControlLock, "pg_multixact/offsets");
	SimpleLruInit(MultiXactMemberCtl, "MultiXactMember Ctl",
				  MultiXactMemberControlLock, "pg_multixact/members");

	/* Override default assumption that writes should be fsync'd */
	MultiXactOffsetCtl->do_fsync = false;
	MultiXactMemberCtl->do_fsync = false;

	/* Initialize the state struct */
	MultiXactState = ShmemInitStruct("Shared MultiXact State",
										   SHARED_MULTIXACT_STATE_SIZE,
										   &found);
	if (!IsUnderPostmaster)
	{
		Assert(!found);

		memset(MultiXactState, 0, SHARED_MULTIXACT_STATE_SIZE);
	}
	else
		Assert(found);
}

/*
 * This func must be called ONCE on system install.  It creates the initials
 * MultiXact segments.  (The MultiXacts directores are assumed to have been
 * created by initdb, and MultiXactShmemInit must have been called already.)
 *
 * Note: it's not really necessary to create the initial segment now,
 * since slru.c would create it on first write anyway.	But we may as well
 * do it to be sure the directory is set up correctly.
 */
void
BootStrapMultiXact(void)
{
	int			slotno;

	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	/* Offsets first page */
	slotno = ZeroMultiXactOffsetPage(0);
	SimpleLruWritePage(MultiXactOffsetCtl, slotno, NULL);
	Assert(MultiXactOffsetCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);

	LWLockRelease(MultiXactOffsetControlLock);
	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	/* Members first page */
	slotno = ZeroMultiXactMemberPage(0);
	SimpleLruWritePage(MultiXactMemberCtl, slotno, NULL);
	Assert(MultiXactMemberCtl->shared->page_status[slotno] == SLRU_PAGE_CLEAN);

	LWLockRelease(MultiXactMemberControlLock);
}

/*
 * Initialize (or reinitialize) a page of MultiXactOffset to zeroes.
 *
 * The page is not actually written, just set up in shared memory.
 * The slot number of the new page is returned.
 *
 * Control lock must be held at entry, and will be held at exit.
 */
static int
ZeroMultiXactOffsetPage(int pageno)
{
	return SimpleLruZeroPage(MultiXactOffsetCtl, pageno);
}

/*
 * Ditto, for MultiXactMember
 */
static int
ZeroMultiXactMemberPage(int pageno)
{
	return SimpleLruZeroPage(MultiXactMemberCtl, pageno);
}

/*
 * This must be called ONCE during postmaster or standalone-backend startup.
 */
void
StartupMultiXact(MultiXactId nextMulti)
{
	int			startPage;

	/*
	 * Set the next to-be-assigned MultiXactId based on information
	 * from the control file.
	 */
	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);

	debug_elog3(DEBUG1, "Startup called for MultiXact with next %u", nextMulti);

	MultiXactState->nextMXact = nextMulti;
	MultiXactState->mXactCount = 0;

	LWLockRelease(MultiXactGenLock);

	/*
	 * Since we don't expect MultiXact to be valid across crashes, we
	 * initialize the currently-active page to zeroes during startup.
	 * Whenever we advance into a new page, both ExtendMultiXact routines
	 * will likewise zero the new page without regard to whatever was
	 * previously on disk.
	 */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	startPage = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
	(void) ZeroMultiXactOffsetPage(startPage);

	LWLockRelease(MultiXactOffsetControlLock);

	LWLockAcquire(MultiXactMemberControlLock, LW_EXCLUSIVE);

	/* The members area always starts at 0 */
	(void) ZeroMultiXactMemberPage(0);

	LWLockRelease(MultiXactMemberControlLock);

}

/*
 * This must be called ONCE during postmaster or standalone-backend shutdown
 */
void
ShutdownMultiXact(void)
{
	/*
	 * Flush dirty MultiXact pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely as a debugging aid.
	 */
	SimpleLruFlush(MultiXactOffsetCtl, false);
	SimpleLruFlush(MultiXactMemberCtl, false);
}

/*
 * Get the next MultiXactId for the upcoming checkpoint
 */
MultiXactId
MultiXactGetCheckptMulti(bool is_shutdown)
{
	MultiXactId	retval;

	LWLockAcquire(MultiXactGenLock, LW_SHARED);

	retval = MultiXactState->nextMXact;
	if (!is_shutdown)
		retval += MultiXactState->mXactCount;

	LWLockRelease(MultiXactGenLock);

	debug_elog3(DEBUG1, "MultiXact: MultiXact for checkpoint record is %u", retval);

	return retval;
}

/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 */
void
CheckPointMultiXact(void)
{
	debug_elog2(DEBUG1, "MultiXact Checkpoint flush");
	/*
	 * Flush dirty MultiXact pages to disk
	 *
	 * This is not actually necessary from a correctness point of view. We do
	 * it merely to improve the odds that writing of dirty pages is done
	 * by the checkpoint process and not by backends.
	 */
	SimpleLruFlush(MultiXactOffsetCtl, true);
	SimpleLruFlush(MultiXactMemberCtl, true);

	/*
	 * Truncate the SLRU areas
	 */
	TruncateMultiXactOffset();
}

/*
 * Set the next-to-be-assigned MultiXactId from a XLog record
 */
void
MultiXactSetNextMXact(MultiXactId nextMulti)
{
	debug_elog3(DEBUG1, "MultiXact: setting next multi to %u", nextMulti);
	MultiXactState->nextMXact = nextMulti;
	MultiXactState->mXactCount = 0;
}

/*
 * Make sure that MultiXactOffset has room for a newly-allocated MultiXactId.
 *
 * The MultiXactOffsetControlLock should be held at entry, and will
 * be held at exit.
 */
void
ExtendMultiXactOffset(MultiXactId multi)
{
	int			pageno;

	/*
	 * No work except at first MultiXactId of a page.  But beware: just after
	 * wraparound, the first MultiXactId of page zero is FirstMultiXactId.
	 */
	if (MultiXactIdToOffsetEntry(multi) != 0 &&
		multi != FirstMultiXactId)
		return;

	pageno = MultiXactIdToOffsetPage(multi);

	/* Zero the page */
	ZeroMultiXactOffsetPage(pageno);
}

/*
 * Make sure that MultiXactMember has room for the members of a newly-
 * allocated MultiXactId.
 *
 * The MultiXactMemberControlLock should be held at entry, and will be held
 * at exit.
 */
void
ExtendMultiXactMember(uint32 offset)
{
	int		pageno;

	/*
	 * No work except at first entry of a page.
	 */
	if (MultiXactIdToMemberEntry(offset) != 0)
		return;

	pageno = MultiXactIdToMemberPage(offset);

	/* Zero the page */
	ZeroMultiXactMemberPage(pageno);
}

/*
 * Remove all MultiXactOffset segments before the one holding the passed
 * MultiXactId.
 */
static void
TruncateMultiXactOffset(void)
{
	int			cutoffPage;
	int			pageno;
	int			slotno;
	int			entryno;
	uint32	   *offptr;
	uint32		offset;
	MultiXactId	multi;

	/*
	 * Get the truncation point from the shared state area.  If it hasn't
	 * been set yet, skip this truncation cycle, because this needs we don't
	 * have any work to do.
	 *
	 * FIXME -- it's possible we have segments left after a crash that could
	 * be cleaned up.  Think harder about whether it's reasonable to use
	 * nextMXactId as truncation point if truncationMulti is Invalid.
	 */
	multi = MultiXactState->offsetTruncationMulti;

	if (!MultiXactIdIsValid(multi))
		return;

	/* if we already truncated at this point, skip it. */
	if (MultiXactState->lastTruncationMulti == multi)
		return;

	/*
	 * Before truncating we need to read the stored datum, in order
	 * to truncate MultiXactMember at that point.
	 */
	LWLockAcquire(MultiXactOffsetControlLock, LW_EXCLUSIVE);

	pageno = MultiXactIdToOffsetPage(multi);
	entryno = MultiXactIdToOffsetEntry(multi);

	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, InvalidTransactionId);
	offptr = (uint32 *) MultiXactOffsetCtl->shared->page_buffer[slotno];
	offptr += entryno;
	offset = *offptr;

	LWLockRelease(MultiXactOffsetControlLock);

	/*
	 * The cutoff point is the start of the segment containing multi.
	 * We pass the *page* containing oldestXact to SimpleLruTruncate.
	 */
	cutoffPage = MultiXactIdToOffsetPage(multi);

	SimpleLruTruncate(MultiXactOffsetCtl, cutoffPage);

	/*
	 * Now truncate MultiXactMember.
	 */
	TruncateMultiXactMember(offset);

	/*
	 * Set the last known truncation point
	 */
	MultiXactState->lastTruncationMulti = multi;
}

/*
 * Truncate the members' SLRU area, removing segments before the one
 * holding the passed offset.
 *
 * This is called from TruncateMultiXactOffset only, because at that point
 * we know what is the earliest offset that might be needed to check in the
 * future.
 */
static void
TruncateMultiXactMember(uint32 offset)
{
	int		cutoffPage;

	cutoffPage = MultiXactIdToMemberPage(offset);

	SimpleLruTruncate(MultiXactMemberCtl, cutoffPage);
}

/*
 * Decide which of two MultiXactOffset page numbers is "older" for truncation
 * purposes.
 *
 * We need to use comparison of MultiXactId here in order to do the right
 * thing with wraparound.  However, if we are asked about page number zero, we
 * don't want to hand InvalidMultiXactId to MultiXactIdPrecedes: it'll get
 * weird.  So, offset both multis by FirstMultiXactId to avoid that.
 */
static bool
MultiXactOffsetPagePrecedes(int page1, int page2)
{
	MultiXactId multi1;
	MultiXactId multi2;

	multi1 = ((MultiXactId) page1) * MULTIXACT_OFFSETS_PER_PAGE;
	multi1 += FirstMultiXactId;
	multi2 = ((MultiXactId) page2) * MULTIXACT_OFFSETS_PER_PAGE;
	multi2 += FirstMultiXactId;

	return MultiXactIdPrecedes(multi1, multi2);
}

/*
 * Decide which of two MultiXactMember page numbers is "older" for truncation
 * purposes.  There is no "invalid offset number" so use the numbers verbatim.
 */
static bool
MultiXactMemberPagePrecedes(int page1, int page2)
{
	uint32	offset1;
	uint32	offset2;

	offset1 = ((uint32) page1) * MULTIXACT_MEMBERS_PER_PAGE;
	offset2 = ((uint32) page2) * MULTIXACT_MEMBERS_PER_PAGE;

	return MultiXactOffsetPrecedes(offset1, offset2);
}

/*
 * Decide which of two MultiXactIds is earlier.
 *
 * FIXME -- do we need to do something special to InvalidMultiXactId?
 */
bool
MultiXactIdPrecedes(MultiXactId multi1, MultiXactId multi2)
{
	int32 diff = (int32) (multi1 - multi2);
	return (diff < 0);
}

/*
 * Decide which of two offsets is earlier.
 */
static bool
MultiXactOffsetPrecedes(uint32 offset1, uint32 offset2)
{
	int32 diff = (int32) (offset1 - offset2);
	return (diff < 0);
}
