From 59bd7de19762241fa53eed6f64510f022345b14b Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 8 Dec 2020 13:01:32 +0530 Subject: [PATCH v1] COPY With New Multi and Single Insert Table AM This patch changes the COPY code to use the new single and multi insert table access methods. --- src/backend/commands/copyfrom.c | 483 +++++++++++--------------------- 1 file changed, 163 insertions(+), 320 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1b14e9a6eb..8376af32f5 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -45,10 +45,10 @@ #include "utils/snapmgr.h" /* - * No more than this many tuples per CopyMultiInsertBuffer + * No more than this many tuples per multi insert buffer * * Caution: Don't make this too big, as we could end up with this many - * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's + * multi insert buffer items stored in CopyMultiInsertInfo's * multiInsertBuffers list. Increasing this can cause quadratic growth in * memory requirements during copies into partitioned tables with a large * number of partitions. @@ -67,31 +67,11 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ - ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel */ - int nused; /* number of 'slots' containing tuples */ - uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy - * stream */ + TableInsertState *istate; + /* Line # of tuple in copy stream. */ + uint64 linenos[MAX_BUFFERED_TUPLES]; } CopyMultiInsertBuffer; -/* - * Stores one or many CopyMultiInsertBuffers and details about the size and - * number of tuples which are stored in them. This allows multiple buffers to - * exist at once when COPYing into a partitioned table. 
- */ -typedef struct CopyMultiInsertInfo -{ - List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ - int bufferedTuples; /* number of tuples buffered over all buffers */ - int bufferedBytes; /* number of bytes from all buffered tuples */ - CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */ - EState *estate; /* Executor state used for COPY */ - CommandId mycid; /* Command Id used for COPY */ - int ti_options; /* table insert options */ -} CopyMultiInsertInfo; - - /* non-export function prototypes */ static char *limit_printout_length(const char *str); @@ -204,227 +184,130 @@ limit_printout_length(const char *str) return res; } -/* - * Allocate memory and initialize a new CopyMultiInsertBuffer for this - * ResultRelInfo. - */ -static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +static void +InitCopyMultiInsertBufferInfo(List **mirri, ResultRelInfo *rri, + CommandId mycid, int ti_options) { CopyMultiInsertBuffer *buffer; + TriggerDesc *trigdesc = rri->ri_TrigDesc; - buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); - buffer->resultRelInfo = rri; - buffer->bistate = GetBulkInsertState(); - buffer->nused = 0; - - return buffer; -} + buffer = (CopyMultiInsertBuffer *) palloc0(sizeof(CopyMultiInsertBuffer)); -/* - * Make a new buffer for this ResultRelInfo. 
- */ -static inline void -CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) -{ - CopyMultiInsertBuffer *buffer; + buffer->istate = table_insert_begin(rri->ri_RelationDesc, + mycid, + ti_options, + true, + true, + MAX_BUFFERED_TUPLES, + MAX_BUFFERED_BYTES); - buffer = CopyMultiInsertBufferInit(rri); + if (rri->ri_NumIndices || + (trigdesc && (trigdesc->trig_insert_after_row || + trigdesc->trig_insert_new_table))) + buffer->istate->mistate->clear_slots = false; - /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; - /* Record that we're tracking this buffer */ - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); -} - -/* - * Initialize an already allocated CopyMultiInsertInfo. - * - * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up - * for that table. - */ -static void -CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - CopyFromState cstate, EState *estate, CommandId mycid, - int ti_options) -{ - miinfo->multiInsertBuffers = NIL; - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; - miinfo->cstate = cstate; - miinfo->estate = estate; - miinfo->mycid = mycid; - miinfo->ti_options = ti_options; - /* - * Only setup the buffer when not dealing with a partitioned table. - * Buffers for partitioned tables will just be setup when we need to send - * tuples their way for the first time. 
- */ - if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) - CopyMultiInsertInfoSetupBuffer(miinfo, rri); + *mirri = lappend(*mirri, rri); } -/* - * Returns true if the buffers are full - */ -static inline bool -CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) -{ - if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || - miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) - return true; - return false; -} - -/* - * Returns true if we have no buffered tuples - */ -static inline bool -CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) -{ - return miinfo->bufferedTuples == 0; -} - -/* - * Write the tuples stored in 'buffer' out to the table. - */ -static inline void -CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) +static void +HandleAfterRowEvents(ResultRelInfo *rri, EState *estate, + CopyFromState cstate, int32 cur_slots) { - MemoryContext oldcontext; - int i; - uint64 save_cur_lineno; - CopyFromState cstate = miinfo->cstate; - EState *estate = miinfo->estate; - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; + int i; + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + TableInsertState *istate = buffer->istate; + uint64 save_cur_lineno = cstate->cur_lineno; bool line_buf_valid = cstate->line_buf_valid; - int nused = buffer->nused; - ResultRelInfo *resultRelInfo = buffer->resultRelInfo; - TupleTableSlot **slots = buffer->slots; - /* - * Print error context information correctly, if one of the operations - * below fail. - */ cstate->line_buf_valid = false; - save_cur_lineno = cstate->cur_lineno; - - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. 
- */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); - - for (i = 0; i < nused; i++) + for (i = 0; i < cur_slots; i++) { /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. - */ - if (resultRelInfo->ri_NumIndices > 0) + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (rri->ri_NumIndices > 0) { - List *recheckIndexes; + List *recheckIndexes; cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, NULL, - NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, + ExecInsertIndexTuples(rri, + istate->mistate->slots[i], estate, + false, + NULL, + NULL); + + ExecARInsertTriggers(estate, + rri, + istate->mistate->slots[i], + recheckIndexes, cstate->transition_capture); + list_free(recheckIndexes); } /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. - */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. 
+ */ + else if (rri->ri_TrigDesc != NULL && + (rri->ri_TrigDesc->trig_insert_after_row || + rri->ri_TrigDesc->trig_insert_new_table)) { cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); + ExecARInsertTriggers(estate, + rri, + istate->mistate->slots[i], + NULL, + cstate->transition_capture); } - ExecClearTuple(slots[i]); + ExecClearTuple(istate->mistate->slots[i]); } - /* Mark that all slots are free */ - buffer->nused = 0; - /* reset cur_lineno and line_buf_valid to what they were */ cstate->line_buf_valid = line_buf_valid; cstate->cur_lineno = save_cur_lineno; } -/* - * Drop used slots and free member for this buffer. - * - * The buffer must be flushed before cleanup. - */ -static inline void -CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) +static void +CopyMultiInsertBufferTuple(ResultRelInfo *rri, TupleTableSlot *slot, + CopyFromState cstate, EState *estate) { - int i; - - /* Ensure buffer was flushed */ - Assert(buffer->nused == 0); - - /* Remove back-link to ourself */ - buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - - FreeBulkInsertState(buffer->bistate); + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + TableInsertState *istate = buffer->istate; + int32 cur_slots = istate->mistate->cur_slots; - /* Since we only create slots on demand, just drop the non-null ones. 
*/ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + buffer->linenos[istate->mistate->cur_slots] = cstate->cur_lineno; + istate->mistate->cur_tup_size = cstate->line_buf.len; - table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + table_multi_insert_v2(buffer->istate, slot); - pfree(buffer); + if (istate->mistate->flushed) + HandleAfterRowEvents(rri, estate, cstate, cur_slots); } -/* - * Write out all stored tuples in all buffers out to the tables. - * - * Once flushed we also trim the tracked buffers list down to size by removing - * the buffers created earliest first. - * - * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being - * used. When cleaning up old buffers we'll never remove the one for - * 'curr_rri'. - */ -static inline void -CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) +static void +CopyMulitInsertFlushBuffers(List **mirri, ResultRelInfo *curr_rri, + CopyFromState cstate, EState *estate) { ListCell *lc; - foreach(lc, miinfo->multiInsertBuffers) + foreach(lc, *mirri) { - CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); + ResultRelInfo *rri = lfirst(lc); + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + TableInsertState *istate = buffer->istate; + int32 cur_slots = istate->mistate->cur_slots; - CopyMultiInsertBufferFlush(miinfo, buffer); - } + table_multi_insert_flush(istate); - miinfo->bufferedTuples = 0; - miinfo->bufferedBytes = 0; + if (istate->mistate->flushed) + HandleAfterRowEvents(rri, estate, cstate, cur_slots); + } /* * Trim the list of tracked buffers down if it exceeds the limit. Here we @@ -432,87 +315,62 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) * likely that these older ones will be needed than the ones that were * just created. 
*/ - while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) + while (list_length(*mirri) > MAX_PARTITION_BUFFERS) { + ResultRelInfo *rri; CopyMultiInsertBuffer *buffer; + TableInsertState *istate; + int ti_options; - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); + rri = (ResultRelInfo *) linitial(*mirri); /* * We never want to remove the buffer that's currently being used, so * if we happen to find that then move it to the end of the list. */ - if (buffer->resultRelInfo == curr_rri) + if (rri == curr_rri) { - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); - buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); + *mirri = list_delete_first(*mirri); + *mirri = lappend(*mirri, rri); + rri = (ResultRelInfo *) linitial(*mirri); } - CopyMultiInsertBufferCleanup(miinfo, buffer); - miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); - } -} + buffer = rri->ri_CopyMultiInsertBuffer; + istate = buffer->istate; + istate->mistate->clear_slots = true; + ti_options = istate->options; -/* - * Cleanup allocated buffers and free memory - */ -static inline void -CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) -{ - ListCell *lc; + table_insert_end(istate); - foreach(lc, miinfo->multiInsertBuffers) - CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); + table_finish_bulk_insert(rri->ri_RelationDesc, ti_options); - list_free(miinfo->multiInsertBuffers); + *mirri = list_delete_first(*mirri); + } } -/* - * Get the next TupleTableSlot that the next tuple should be stored in. - * - * Callers must ensure that the buffer is not full. - * - * Note: 'miinfo' is unused but has been included for consistency with the - * other functions in this area. 
- */ -static inline TupleTableSlot * -CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, - ResultRelInfo *rri) +static void +CopyMulitInsertDropBuffers(List *mirri) { - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; - int nused = buffer->nused; - - Assert(buffer != NULL); - Assert(nused < MAX_BUFFERED_TUPLES); + ListCell *lc; - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; -} + foreach(lc, mirri) + { + int ti_options; + ResultRelInfo *rri = lfirst(lc); + CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + TableInsertState *istate = buffer->istate; -/* - * Record the previously reserved TupleTableSlot that was reserved by - * CopyMultiInsertInfoNextFreeSlot as being consumed. - */ -static inline void -CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, - TupleTableSlot *slot, int tuplen, uint64 lineno) -{ - CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; + istate->mistate->clear_slots = true; + ti_options = istate->options; - Assert(buffer != NULL); - Assert(slot == buffer->slots[buffer->nused]); + table_insert_end(istate); - /* Store the line number so we can properly report any errors later */ - buffer->linenos[buffer->nused] = lineno; + table_finish_bulk_insert(rri->ri_RelationDesc, ti_options); - /* Record this slot as being used */ - buffer->nused++; + pfree(buffer); + } - /* Update how many tuples are stored and their size */ - miinfo->bufferedTuples++; - miinfo->bufferedBytes += tuplen; + list_free(mirri); } /* @@ -527,20 +385,20 @@ CopyFrom(CopyFromState cstate) EState *estate = CreateExecutorState(); /* for ExecConstraints() */ ModifyTableState *mtstate; ExprContext *econtext; - TupleTableSlot *singleslot = NULL; + TupleTableSlot *slot = NULL; MemoryContext oldcontext = CurrentMemoryContext; PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; CommandId mycid = 
GetCurrentCommandId(true); int ti_options = 0; /* start with default options for insert */ - BulkInsertState bistate = NULL; CopyInsertMethod insertMethod; - CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ uint64 processed = 0; bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + List *multi_insert_rris = NULL; + TableInsertState *istate = NULL; Assert(cstate->rel); Assert(list_length(cstate->range_table) == 1); @@ -723,7 +581,7 @@ CopyFrom(CopyFromState cstate) * For partitioned tables we can't support multi-inserts when there * are any statement level insert triggers. It might be possible to * allow partitioned tables with such triggers in the future, but for - * now, CopyMultiInsertInfoFlush expects that any before row insert + * now, CopyMulitInsertFlushBuffers expects that any before row insert * and statement level insert triggers are on the same relation. */ insertMethod = CIM_SINGLE; @@ -771,22 +629,22 @@ CopyFrom(CopyFromState cstate) else insertMethod = CIM_MULTI; - CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, - estate, mycid, ti_options); + /* + * Only set up the buffer when not dealing with a partitioned table. + * Buffers for partitioned tables will just be set up when we need to + * send tuples their way for the first time. + */ + if (!proute) + InitCopyMultiInsertBufferInfo(&multi_insert_rris, resultRelInfo, + mycid, ti_options); } /* - * If not using batch mode (which allocates slots as needed) set up a - * tuple slot too. When inserting into a partitioned table, we also need - * one, even if we might batch insert, to read the tuple in the root - * partition's form. + * Set up a tuple slot into which input data from the copy stream is read + * and which is then used for inserts into the table. 
*/ - if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) - { - singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, - &estate->es_tupleTable); - bistate = GetBulkInsertState(); - } + slot = table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row); @@ -824,19 +682,8 @@ CopyFrom(CopyFromState cstate) ResetPerTupleExprContext(estate); /* select slot to (initially) load row into */ - if (insertMethod == CIM_SINGLE || proute) - { - myslot = singleslot; - Assert(myslot != NULL); - } - else - { - Assert(resultRelInfo == target_resultRelInfo); - Assert(insertMethod == CIM_MULTI); - - myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, - resultRelInfo); - } + myslot = slot; + Assert(myslot != NULL); /* * Switch to per-tuple context before calling NextCopyFrom, which does @@ -904,21 +751,22 @@ CopyFrom(CopyFromState cstate) if (leafpart_use_multi_insert) { if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) - CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, - resultRelInfo); + InitCopyMultiInsertBufferInfo(&multi_insert_rris, + resultRelInfo, mycid, + ti_options); } - else if (insertMethod == CIM_MULTI_CONDITIONAL && - !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) + else if (insertMethod == CIM_MULTI_CONDITIONAL) { /* * Flush pending inserts if this partition can't use * batching, so rows are visible to triggers etc. 
*/ - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMulitInsertFlushBuffers(&multi_insert_rris, + resultRelInfo, cstate, estate); } - if (bistate != NULL) - ReleaseBulkInsertStatePin(bistate); + if (istate && istate->bistate) + ReleaseBulkInsertStatePin(istate->bistate); prevResultRelInfo = resultRelInfo; } @@ -960,8 +808,8 @@ CopyFrom(CopyFromState cstate) /* no other path available for partitioned table */ Assert(insertMethod == CIM_MULTI_CONDITIONAL); - batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, - resultRelInfo); + batchslot = table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); if (map != NULL) myslot = execute_attr_map_slot(map->attrMap, myslot, @@ -1033,24 +881,9 @@ CopyFrom(CopyFromState cstate) /* Store the slot in the multi-insert buffer, when enabled. */ if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) { - /* - * The slot previously might point into the per-tuple - * context. For batching it needs to be longer lived. - */ - ExecMaterializeSlot(myslot); - /* Add this tuple to the tuple buffer */ - CopyMultiInsertInfoStore(&multiInsertInfo, - resultRelInfo, myslot, - cstate->line_buf.len, - cstate->cur_lineno); - - /* - * If enough inserts have queued up, then flush all - * buffers out to their tables. 
- */ - if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMultiInsertBufferTuple(resultRelInfo, myslot, cstate, + estate); } else { @@ -1076,9 +909,21 @@ CopyFrom(CopyFromState cstate) } else { + if (!istate) + { + istate = table_insert_begin(resultRelInfo->ri_RelationDesc, + mycid, + ti_options, + true, + false, + -1, + -1); + } + + istate->rel = resultRelInfo->ri_RelationDesc; + /* OK, store the tuple and create index entries for it */ - table_tuple_insert(resultRelInfo->ri_RelationDesc, - myslot, mycid, ti_options, bistate); + table_insert_v2(istate, myslot); if (resultRelInfo->ri_NumIndices > 0) recheckIndexes = ExecInsertIndexTuples(resultRelInfo, @@ -1108,16 +953,14 @@ CopyFrom(CopyFromState cstate) /* Flush any remaining buffered tuples */ if (insertMethod != CIM_SINGLE) - { - if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); - } + CopyMulitInsertFlushBuffers(&multi_insert_rris, resultRelInfo, + cstate, estate); /* Done, clean up */ error_context_stack = errcallback.previous; - if (bistate != NULL) - FreeBulkInsertState(bistate); + if (istate) + table_insert_end(istate); MemoryContextSwitchTo(oldcontext); @@ -1144,7 +987,7 @@ CopyFrom(CopyFromState cstate) /* Tear down the multi-insert buffer data */ if (insertMethod != CIM_SINGLE) - CopyMultiInsertInfoCleanup(&multiInsertInfo); + CopyMulitInsertDropBuffers(multi_insert_rris); /* Close all the partitioned tables, leaf partitions, and their indices */ if (proute) -- 2.25.1