[PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes |
Date | |
Msg-id | 1339586927-13156-8-git-send-email-andres@2ndquadrant.com Whole thread Raw |
In response to | [RFC][PATCH] Logical Replication/BDR prototype and architecture (Andres Freund <andres@2ndquadrant.com>) |
Responses | Re: [PATCH 08/16] Introduce the ApplyCache module which can reassemble transactions from a stream of interspersed changes |
List | pgsql-hackers |
From: Andres Freund <andres@anarazel.de> The individual changes need to be identified by an xid. The xid can be a subtransaction or a toplevel one; at commit those can be reintegrated by doing a k-way mergesort between the individual transactions. Callbacks for apply_begin, apply_change and apply_commit are provided to retrieve complete transactions. Missing: - spill-to-disk - correct subtransaction merge, current behaviour is simple/wrong - DDL handling (?) - resource usage controls --- src/backend/replication/Makefile | 2 + src/backend/replication/logical/Makefile | 19 ++ src/backend/replication/logical/applycache.c | 380 ++++++++++++++++++++++++++ src/include/replication/applycache.h | 185 +++++++++++++ 4 files changed, 586 insertions(+) create mode 100644 src/backend/replication/logical/Makefile create mode 100644 src/backend/replication/logical/applycache.c create mode 100644 src/include/replication/applycache.h diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 9d9ec87..ae7f6b1 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o\ repl_gram.o syncrep.o +SUBDIRS = logical +include $(top_srcdir)/src/backend/common.mk# repl_scanner is compiled as part of repl_gram diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile new file mode 100644 index 0000000..2eadab8 --- /dev/null +++ b/src/backend/replication/logical/Makefile @@ -0,0 +1,19 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/logical +# +# IDENTIFICATION +# src/backend/replication/logical/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/logical +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) + +OBJS = applycache.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c new file mode 100644 index 0000000..b73b0ba --- /dev/null +++ b/src/backend/replication/logical/applycache.c @@ -0,0 +1,380 @@ +/*------------------------------------------------------------------------- + * + * applycache.c + * + * PostgreSQL logical replay "cache" management + * + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/applycache.c + * + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "access/xact.h" +#include "catalog/pg_class.h" +#include "catalog/pg_control.h" +#include "replication/applycache.h" + +#include "utils/ilist.h" +#include "utils/memutils.h" +#include "utils/relcache.h" +#include "utils/syscache.h" + +const Size max_memtries = 1<<16; + +const size_t max_cached_changes = 1024; +const size_t max_cached_tuplebufs = 1024; /* ~8MB */ +const size_t max_cached_transactions = 512; + +typedef struct ApplyCacheTXNByIdEnt +{ + TransactionId xid; + ApplyCacheTXN* txn; +} ApplyCacheTXNByIdEnt; + +static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache); +static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn); + +static ApplyCacheTXN* ApplyCacheTXNByXid(ApplyCache*, TransactionId xid, bool create); + + +ApplyCache* +ApplyCacheAllocate(void) +{ + ApplyCache* cache = (ApplyCache*)malloc(sizeof(ApplyCache)); + HASHCTL hash_ctl; + + if (!cache) + elog(ERROR, "Could not allocate the ApplyCache"); + + memset(&hash_ctl, 0, sizeof(hash_ctl)); + + cache->context = AllocSetContextCreate(TopMemoryContext, + "ApplyCache", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = 
sizeof(ApplyCacheTXNByIdEnt); + hash_ctl.hash = tag_hash; + hash_ctl.hcxt = cache->context; + + cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + cache->nr_cached_transactions = 0; + cache->nr_cached_changes = 0; + cache->nr_cached_tuplebufs = 0; + + ilist_d_init(&cache->cached_transactions); + ilist_d_init(&cache->cached_changes); + ilist_s_init(&cache->cached_tuplebufs); + + return cache; +} + +void ApplyCacheFree(ApplyCache* cache) +{ + /* FIXME: check for in-progress transactions */ + /* FIXME: clean up cached transaction */ + /* FIXME: clean up cached changes */ + /* FIXME: clean up cached tuplebufs */ + hash_destroy(cache->by_txn); + free(cache); +} + +static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache) +{ + ApplyCacheTXN* txn; + + if (cache->nr_cached_transactions) + { + cache->nr_cached_transactions--; + txn = ilist_container(ApplyCacheTXN, node, + ilist_d_pop_front(&cache->cached_transactions)); + } + else + { + txn = (ApplyCacheTXN*) + malloc(sizeof(ApplyCacheTXN)); + + if (!txn) + elog(ERROR, "Could not allocate a ApplyCacheTXN struct"); + } + + memset(txn, 0, sizeof(ApplyCacheTXN)); + ilist_d_init(&txn->changes); + ilist_d_init(&txn->subtxns); + return txn; +} + +void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn) +{ + if(cache->nr_cached_transactions < max_cached_transactions){ + cache->nr_cached_transactions++; + ilist_d_push_front(&cache->cached_transactions, &txn->node); + } + else{ + free(txn); + } +} + +ApplyCacheChange* +ApplyCacheGetChange(ApplyCache* cache) +{ + ApplyCacheChange* change; + + if (cache->nr_cached_changes) + { + cache->nr_cached_changes--; + change = ilist_container(ApplyCacheChange, node, + ilist_d_pop_front(&cache->cached_changes)); + } + else + { + change = (ApplyCacheChange*)malloc(sizeof(ApplyCacheChange)); + + if (!change) + elog(ERROR, "Could not allocate a ApplyCacheChange struct"); + } + + + memset(change, 0, sizeof(ApplyCacheChange)); + 
return change; +} + +void +ApplyCacheReturnChange(ApplyCache* cache, ApplyCacheChange* change) +{ + if (change->newtuple) + ApplyCacheReturnTupleBuf(cache, change->newtuple); + if (change->oldtuple) + ApplyCacheReturnTupleBuf(cache, change->oldtuple); + + if (change->table) + heap_freetuple(change->table); + + if(cache->nr_cached_changes < max_cached_changes){ + cache->nr_cached_changes++; + ilist_d_push_front(&cache->cached_changes, &change->node); + } + else{ + free(change); + } +} + +ApplyCacheTupleBuf* +ApplyCacheGetTupleBuf(ApplyCache* cache) +{ + ApplyCacheTupleBuf* tuple; + + if (cache->nr_cached_tuplebufs) + { + cache->nr_cached_tuplebufs--; + tuple = ilist_container(ApplyCacheTupleBuf, node, + ilist_s_pop_front(&cache->cached_tuplebufs)); + } + else + { + tuple = + (ApplyCacheTupleBuf*)malloc(sizeof(ApplyCacheTupleBuf)); + + if (!tuple) + elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct"); + } + + return tuple; +} + +void +ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple) +{ + if(cache->nr_cached_tuplebufs < max_cached_tuplebufs){ + cache->nr_cached_tuplebufs++; + ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node); + } + else{ + free(tuple); + } +} + + +static +ApplyCacheTXN* +ApplyCacheTXNByXid(ApplyCache* cache, TransactionId xid, bool create) +{ + ApplyCacheTXNByIdEnt* ent; + bool found; + + ent = (ApplyCacheTXNByIdEnt*) + hash_search(cache->by_txn, + (void *)&xid, + (create ? 
HASH_ENTER : HASH_FIND), + &found); + + if (found) + { +#ifdef VERBOSE_DEBUG + elog(LOG, "found cache entry for %u at %p", xid, ent); +#endif + } + else + { +#ifdef VERBOSE_DEBUG + elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u", + xid, cache, ent, create); +#endif + } + + if (!found && !create) + return NULL; + + if (!found) + { + ent->txn = ApplyCacheGetTXN(cache); + } + + return ent->txn; +} + +void +ApplyCacheAddChange(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn, + ApplyCacheChange* change) +{ + ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, true); + txn->lsn = lsn; + ilist_d_push_back(&txn->changes, &change->node); +} + + +void +ApplyCacheCommitChild(ApplyCache* cache, TransactionId xid, + TransactionId subxid, XLogRecPtr lsn) +{ + ApplyCacheTXN* txn; + ApplyCacheTXN* subtxn; + + subtxn = ApplyCacheTXNByXid(cache, subxid, false); + + /* + * No need to do anything if that subtxn didn't contain any changes + */ + if (!subtxn) + return; + + subtxn->lsn = lsn; + + txn = ApplyCacheTXNByXid(cache, xid, true); + + ilist_d_push_back(&txn->subtxns, &subtxn->node); +} + +void +ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn) +{ + ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false); + ilist_d_node* cur_change, *next_change; + ilist_d_node* cur_txn, *next_txn; + bool found; + + if (!txn) + return; + + txn->lsn = lsn; + + cache->begin(cache, txn); + + /* + * FIXME: + * do a k-way mergesort of all changes ordered by xid + * + * For now we just iterate through all subtransactions and then through the + * main txn. But thats *WRONG*. + * + * The best way to do is probably to model the current heads of all TXNs as + * a heap and always remove from the smallest lsn till thats not the case + * anymore. 
+ */ + ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns) + { + ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn); + + ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes) + { + ApplyCacheChange* change = + ilist_container(ApplyCacheChange, node, cur_change); + cache->apply_change(cache, txn, subtxn, change); + + ApplyCacheReturnChange(cache, change); + } + ApplyCacheReturnTXN(cache, subtxn); + } + + ilist_d_foreach_modify (cur_change, next_change, &txn->changes) + { + ApplyCacheChange* change = + ilist_container(ApplyCacheChange, node, cur_change); + cache->apply_change(cache, txn, NULL, change); + + ApplyCacheReturnChange(cache, change); + } + + cache->commit(cache, txn); + + /* now remove reference from cache */ + hash_search(cache->by_txn, + (void *)&xid, + HASH_REMOVE, + &found); + Assert(found); + + ApplyCacheReturnTXN(cache, txn); +} + +void +ApplyCacheAbort(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn) +{ + ilist_d_node* cur_change, *next_change; + ilist_d_node* cur_txn, *next_txn; + ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false); + bool found; + + /* no changes in this commit */ + if (!txn) + return; + + /* iterate through all subtransactions and free memory */ + ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns) + { + ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn); + ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes) + { + ApplyCacheChange* change = + ilist_container(ApplyCacheChange, node, cur_change); + ApplyCacheReturnChange(cache, change); + } + ApplyCacheReturnTXN(cache, subtxn); + } + + ilist_d_foreach_modify (cur_change, next_change, &txn->changes) + { + ApplyCacheChange* change = + ilist_container(ApplyCacheChange, node, cur_change); + ApplyCacheReturnChange(cache, change); + } + + /* now remove reference from cache */ + hash_search(cache->by_txn, + (void *)&xid, + HASH_REMOVE, + &found); + Assert(found); + + ApplyCacheReturnTXN(cache, txn); 
+} diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h new file mode 100644 index 0000000..4ceba63 --- /dev/null +++ b/src/include/replication/applycache.h @@ -0,0 +1,185 @@ +/* + * applycache.h + * + * PostgreSQL logical replay "cache" management + * + * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/replication/applycache.h + */ +#ifndef APPLYCACHE_H +#define APPLYCACHE_H + +#include "access/htup.h" +#include "utils/hsearch.h" +#include "utils/ilist.h" + +typedef struct ApplyCache ApplyCache; + +enum ApplyCacheChangeType +{ + APPLY_CACHE_CHANGE_INSERT, + APPLY_CACHE_CHANGE_UPDATE, + APPLY_CACHE_CHANGE_DELETE +}; + +typedef struct ApplyCacheTupleBuf +{ + /* position in preallocated list */ + ilist_s_node node; + + HeapTupleData tuple; + HeapTupleHeaderData header; + char data[MaxHeapTupleSize]; +} ApplyCacheTupleBuf; + +typedef struct ApplyCacheChange +{ + XLogRecPtr lsn; + enum ApplyCacheChangeType action; + + ApplyCacheTupleBuf* newtuple; + + ApplyCacheTupleBuf* oldtuple; + + HeapTuple table; + + /* + * While in use this is how a change is linked into a transactions, + * otherwise its the preallocated list. + */ + ilist_d_node node; +} ApplyCacheChange; + +typedef struct ApplyCacheTXN +{ + TransactionId xid; + + XLogRecPtr lsn; + + /* + * How many ApplyCacheChange's do we have in this txn. + * + * Subtransactions are *not* included. + */ + Size nentries; + + /* + * How many of the above entries are stored in memory in contrast to being + * spilled to disk. + */ + Size nentries_mem; + + /* + * List of actual changes + */ + ilist_d_head changes; + + /* + * non-hierarchical list of subtransactions that are *not* aborted + */ + ilist_d_head subtxns; + + /* + * our position in a list of subtransactions while the TXN is in + * use. Otherwise its the position in the list of preallocated + * transactions. 
+ */ + ilist_d_node node; +} ApplyCacheTXN; + + +/* XXX: were currently passing the originating subtxn. Not sure thats necessary */ +typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange*change); +typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn); +typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn); + +/* + * max number of concurrent top-level transactions or transaction where we + * don't know if they are top-level can be calculated by: + * (max_connections + max_prepared_xactx + ?) * PGPROC_MAX_CACHED_SUBXIDS + */ +struct ApplyCache +{ + TransactionId last_txn; + ApplyCacheTXN *last_txn_cache; + HTAB *by_txn; + + ApplyCacheBeginCB begin; + ApplyCacheApplyChangeCB apply_change; + ApplyCacheCommitCB commit; + + void* private_data; + + MemoryContext context; + + /* + * we don't want to repeatedly (de-)allocated those structs, so cache them for reusage. + */ + ilist_d_head cached_transactions; + size_t nr_cached_transactions; + + ilist_d_head cached_changes; + size_t nr_cached_changes; + + ilist_s_head cached_tuplebufs; + size_t nr_cached_tuplebufs; +}; + + +ApplyCache* +ApplyCacheAllocate(void); + +void +ApplyCacheFree(ApplyCache*); + +ApplyCacheTupleBuf* +ApplyCacheGetTupleBuf(ApplyCache*); + +void +ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple); + +/* + * Returns a (potentically preallocated) change struct. Its lifetime is managed + * by the applycache module. + * + * If not added to a transaction with ApplyCacheAddChange it needs to be + * returned via ApplyCacheReturnChange + * + * FIXME: better name + */ +ApplyCacheChange* +ApplyCacheGetChange(ApplyCache*); + +/* + * Return an unused ApplyCacheChange struct + */ +void +ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*); + + +/* + * record the transaction as in-progress if not already done, add the current + * change. 
+ * + * We have a one-entry cache for lookin up the current ApplyCacheTXN so we + * don't need to do a full hash-lookup if the same xid is used + * sequentially. Them being used multiple times that way is rather frequent. + */ +void +ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*); + +/* + * + */ +void +ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn); + +void +ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn); + +void +ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn); + +#endif -- 1.7.10.rc3.3.g19a6c.dirty
pgsql-hackers by date: