[PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical - Mailing list pgsql-hackers
| From | Andres Freund |
|---|---|
| Subject | [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical |
| Date | |
| Msg-id | 1339586927-13156-7-git-send-email-andres@2ndquadrant.com Whole thread Raw |
| In response to | [RFC][PATCH] Logical Replication/BDR prototype and architecture (Andres Freund <andres@2ndquadrant.com>) |
| Responses |
Re: [PATCH 07/16] Log enough data into the wal to
reconstruct logical changes from it if wal_level=logical
Re: [PATCH 07/16] Log enough data into the wal to reconstruct logical changes from it if wal_level=logical |
| List | pgsql-hackers |
From: Andres Freund <andres@anarazel.de>
This adds a new wal_level value 'logical'
Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---
 src/backend/access/heap/heapam.c        |  135 ++++++++++++++++++++++++++++---
 src/backend/access/transam/xlog.c       |    1 +
 src/backend/catalog/index.c             |   74 +++++++++++++++++
 src/bin/pg_controldata/pg_controldata.c |    2 +
 src/include/access/xlog.h               |    3 +-
 src/include/catalog/index.h             |    4 +
 6 files changed, 207 insertions(+), 12 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9519e73..9149d53 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -52,6 +52,7 @@#include "access/xact.h"#include "access/xlogutils.h"#include "catalog/catalog.h"
+#include "catalog/index.h"#include "catalog/namespace.h"#include "miscadmin.h"#include "pgstat.h"
@@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xl_heap_insert xlrec;
xl_heap_header xlhdr; XLogRecPtr recptr;
- XLogRecData rdata[3];
+ XLogRecData rdata[4]; Page page = BufferGetPage(buffer); uint8 info =
XLOG_HEAP_INSERT;
+ /*
+ * For the logical replication case we need the tuple even if we're
+ * doing a full page write. We could alternatively store a pointer into
+ * the fpw though.
+ * For that to work we add another rdata entry for the buffer in that
+ * case.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+ xlrec.all_visible_cleared = all_visible_cleared; xlrec.target.node = relation->rd_node;
xlrec.target.tid= heaptup->t_self;
@@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ rdata[1].data =
(char*) &xlhdr; rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = buffer;
+ rdata[1].buffer = need_tuple ? InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next
=&(rdata[2]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[2].data = (char *)
heaptup->t_data+ offsetof(HeapTupleHeaderData, t_bits); rdata[2].len = heaptup->t_len -
offsetof(HeapTupleHeaderData,t_bits);
- rdata[2].buffer = buffer;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : buffer; rdata[2].buffer_std = true; rdata[2].next
=NULL; /*
+ * add record for the buffer without actual content that's removed if
+ * fpw is done for that buffer
+ */
+ if(need_tuple){
+ rdata[2].next = &(rdata[3]);
+
+ rdata[3].data = NULL;
+ rdata[3].len = 0;
+ rdata[3].buffer = buffer;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
+ }
+
+ /* * If this is the single and first tuple on page, we can reinit the * page instead of
restoringthe whole thing. Set flag, and hide * buffer references from XLogInsert.
@@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page)== FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+ rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } recptr =
XLogInsert(RM_HEAP_ID,info, rdata);
@@ -2568,7 +2592,9 @@ l1: { xl_heap_delete xlrec; XLogRecPtr recptr;
- XLogRecData rdata[2];
+ XLogRecData rdata[4];
+
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id >= FirstNormalObjectId;
xlrec.all_visible_cleared= all_visible_cleared; xlrec.target.node = relation->rd_node;
@@ -2584,6 +2610,73 @@ l1: rdata[1].buffer_std = true; rdata[1].next = NULL;
+ /*
+ * XXX: We could decide not to log changes when the origin is not the
+ * local node, that should reduce redundant logging.
+ */
+ if(need_tuple){
+ xl_heap_header xlhdr;
+
+ Oid indexoid = InvalidOid;
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+ TupleDesc desc = RelationGetDescr(relation);
+ Relation index_rel;
+ TupleDesc indexdesc;
+ int natt;
+
+ Datum idxvals[INDEX_MAX_KEYS];
+ bool idxisnull[INDEX_MAX_KEYS];
+ HeapTuple idxtuple;
+
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+ MemSet(idxvals, 0, sizeof(idxvals));
+ MemSet(idxisnull, 0, sizeof(idxisnull));
+ relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+ if(!indexoid){
+ elog(WARNING, "Could not find primary key for table with oid %u",
+ relation->rd_id);
+ goto no_index_found;
+ }
+
+ index_rel = index_open(indexoid, AccessShareLock);
+
+ indexdesc = RelationGetDescr(index_rel);
+
+ for(natt = 0; natt < indexdesc->natts; natt++){
+ idxvals[natt] =
+ fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+ Assert(!idxisnull[natt]);
+ }
+
+ idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+ xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+ xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+ xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+ rdata[1].next = &(rdata[2]);
+ rdata[2].data = (char*)&xlhdr;
+ rdata[2].len = SizeOfHeapHeader;
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].next = NULL;
+
+ rdata[2].next = &(rdata[3]);
+ rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].buffer = InvalidBuffer;
+ rdata[3].next = NULL;
+
+ heap_close(index_rel, NoLock);
+ no_index_found:
+ ;
+ }
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(page, recptr);
@@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, xl_heap_header xlhdr;
uint8 info; XLogRecPtr recptr;
- XLogRecData rdata[4];
+ XLogRecData rdata[5]; Page page = BufferGetPage(newbuf);
+ /*
+ * As for XLOG_HEAP_INSERT, the tuple must be logged even with a fpw.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+ /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln));
@@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, xlhdr.t_hoff =
newtup->t_data->t_hoff; /*
- * As with insert records, we need not store the rdata[2] segment if we
- * decide to store the whole buffer instead.
+ * As with insert's logging, we need not store the Datum containing
+ * tuples separately from the buffer if we do logical replication that
+ * is... */ rdata[2].data = (char *) &xlhdr; rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = newbuf;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf; rdata[2].buffer_std = true; rdata[2].next =
&(rdata[3]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[3].data = (char *) newtup->t_data +
offsetof(HeapTupleHeaderData,t_bits); rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = newbuf;
+ rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf; rdata[3].buffer_std = true; rdata[3].next = NULL;
+ /*
+ * separate storage for the buffer reference of the new page in the
+ * wal_level=logical case
+ */
+ if(need_tuple){
+ rdata[3].next = &(rdata[4]);
+
+ rdata[4].data = NULL,
+ rdata[4].len = 0;
+ rdata[4].buffer = newbuf;
+ rdata[4].buffer_std = true;
+ rdata[4].next = NULL;
+ }
+ /* If new tuple is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(newtup->t_self))
==FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |=
XLOG_HEAP_INIT_PAGE;
- rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+ rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID,
info,rdata);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 166efb0..c6feed0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = { {"minimal", WAL_LEVEL_MINIMAL, false},
{"archive",WAL_LEVEL_ARCHIVE, false}, {"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+ {"logical", WAL_LEVEL_LOGICAL, false}, {NULL, 0, false}};
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9e8b1cc..4cddcac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@#include "nodes/nodeFuncs.h"#include "optimizer/clauses.h"#include "parser/parser.h"
+#include "parser/parse_relation.h"#include "storage/bufmgr.h"#include "storage/lmgr.h"#include "storage/predicate.h"
@@ -3311,3 +3312,76 @@ ResetReindexPending(void){ pendingReindexedIndexes = NIL;}
+
+/*
+ * relationFindPrimaryKey
+ * Find primary key for a relation if it exists.
+ *
+ * Walks the index list of pkrel and picks the first index whose pg_index
+ * entry has both indisprimary and indimmediate set.
+ *
+ * Outputs:
+ * *indexOid - OID of the primary key index, or InvalidOid if none is found
+ * *nratts - number of key columns (indnatts) of that index
+ * attnums[] - heap attribute numbers of the key columns (from indkey)
+ * atttypids[] - type OIDs of those columns, via attnumTypeId()
+ * opclasses[] - operator class OIDs of the key columns (from indclass)
+ *
+ * If no primary key is found *indexOid is set to InvalidOid and none of the
+ * other output parameters is written, so callers must check *indexOid
+ * before looking at them.
+ *
+ * The three arrays are filled for indexes 0 .. indnatts-1 and must be large
+ * enough for that; the heap_delete caller in this patch passes arrays of
+ * INDEX_MAX_KEYS elements.
+ *
+ * Raises elog(ERROR) if the syscache lookup for one of the indexes fails.
+ *
+ * This is quite similar to tablecmds.c's transformFkeyGetPrimaryKey.
+ *
+ * XXX: It might be a good idea to change pg_class.relhaspkey into an bool to
+ * make this more efficient.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+ int16 *nratts, int16 *attnums, Oid *atttypids,
+ Oid *opclasses){
+ List *indexoidlist;
+ ListCell *indexoidscan;
+ HeapTuple indexTuple = NULL;
+ Datum indclassDatum;
+ bool isnull;
+ oidvector *indclass;
+ int i;
+ Form_pg_index indexStruct = NULL;
+
+ *indexOid = InvalidOid; /* stays invalid unless a primary key is found below */
+
+ indexoidlist = RelationGetIndexList(pkrel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ Oid indexoid = lfirst_oid(indexoidscan);
+
+ indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+ if(!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+ indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+ if(indexStruct->indisprimary && indexStruct->indimmediate)
+ {
+ *indexOid = indexoid;
+ break; /* leaves the loop without releasing indexTuple; it is used and released below */
+ }
+ ReleaseSysCache(indexTuple); /* not the index we want; drop the reference */
+
+ }
+ list_free(indexoidlist);
+
+ if (!OidIsValid(*indexOid))
+ return; /* no match: every looked-up tuple was already released in the loop */
+
+ /*
+ * Must get indclass the hard way: it is read through SysCacheGetAttr()
+ * rather than through the indexStruct pointer used for the other fields.
+ */
+ indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+ Anum_pg_index_indclass, &isnull);
+ Assert(!isnull);
+ indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+ *nratts = indexStruct->indnatts;
+ /*
+ * Now build the list of PK attributes from the indkey definition (we
+ * assume a primary key cannot have expressional elements). Entry i of
+ * each output array describes the i-th key column of the index.
+ */
+ for (i = 0; i < indexStruct->indnatts; i++)
+ {
+ int pkattno = indexStruct->indkey.values[i];
+
+ attnums[i] = pkattno;
+ atttypids[i] = attnumTypeId(pkrel, pkattno);
+ opclasses[i] = indclass->values[i];
+ }
+
+ ReleaseSysCache(indexTuple); /* release the reference kept from the loop's break */
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index c00183a..47715c9 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level) return "archive"; case WAL_LEVEL_HOT_STANDBY:
return "hot_standby";
+ case WAL_LEVEL_LOGICAL:
+ return "logical"; } return _("unrecognized wal_level");}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index df5f232..2843aca 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,7 +199,8 @@ typedef enum WalLevel{ WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_ARCHIVE,
- WAL_LEVEL_HOT_STANDBY
+ WAL_LEVEL_HOT_STANDBY,
+ WAL_LEVEL_LOGICAL} WalLevel;extern int wal_level;
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7c8198f..2ba0ac3 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);extern bool ReindexIsProcessingIndex(Oid
indexOid);externOid IndexGetRelation(Oid indexId, bool missing_ok);
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+ int16 *nratts, int16 *attnums, Oid *atttypids,
+ Oid *opclasses);
+#endif /* INDEX_H */
--
1.7.10.rc3.3.g19a6c.dirty
pgsql-hackers by date: