From 21b223f0aaa93e6fdf5644e3e6c1a7d7d0269fa9 Mon Sep 17 00:00:00 2001 From: Paul Guo Date: Thu, 28 Feb 2019 15:43:34 +0800 Subject: [PATCH v3] Heap batch insert for CTAS/MatView. --- src/backend/access/heap/heapam.c | 6 +- src/backend/commands/copy.c | 24 ++--- src/backend/commands/createas.c | 153 ++++++++++++++++++++++++++++++- src/include/access/heapam.h | 11 +++ 4 files changed, 169 insertions(+), 25 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 8ac0f8a513..5f5ed06e2d 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2111,7 +2111,6 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { TransactionId xid = GetCurrentTransactionId(); - HeapTuple *heaptuples; int i; int ndone; PGAlignedBlock scratch; @@ -2120,6 +2119,10 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, Size saveFreeSpace; bool need_tuple_data = RelationIsLogicallyLogged(relation); bool need_cids = RelationIsAccessibleInLogicalDecoding(relation); + /* Declare it as static to let this memory be not on stack. 
*/ + static HeapTuple heaptuples[MAX_MULTI_INSERT_TUPLES]; + + Assert(ntuples <= MAX_MULTI_INSERT_TUPLES); /* currently not needed (thus unsupported) for heap_multi_insert() */ AssertArg(!(options & HEAP_INSERT_NO_LOGICAL)); @@ -2129,7 +2132,6 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, HEAP_DEFAULT_FILLFACTOR); /* Toast and set header data in all the slots */ - heaptuples = palloc(ntuples * sizeof(HeapTuple)); for (i = 0; i < ntuples; i++) { HeapTuple tuple; diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 84c54fbc70..5e0e929034 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -234,18 +234,6 @@ typedef struct uint64 processed; /* # of tuples processed */ } DR_copy; - -/* - * No more than this many tuples per CopyMultiInsertBuffer - * - * Caution: Don't make this too big, as we could end up with this many - * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's - * multiInsertBuffers list. Increasing this can cause quadratic growth in - * memory requirements during copies into partitioned tables with a large - * number of partitions. - */ -#define MAX_BUFFERED_TUPLES 1000 - /* * Flush buffers if there are >= this many bytes, as counted by the input * size, of tuples stored. @@ -258,11 +246,11 @@ typedef struct /* Stores multi-insert data related to a single relation in CopyFrom. 
*/ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TupleTableSlot *slots[MAX_MULTI_INSERT_TUPLES]; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ BulkInsertState bistate; /* BulkInsertState for this rel */ int nused; /* number of 'slots' containing tuples */ - uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy + uint64 linenos[MAX_MULTI_INSERT_TUPLES]; /* Line # of tuple in copy * stream */ } CopyMultiInsertBuffer; @@ -2352,7 +2340,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri) CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES); buffer->resultRelInfo = rri; buffer->bistate = GetBulkInsertState(); buffer->nused = 0; @@ -2411,7 +2399,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, static inline bool CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) { - if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || + if (miinfo->bufferedTuples >= MAX_MULTI_INSERT_TUPLES || miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) return true; return false; @@ -2531,7 +2519,7 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertBuffer *buffer) FreeBulkInsertState(buffer->bistate); /* Since we only create slots on demand, just drop the non-null ones. 
*/ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + for (i = 0; i < MAX_MULTI_INSERT_TUPLES && buffer->slots[i] != NULL; i++) ExecDropSingleTupleTableSlot(buffer->slots[i]); pfree(buffer); @@ -2617,7 +2605,7 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, int nused = buffer->nused; Assert(buffer != NULL); - Assert(nused < MAX_BUFFERED_TUPLES); + Assert(nused < MAX_MULTI_INSERT_TUPLES); if (buffer->slots[nused] == NULL) buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 4c1d909d38..66aa051c3a 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -62,6 +62,15 @@ typedef struct CommandId output_cid; /* cmin to insert in output tuples */ int ti_options; /* table_tuple_insert performance options */ BulkInsertState bistate; /* bulk insert state */ + MemoryContext mi_context; /* Memory context for multi insert */ + int tup_len; /* accurate or average tuple length. */ + /* Below are buffered slots and related information. */ + TupleTableSlot *buffered_slots[MAX_MULTI_INSERT_TUPLES]; + int buffered_slots_num; /* How many buffered slots for multi insert */ + int buffered_slots_size; /* Total tuple size for multi insert */ + /* Below are variables for sampling (to calculate avg.tup_len if needed). */ + int sampled_tuples_num; /* -1 means no sampling is needed. */ + uint64 sampled_tuples_size; /* Total tuple size of samples. 
*/ } DR_intorel; /* utility functions for CTAS definition creation */ @@ -441,6 +450,8 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) RangeTblEntry *rte; ListCell *lc; int attnum; + int tup_len; + bool use_sampling; Assert(into != NULL); /* else somebody forgot to set it */ @@ -456,12 +467,22 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) */ attrList = NIL; lc = list_head(into->colNames); + tup_len = 0; + use_sampling = false; for (attnum = 0; attnum < typeinfo->natts; attnum++) { Form_pg_attribute attribute = TupleDescAttr(typeinfo, attnum); ColumnDef *col; char *colname; + if (attribute->attlen > 0) + { + if (!use_sampling) + tup_len += attribute->attlen; + } + else + use_sampling = true; /* Update tup_len via sampling. */ + if (lc) { colname = strVal(lfirst(lc)); @@ -561,11 +582,59 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) myState->ti_options = TABLE_INSERT_SKIP_FSM | (XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL); myState->bistate = GetBulkInsertState(); + memset(myState->buffered_slots, 0, sizeof(TupleTableSlot *) * MAX_MULTI_INSERT_TUPLES); + myState->buffered_slots_num = 0; + myState->buffered_slots_size = 0; + myState->tup_len = use_sampling ? 0 : tup_len; + myState->sampled_tuples_num = use_sampling ? 0 : -1; + myState->sampled_tuples_size = 0; + + /* + * Create a temporary memory context so that we can reset once per + * multi insert. 
+ */ + myState->mi_context = AllocSetContextCreate(CurrentMemoryContext, + "intorel_multi_insert", + ALLOCSET_DEFAULT_SIZES); /* Not using WAL requires smgr_targblock be initially invalid */ Assert(RelationGetTargetBlock(intoRelationDesc) == InvalidBlockNumber); } +/* + * If the tuple length, which is obtained either through sampling on tuples with + * variable length attribute(s), or through calculating for tuples with + * accurate length attributes, is larger than or equal to this value, we do + * not use multi insert since memory copy overhead could decrease the + * benefit of multi insert. + */ +#define MAX_TUP_LEN_FOR_MULTI_INSERT 1600 + +/* How many first tuples are sampled to calculate average tuple length? */ +#define MAX_MULTI_INSERT_SAMPLES 1000 + +static void +intorel_flush_multi_insert(DR_intorel *myState) +{ + MemoryContext oldcontext; + int i; + + oldcontext = MemoryContextSwitchTo(myState->mi_context); + + table_multi_insert(myState->rel, myState->buffered_slots, + myState->buffered_slots_num, myState->output_cid, + myState->ti_options, myState->bistate); + + MemoryContextReset(myState->mi_context); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < myState->buffered_slots_num; i++) + ExecClearTuple(myState->buffered_slots[i]); + + myState->buffered_slots_num = 0; + myState->buffered_slots_size = 0; +} + /* * intorel_receive --- receive one tuple */ @@ -573,6 +642,8 @@ static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_intorel *myState = (DR_intorel *) self; + TupleTableSlot *batchslot; + HeapTuple tuple; /* * Note that the input slot might not be of the type of the target @@ -583,11 +654,72 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) * tuple's xmin), but since we don't do that here... */ - table_tuple_insert(myState->rel, - slot, - myState->output_cid, - myState->ti_options, - myState->bistate); + /* + * If the accurate/average tuple length is large, do single insert. 
+ * We do not call ExecFetchSlotHeapTuple() for the input slot to get + * accurate tuple length here since sometimes it is wasteful to call + * it again in table_tuple_insert(), e.g. VirtualTupleTableSlot + */ + if (myState->tup_len >= MAX_TUP_LEN_FOR_MULTI_INSERT) + { + table_tuple_insert(myState->rel, + slot, + myState->output_cid, + myState->ti_options, + myState->bistate); + return true; + } + + /* Copy the slot to batchslot lists and materialize them. */ + if (myState->buffered_slots[myState->buffered_slots_num] == NULL) + { + batchslot = table_slot_create(myState->rel, NULL); + myState->buffered_slots[myState->buffered_slots_num] = batchslot; + } + else + batchslot = myState->buffered_slots[myState->buffered_slots_num]; + + ExecCopySlot(batchslot, slot); + /* + * In theory we do not need materialize here but if both input slot and + * dst slot are BufferHeapTupleTableSlot, there might be hot code in + * ResourceOwnerForgetBuffer() and ResourceOwnerRememberBuffer() + * since we do them in batch. We could easily work around this by doing + * materialize in advance. This is harmless since later when calling + * table_multi_insert(), we need materialize also. + */ + ExecMaterializeSlot(batchslot); + myState->buffered_slots_num++; + + if (myState->sampled_tuples_num < 0 || + myState->sampled_tuples_num == MAX_MULTI_INSERT_SAMPLES) + myState->buffered_slots_size += myState->tup_len; + else + { + /* + * Sampling to get the rough average tuple length for later use. + * We do not use plan width since that is inaccurate sometimes. + */ + tuple = ExecFetchSlotHeapTuple(batchslot, true, NULL); + + myState->buffered_slots_size += tuple->t_len; + myState->sampled_tuples_size += tuple->t_len; + myState->sampled_tuples_num++; + + /* + * Just finished sampling. Let's update myState->tup_len and + * flush the tuples since in next call we possibly do single insert. 
+ */ + if(myState->sampled_tuples_num == MAX_MULTI_INSERT_SAMPLES) + { + myState->tup_len = myState->sampled_tuples_size / myState->sampled_tuples_num; + intorel_flush_multi_insert(myState); + } + } + + if (myState->buffered_slots_num == MAX_MULTI_INSERT_TUPLES || + myState->buffered_slots_size >= 65535) + intorel_flush_multi_insert(myState); /* We know this is a newly created relation, so there are no indexes */ @@ -601,11 +733,22 @@ static void intorel_shutdown(DestReceiver *self) { DR_intorel *myState = (DR_intorel *) self; + int i; + + if (myState->buffered_slots_num != 0) + intorel_flush_multi_insert(myState); + + for (i = 0; i < MAX_MULTI_INSERT_TUPLES && myState->buffered_slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(myState->buffered_slots[i]); FreeBulkInsertState(myState->bistate); table_finish_bulk_insert(myState->rel, myState->ti_options); + if (myState->mi_context) + MemoryContextDelete(myState->mi_context); + myState->mi_context = NULL; + /* close rel, but keep lock until commit */ table_close(myState->rel, NoLock); myState->rel = NULL; diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index dffb57bf11..e90c6a3fc6 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -40,6 +40,17 @@ struct TupleTableSlot; #define MaxLockTupleMode LockTupleExclusive +/* + * No more than this many tuples per MultiInsertBuffer + * + * Caution: Don't make this too big. For COPY, we could end up with this many + * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's + * multiInsertBuffers list. Increasing this can cause quadratic growth in + * memory requirements during copies into partitioned tables with a large + * number of partitions. For CTAS/MatView, the impact is similar. + */ +#define MAX_MULTI_INSERT_TUPLES 1000 + /* * Descriptor for heap table scans. */ -- 2.17.2