diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 7bf35602b0..32c2eacf7a 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8533,6 +8533,7 @@ drop table loct2; -- =================================================================== -- test COPY FROM -- =================================================================== +alter server loopback options (add batch_size '2'); create table loc2 (f1 int, f2 text); alter table loc2 set (autovacuum_enabled = 'false'); create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); @@ -8555,7 +8556,7 @@ copy rem2 from stdin; -- ERROR ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" DETAIL: Failing row contains (-1, xyzzy). CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) -COPY rem2, line 1: "-1 xyzzy" +COPY rem2 select * from rem2; f1 | f2 ----+----- @@ -8566,6 +8567,19 @@ select * from rem2; alter foreign table rem2 drop constraint rem2_f1positive; alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); +copy foo from stdin; +NOTICE: (1) -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -8674,6 +8688,34 @@ drop trigger rem2_trig_row_before on rem2; drop trigger rem2_trig_row_after on rem2; drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +ERROR: column "f1" of relation "loc2" does not exist +CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2), ($3, $4) +COPY rem2 +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(2 rows) + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +select * from rem2; +-- +(4 rows) + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; @@ -8690,6 +8732,7 @@ select * from rem3; drop foreign table rem3; drop table loc3; +alter server loopback options (drop batch_size); -- =================================================================== -- test for TRUNCATE -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 16320170ce..7d18c70f1a 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2059,6 +2059,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) resultRelInfo->ri_TrigDesc->trig_insert_after_row))) return 1; + /* + * If the foreign table has no columns, disable batching as the INSERT + * syntax doesn't allow batching multiple empty rows into a zero-column + * table. This isn't needed in case of INSERT, but is in case of COPY. + * Note that in the latter case fmstate must be non-NULL. + */ + if (fmstate && list_length(fmstate->target_attrs) == 0) + return 1; + /* * Otherwise use the batch size specified for server/table. The number of * parameters in a batch is limited to 65535 (uint16), so make sure we diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 42735ae78a..b0743098d4 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2368,6 +2368,7 @@ drop table loct2; -- test COPY FROM -- =================================================================== +alter server loopback options (add batch_size '2'); create table loc2 (f1 int, f2 text); alter table loc2 set (autovacuum_enabled = 'false'); create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); @@ -2400,6 +2401,23 @@ alter table loc2 drop constraint loc2_f1positive; delete from rem2; +create table foo (a int) partition by list (a); +create table foo1 (like foo); +create foreign table ffoo1 partition of foo for values in (1) + server loopback options (table_name 'foo1'); +create table foo2 (like foo); +create foreign table ffoo2 partition of foo for values in (2) + server loopback options (table_name 'foo2'); +create function print_new_row() returns trigger language plpgsql as $$ + begin raise notice '%', new; return new; end; $$; +create trigger ffoo1_br_trig before insert on ffoo1 + for each row execute function print_new_row(); + +copy foo from stdin; +1 +2 +\. + -- Test local triggers create trigger trig_stmt_before before insert on rem2 for each statement execute procedure trigger_func(); @@ -2500,6 +2518,34 @@ drop trigger loc2_trig_row_before_insert on loc2; delete from rem2; +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; +1 foo +2 bar +\. + +alter table loc2 add column f1 int; +alter table loc2 add column f2 int; +select * from rem2; + +-- dropped columns locally and on the foreign server +alter table rem2 drop column f1; +alter table rem2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + +alter table loc2 drop column f1; +alter table loc2 drop column f2; +copy rem2 from stdin; + + +\. +select * from rem2; + -- test COPY FROM with foreign table created in the same transaction create table loc3 (f1 int, f2 text); begin; @@ -2513,6 +2559,7 @@ commit; select * from rem3; drop foreign table rem3; drop table loc3; +alter server loopback options (drop batch_size); -- =================================================================== -- test for TRUNCATE diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index a976008b3d..ced3b53191 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -116,6 +116,13 @@ CopyFromErrorCallback(void *arg) { CopyFromState cstate = (CopyFromState) arg; + if (cstate->relname_only) + { + errcontext("COPY %s", + cstate->cur_relname); + return; + } + if (cstate->opts.binary) { /* can't usefully display the data */ @@ -222,7 +229,7 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri) buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); buffer->resultRelInfo = rri; - buffer->bistate = GetBulkInsertState(); + buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -299,83 +306,162 @@ CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) */ static inline void CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, - CopyMultiInsertBuffer *buffer) + CopyMultiInsertBuffer *buffer, + int64 *processed) { - MemoryContext oldcontext; - int i; - uint64 save_cur_lineno; CopyFromState cstate = miinfo->cstate; EState *estate = miinfo->estate; - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; - bool line_buf_valid = cstate->line_buf_valid; int nused = buffer->nused; ResultRelInfo *resultRelInfo = buffer->resultRelInfo; TupleTableSlot **slots = buffer->slots; + int i; - /* - * Print error context information correctly, if one of the operations - * below fails. - */ - cstate->line_buf_valid = false; - save_cur_lineno = cstate->cur_lineno; + if (resultRelInfo->ri_FdwRoutine) + { + int batch_size = resultRelInfo->ri_BatchSize; + int sent = 0; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + Assert(batch_size > 1); - for (i = 0; i < nused; i++) - { /* - * If there are any indexes, update them for all the inserted tuples, - * and run AFTER ROW INSERT triggers. + * We suppress error context information other than the relation name, + * if one of the operations below fails. */ - if (resultRelInfo->ri_NumIndices > 0) + Assert(!cstate->relname_only); + cstate->relname_only = true; + + while (sent < nused) { - List *recheckIndexes; - - cstate->cur_lineno = buffer->linenos[i]; - recheckIndexes = - ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, false, - NULL, NIL); - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], recheckIndexes, - cstate->transition_capture); - list_free(recheckIndexes); + int size = (batch_size < nused - sent) ? batch_size : (nused - sent); + int inserted = size; + TupleTableSlot **rslots; + + /* Batch insert into foreign table */ + Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); + rslots = + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate, + resultRelInfo, + &slots[sent], + NULL, + &inserted); + + /* If any rows were inserted, run AFTER ROW INSERT triggers. */ + if (inserted > 0 && + resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + for (i = 0; i < inserted; i++) + { + TupleTableSlot *slot = rslots[i]; + + /* + * AFTER ROW Triggers might reference the tableoid column, + * so (re-)initialize tts_tableOid before evaluating them. + */ + slot->tts_tableOid = + RelationGetRelid(resultRelInfo->ri_RelationDesc); + + ExecARInsertTriggers(estate, resultRelInfo, + slot, NIL, + cstate->transition_capture); + } + } + + sent += size; + + /* Update the row counter and progress of the COPY command */ + if (inserted > 0) + { + *processed += inserted; + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + *processed); + } } + for (i = 0; i < nused; i++) + ExecClearTuple(slots[i]); + + /* reset relname_only */ + cstate->relname_only = false; + } + else + { + CommandId mycid = miinfo->mycid; + int ti_options = miinfo->ti_options; + bool line_buf_valid = cstate->line_buf_valid; + uint64 save_cur_lineno = cstate->cur_lineno; + MemoryContext oldcontext; + /* - * There's no indexes, but see if we need to run AFTER ROW INSERT - * triggers anyway. + * Print error context information correctly, if one of the operations + * below fails. */ - else if (resultRelInfo->ri_TrigDesc != NULL && - (resultRelInfo->ri_TrigDesc->trig_insert_after_row || - resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + cstate->line_buf_valid = false; + + /* + * table_multi_insert may leak memory, so switch to short-lived memory + * context before calling it. + */ + oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + table_multi_insert(resultRelInfo->ri_RelationDesc, + slots, + nused, + mycid, + ti_options, + buffer->bistate); + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < nused; i++) { - cstate->cur_lineno = buffer->linenos[i]; - ExecARInsertTriggers(estate, resultRelInfo, - slots[i], NIL, cstate->transition_capture); + /* + * If there are any indexes, update them for all the inserted + * tuples, and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + cstate->cur_lineno = buffer->linenos[i]; + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + buffer->slots[i], estate, false, + false, NULL, NIL); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + cstate->transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + cstate->cur_lineno = buffer->linenos[i]; + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + cstate->transition_capture); + } + + ExecClearTuple(slots[i]); } - ExecClearTuple(slots[i]); + /* Update the row counter and progress of the COPY command */ + *processed += nused; + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, + *processed); + + /* reset cur_lineno and line_buf_valid to what they were */ + cstate->line_buf_valid = line_buf_valid; + cstate->cur_lineno = save_cur_lineno; } /* Mark that all slots are free */ buffer->nused = 0; - - /* reset cur_lineno and line_buf_valid to what they were */ - cstate->line_buf_valid = line_buf_valid; - cstate->cur_lineno = save_cur_lineno; } /* @@ -387,22 +473,25 @@ static inline void CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, CopyMultiInsertBuffer *buffer) { + ResultRelInfo *resultRelInfo = buffer->resultRelInfo; int i; /* Ensure buffer was flushed */ Assert(buffer->nused == 0); /* Remove back-link to ourself */ - buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + resultRelInfo->ri_CopyMultiInsertBuffer = NULL; - FreeBulkInsertState(buffer->bistate); + if (resultRelInfo->ri_FdwRoutine == NULL) + FreeBulkInsertState(buffer->bistate); /* Since we only create slots on demand, just drop the non-null ones. */ for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) ExecDropSingleTupleTableSlot(buffer->slots[i]); - table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + if (resultRelInfo->ri_FdwRoutine == NULL) + table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, + miinfo->ti_options); pfree(buffer); } @@ -418,7 +507,8 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, * 'curr_rri'. */ static inline void -CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) +CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri, + int64 *processed) { ListCell *lc; @@ -426,7 +516,7 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) { CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); - CopyMultiInsertBufferFlush(miinfo, buffer); + CopyMultiInsertBufferFlush(miinfo, buffer, processed); } miinfo->bufferedTuples = 0; @@ -679,6 +769,23 @@ CopyFrom(CopyFromState cstate) resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, resultRelInfo); + /* + * Also, if the named relation is a foreign table, determine if the FDW + * supports batch insert and determine the batch size (a FDW may support + * batching, but it may be disabled for the server/table). + * + * If the FDW does not support batching, we set the batch size to 1. + */ + if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize && + resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert) + resultRelInfo->ri_BatchSize = + resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo); + else + resultRelInfo->ri_BatchSize = 1; + + Assert(resultRelInfo->ri_BatchSize >= 1); + /* Prepare to catch AFTER triggers. */ AfterTriggerBeginQuery(); @@ -725,6 +832,15 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } + else if (resultRelInfo->ri_FdwRoutine != NULL && + resultRelInfo->ri_BatchSize == 1) + { + /* + * Can't support multi-inserts to foreign tables if the FDW does not + * support batching. + */ + insertMethod = CIM_SINGLE; + } else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && resultRelInfo->ri_TrigDesc->trig_insert_new_table) { @@ -737,14 +853,12 @@ CopyFrom(CopyFromState cstate) */ insertMethod = CIM_SINGLE; } - else if (resultRelInfo->ri_FdwRoutine != NULL || - cstate->volatile_defexprs) + else if (cstate->volatile_defexprs) { /* - * Can't support multi-inserts to foreign tables or if there are any - * volatile default expressions in the table. Similarly to the - * trigger case above, such expressions may query the table we're - * inserting into. + * Can't support multi-inserts if there are any volatile default + * expressions in the table. Similarly to the trigger case above, + * such expressions may query the table we're inserting into. * * Note: It does not matter if any partitions have any volatile * default expressions as we use the defaults from the target of the @@ -910,12 +1024,14 @@ CopyFrom(CopyFromState cstate) /* * Disable multi-inserts when the partition has BEFORE/INSTEAD - * OF triggers, or if the partition is a foreign partition. + * OF triggers, or if the partition is a foreign partition + * that can't use batching. */ leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && !has_before_insert_row_trig && !has_instead_insert_row_trig && - resultRelInfo->ri_FdwRoutine == NULL; + (resultRelInfo->ri_FdwRoutine == NULL || + resultRelInfo->ri_BatchSize > 1); /* Set the multi-insert buffer to use for this partition. */ if (leafpart_use_multi_insert) @@ -931,7 +1047,9 @@ CopyFrom(CopyFromState cstate) * Flush pending inserts if this partition can't use * batching, so rows are visible to triggers etc. */ - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMultiInsertInfoFlush(&multiInsertInfo, + resultRelInfo, + &processed); } if (bistate != NULL) @@ -1067,7 +1185,17 @@ CopyFrom(CopyFromState cstate) * buffers out to their tables. */ if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); + CopyMultiInsertInfoFlush(&multiInsertInfo, + resultRelInfo, + &processed); + + /* + * We delay updating the row counter and the progress of + * the COPY command until after writing the tuples stored + * in the buffer out to the table. See + * CopyMultiInsertBufferFlush(). + */ + continue; /* next tuple please */ } else { @@ -1130,7 +1258,7 @@ CopyFrom(CopyFromState cstate) if (insertMethod != CIM_SINGLE) { if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) - CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); + CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed); } /* Done, clean up */ @@ -1349,6 +1477,7 @@ BeginCopyFrom(ParseState *pstate, cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; + cstate->relname_only = false; /* * Allocate buffers for the input pipeline. diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index e37c6032ae..21e8b89baa 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -81,6 +81,7 @@ typedef struct CopyFromStateData uint64 cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ /* * Working state