diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3350ca0..6df0f1e 100644 *** a/src/backend/commands/copy.c --- b/src/backend/commands/copy.c *************** typedef struct CopyStateData *** 93,105 **** FILE *copy_file; /* used if copy_dest == COPY_FILE */ StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for * dest == COPY_NEW_FE in COPY FROM */ - bool fe_copy; /* true for all FE copy dests */ bool fe_eof; /* true if detected end of copy data */ EolType eol_type; /* EOL type of input */ int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - uint64 processed; /* # of tuples processed */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ --- 93,103 ---- *************** typedef struct CopyStateData *** 119,131 **** bool *force_quote_flags; /* per-column CSV FQ flags */ bool *force_notnull_flags; /* per-column CSV FNN flags */ ! /* these are just for error messages, see copy_in_error_callback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ /* * Working state for COPY TO */ FmgrInfo *out_functions; /* lookup info for output functions */ --- 117,134 ---- bool *force_quote_flags; /* per-column CSV FQ flags */ bool *force_notnull_flags; /* per-column CSV FNN flags */ ! /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ int cur_lineno; /* line number for error messages */ const char *cur_attname; /* current att for error messages */ const char *cur_attval; /* current att value for error messages */ /* + * Working state for COPY TO/FROM + */ + MemoryContext copycontext; /* per-copy execution context */ + + /* * Working state for COPY TO */ FmgrInfo *out_functions; /* lookup info for output functions */ *************** typedef struct CopyStateData *** 167,181 **** char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ - } CopyStateData; ! typedef CopyStateData *CopyState; /* DestReceiver for COPY (SELECT) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ CopyState cstate; /* CopyStateData for the command */ } DR_copy; --- 170,197 ---- char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ ! /* ! * The definition of input functions and default expressions are stored ! * in these variables. ! */ ! EState *estate; ! AttrNumber num_defaults; ! bool file_has_oids; ! FmgrInfo oid_in_function; ! Oid oid_typioparam; ! FmgrInfo *in_functions; /* array of input functions for each attrs */ ! Oid *typioparams; /* array of element types for in_functions */ ! int *defmap; /* array of default att numbers */ ! ExprState **defexprs; /* array of default att expressions */ ! } CopyStateData; /* DestReceiver for COPY (SELECT) TO */ typedef struct { DestReceiver pub; /* publicly-known function pointers */ CopyState cstate; /* CopyStateData for the command */ + uint64 processed; /* # of tuples processed */ } DR_copy; *************** static const char BinarySignature[11] = *** 248,258 **** /* non-export function prototypes */ ! static void DoCopyTo(CopyState cstate); ! static void CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); ! static void CopyFrom(CopyState cstate); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); --- 264,280 ---- /* non-export function prototypes */ ! static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query, ! const char *queryString, List *attnamelist, List *options); ! static void EndCopy(CopyState cstate); ! static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString, ! const char *filename, List *attnamelist, List *options); ! static void EndCopyTo(CopyState cstate); ! static uint64 DoCopyTo(CopyState cstate); ! static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); ! static uint64 CopyFrom(CopyState cstate); static bool CopyReadLine(CopyState cstate); static bool CopyReadLineText(CopyState cstate); static int CopyReadAttributesText(CopyState cstate); *************** DoCopy(const CopyStmt *stmt, const char *** 724,745 **** CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); ! List *attnamelist = stmt->attlist; List *force_quote = NIL; List *force_notnull = NIL; bool force_quote_all = false; bool format_specified = false; - AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); ListCell *option; TupleDesc tupDesc; int num_phys_attrs; ! uint64 processed; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); /* Extract options from the statement node tree */ ! foreach(option, stmt->options) { DefElem *defel = (DefElem *) lfirst(option); --- 746,870 ---- CopyState cstate; bool is_from = stmt->is_from; bool pipe = (stmt->filename == NULL); ! Relation rel; ! uint64 processed; ! ! /* Disallow file COPY except to superusers. */ ! if (!pipe && !superuser()) ! ereport(ERROR, ! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), ! errmsg("must be superuser to COPY to or from a file"), ! errhint("Anyone can COPY to stdout or from stdin. " ! "psql's \\copy command also works for anyone."))); ! ! if (stmt->relation) ! { ! TupleDesc tupDesc; ! AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); ! RangeTblEntry *rte; ! List *attnums; ! ListCell *cur; ! ! Assert(!stmt->query); ! ! /* Open and lock the relation, using the appropriate lock type. */ ! rel = heap_openrv(stmt->relation, ! (is_from ? RowExclusiveLock : AccessShareLock)); ! ! rte = makeNode(RangeTblEntry); ! rte->rtekind = RTE_RELATION; ! rte->relid = RelationGetRelid(rel); ! rte->requiredPerms = required_access; ! ! tupDesc = RelationGetDescr(rel); ! attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist); ! foreach (cur, attnums) ! { ! int attno = lfirst_int(cur) - ! FirstLowInvalidHeapAttributeNumber; ! ! if (is_from) ! rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); ! else ! rte->selectedCols = bms_add_member(rte->selectedCols, attno); ! } ! ExecCheckRTPerms(list_make1(rte), true); ! } ! else ! { ! Assert(stmt->query); ! ! rel = NULL; ! } ! ! if (is_from) ! { ! /* check read-only transaction */ ! if (XactReadOnly && rel->rd_backend != MyBackendId) ! PreventCommandIfReadOnly("COPY FROM"); ! ! cstate = BeginCopyFrom(rel, stmt->filename, ! stmt->attlist, stmt->options); ! processed = CopyFrom(cstate); /* copy from file to database */ ! EndCopyFrom(cstate); ! } ! else ! { ! cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename, ! stmt->attlist, stmt->options); ! processed = DoCopyTo(cstate); /* copy from database to file */ ! EndCopyTo(cstate); ! } ! ! /* ! * Close the relation. If reading, we can release the AccessShareLock we got; ! * if writing, we should hold the lock until end of transaction to ensure that ! * updates will be committed before lock is released. ! */ ! if (rel != NULL) ! heap_close(rel, (is_from ? NoLock : AccessShareLock)); ! ! return processed; ! } ! ! /* ! * Common setup routines used by BeginCopyFrom and BeginCopyTo. ! */ ! static CopyState ! BeginCopy(bool is_from, ! Relation rel, ! Node *raw_query, ! const char *queryString, ! List *attnamelist, ! List *options) ! { ! CopyState cstate; List *force_quote = NIL; List *force_notnull = NIL; bool force_quote_all = false; bool format_specified = false; ListCell *option; TupleDesc tupDesc; int num_phys_attrs; ! MemoryContext oldcontext; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); + /* + * We allocate everything used by a cstate in a new memory context. + * This would avoid memory leaks repeated uses of COPY in a query. + */ + cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, + "COPY", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + /* Extract options from the statement node tree */ ! foreach(option, options) { DefElem *defel = (DefElem *) lfirst(option); *************** DoCopy(const CopyStmt *stmt, const char *** 980,1030 **** (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); ! /* Disallow file COPY except to superusers. */ ! if (!pipe && !superuser()) ! ereport(ERROR, ! (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), ! errmsg("must be superuser to COPY to or from a file"), ! errhint("Anyone can COPY to stdout or from stdin. " ! "psql's \\copy command also works for anyone."))); ! ! if (stmt->relation) { ! RangeTblEntry *rte; ! List *attnums; ! ListCell *cur; ! ! Assert(!stmt->query); ! cstate->queryDesc = NULL; ! /* Open and lock the relation, using the appropriate lock type. */ ! cstate->rel = heap_openrv(stmt->relation, ! (is_from ? RowExclusiveLock : AccessShareLock)); tupDesc = RelationGetDescr(cstate->rel); - /* Check relation permissions. */ - rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = RelationGetRelid(cstate->rel); - rte->requiredPerms = required_access; - - attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); - foreach (cur, attnums) - { - int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; - - if (is_from) - rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); - else - rte->selectedCols = bms_add_member(rte->selectedCols, attno); - } - ExecCheckRTPerms(list_make1(rte), true); - - /* check read-only transaction */ - if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId) - PreventCommandIfReadOnly("COPY FROM"); - /* Don't allow COPY w/ OIDs to or from a table without them */ if (cstate->oids && !cstate->rel->rd_rel->relhasoids) ereport(ERROR, --- 1105,1118 ---- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); ! if (rel) { ! Assert(!raw_query); ! cstate->rel = rel; tupDesc = RelationGetDescr(cstate->rel); /* Don't allow COPY w/ OIDs to or from a table without them */ if (cstate->oids && !cstate->rel->rd_rel->relhasoids) ereport(ERROR, *************** DoCopy(const CopyStmt *stmt, const char *** 1058,1064 **** * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ ! rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query), queryString, NULL, 0); /* We don't expect more or less than one result query */ --- 1146,1152 ---- * function and is executed repeatedly. (See also the same hack in * DECLARE CURSOR and PREPARE.) XXX FIXME someday. */ ! rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query), queryString, NULL, 0); /* We don't expect more or less than one result query */ *************** DoCopy(const CopyStmt *stmt, const char *** 1160,1173 **** } } - /* Set up variables to avoid per-attribute overhead. */ - initStringInfo(&cstate->attribute_buf); - initStringInfo(&cstate->line_buf); - cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; - cstate->processed = 0; - /* * Set up encoding conversion info. Even if the client and server * encodings are the same, we must apply pg_client_to_server() to validate --- 1248,1253 ---- *************** DoCopy(const CopyStmt *stmt, const char *** 1181,1264 **** cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ - cstate->filename = stmt->filename; ! if (is_from) ! CopyFrom(cstate); /* copy from file to database */ ! else ! DoCopyTo(cstate); /* copy from database to file */ ! /* ! * Close the relation or query. If reading, we can release the ! * AccessShareLock we got; if writing, we should hold the lock until end ! * of transaction to ensure that updates will be committed before lock is ! * released. ! */ ! if (cstate->rel) ! heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock)); ! else ! { ! /* Close down the query and free resources. */ ! ExecutorEnd(cstate->queryDesc); ! FreeQueryDesc(cstate->queryDesc); ! PopActiveSnapshot(); ! } ! /* Clean up storage (probably not really necessary) */ ! processed = cstate->processed; ! pfree(cstate->attribute_buf.data); ! pfree(cstate->line_buf.data); ! pfree(cstate->raw_buf); pfree(cstate); - - return processed; } - /* ! * This intermediate routine exists mainly to localize the effects of setjmp ! * so we don't need to plaster a lot of variables with "volatile". */ ! static void ! DoCopyTo(CopyState cstate) { ! bool pipe = (cstate->filename == NULL); ! if (cstate->rel) { ! if (cstate->rel->rd_rel->relkind != RELKIND_RELATION) ! { ! if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from view \"%s\"", ! RelationGetRelationName(cstate->rel)), ! errhint("Try the COPY (SELECT ...) TO variant."))); ! else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from foreign table \"%s\"", ! RelationGetRelationName(cstate->rel)), ! errhint("Try the COPY (SELECT ...) TO variant."))); ! else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from sequence \"%s\"", ! RelationGetRelationName(cstate->rel)))); ! else ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from non-table relation \"%s\"", ! RelationGetRelationName(cstate->rel)))); ! } } if (pipe) { ! if (whereToSendOutput == DestRemote) ! cstate->fe_copy = true; ! else cstate->copy_file = stdout; } else --- 1261,1335 ---- cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding); cstate->copy_dest = COPY_FILE; /* default */ ! MemoryContextSwitchTo(oldcontext); ! return cstate; ! } ! /* ! * Release resources allocated in a cstate. ! */ ! static void ! EndCopy(CopyState cstate) ! { ! if (cstate->filename != NULL && FreeFile(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); ! MemoryContextDelete(cstate->copycontext); pfree(cstate); } /* ! * Setup CopyState to read tuples from a table or a query for COPY TO. */ ! static CopyState ! BeginCopyTo(Relation rel, ! Node *query, ! const char *queryString, ! const char *filename, ! List *attnamelist, ! List *options) { ! CopyState cstate; ! bool pipe = (filename == NULL); ! MemoryContext oldcontext; ! if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION) { ! if (rel->rd_rel->relkind == RELKIND_VIEW) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from view \"%s\"", ! RelationGetRelationName(rel)), ! errhint("Try the COPY (SELECT ...) TO variant."))); ! else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from foreign table \"%s\"", ! RelationGetRelationName(rel)), ! errhint("Try the COPY (SELECT ...) TO variant."))); ! else if (rel->rd_rel->relkind == RELKIND_SEQUENCE) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from sequence \"%s\"", ! RelationGetRelationName(rel)))); ! else ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("cannot copy from non-table relation \"%s\"", ! RelationGetRelationName(rel)))); } + cstate = BeginCopy(false, rel, query, queryString, attnamelist, options); + oldcontext = MemoryContextSwitchTo(cstate->copycontext); + if (pipe) { ! if (whereToSendOutput != DestRemote) cstate->copy_file = stdout; } else *************** DoCopyTo(CopyState cstate) *** 1270,1280 **** * Prevent write to relative path ... too easy to shoot oneself in the * foot by overwriting a database file ... */ ! if (!is_absolute_path(cstate->filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); oumask = umask(S_IWGRP | S_IWOTH); cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); umask(oumask); --- 1341,1352 ---- * Prevent write to relative path ... too easy to shoot oneself in the * foot by overwriting a database file ... */ ! if (!is_absolute_path(filename)) ereport(ERROR, (errcode(ERRCODE_INVALID_NAME), errmsg("relative path not allowed for COPY to file"))); + cstate->filename = pstrdup(filename); oumask = umask(S_IWGRP | S_IWOTH); cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); umask(oumask); *************** DoCopyTo(CopyState cstate) *** 1292,1305 **** errmsg("\"%s\" is a directory", cstate->filename))); } PG_TRY(); { ! if (cstate->fe_copy) SendCopyBegin(cstate); ! CopyTo(cstate); ! if (cstate->fe_copy) SendCopyEnd(cstate); } PG_CATCH(); --- 1364,1393 ---- errmsg("\"%s\" is a directory", cstate->filename))); } + MemoryContextSwitchTo(oldcontext); + + return cstate; + } + + /* + * This intermediate routine exists mainly to localize the effects of setjmp + * so we don't need to plaster a lot of variables with "volatile". + */ + static uint64 + DoCopyTo(CopyState cstate) + { + bool pipe = (cstate->filename == NULL); + bool fe_copy = (pipe && whereToSendOutput == DestRemote); + uint64 processed; + PG_TRY(); { ! if (fe_copy) SendCopyBegin(cstate); ! processed = CopyTo(cstate); ! if (fe_copy) SendCopyEnd(cstate); } PG_CATCH(); *************** DoCopyTo(CopyState cstate) *** 1314,1339 **** } PG_END_TRY(); ! if (!pipe) { ! if (FreeFile(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); } } /* * Copy from relation or query TO file. */ ! static void CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; Form_pg_attribute *attr; ListCell *cur; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); --- 1402,1439 ---- } PG_END_TRY(); ! return processed; ! } ! ! /* ! * Clean up storage and release resources for COPY TO. ! */ ! static void ! EndCopyTo(CopyState cstate) ! { ! if (cstate->queryDesc != NULL) { ! /* Close down the query and free resources. */ ! ExecutorEnd(cstate->queryDesc); ! FreeQueryDesc(cstate->queryDesc); ! PopActiveSnapshot(); } + + /* Clean up storage */ + EndCopy(cstate); } /* * Copy from relation or query TO file. */ ! static uint64 CopyTo(CopyState cstate) { TupleDesc tupDesc; int num_phys_attrs; Form_pg_attribute *attr; ListCell *cur; + uint64 processed; if (cstate->rel) tupDesc = RelationGetDescr(cstate->rel); *************** CopyTo(CopyState cstate) *** 1439,1444 **** --- 1539,1545 ---- scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); + processed = 0; while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL) { CHECK_FOR_INTERRUPTS(); *************** CopyTo(CopyState cstate) *** 1448,1461 **** --- 1549,1567 ---- /* Format and send the data */ CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls); + processed++; } heap_endscan(scandesc); + + pfree(values); + pfree(nulls); } else { /* run the plan --- the dest receiver will send tuples */ ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } if (cstate->binary) *************** CopyTo(CopyState cstate) *** 1467,1472 **** --- 1573,1580 ---- } MemoryContextDelete(cstate->rowcontext); + + return processed; } /* *************** CopyOneRowTo(CopyState cstate, Oid tuple *** 1558,1573 **** CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); - - cstate->processed++; } /* * error context callback for COPY FROM */ ! static void ! copy_in_error_callback(void *arg) { CopyState cstate = (CopyState) arg; --- 1666,1681 ---- CopySendEndOfRow(cstate); MemoryContextSwitchTo(oldcontext); } /* * error context callback for COPY FROM + * + * The argument for the error context must be CopyState. */ ! void ! CopyFromErrorCallback(void *arg) { CopyState cstate = (CopyState) arg; *************** copy_in_error_callback(void *arg) *** 1575,1585 **** { /* can't usefully display the data */ if (cstate->cur_attname) ! errcontext("COPY %s, line %d, column %s", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname); else ! errcontext("COPY %s, line %d", cstate->cur_relname, cstate->cur_lineno); } else --- 1683,1693 ---- { /* can't usefully display the data */ if (cstate->cur_attname) ! errcontext("relation %s, line %d, column %s", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname); else ! errcontext("relation %s, line %d", cstate->cur_relname, cstate->cur_lineno); } else *************** copy_in_error_callback(void *arg) *** 1590,1596 **** char *attval; attval = limit_printout_length(cstate->cur_attval); ! errcontext("COPY %s, line %d, column %s: \"%s\"", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname, attval); pfree(attval); --- 1698,1704 ---- char *attval; attval = limit_printout_length(cstate->cur_attval); ! errcontext("relation %s, line %d, column %s: \"%s\"", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname, attval); pfree(attval); *************** copy_in_error_callback(void *arg) *** 1598,1604 **** else if (cstate->cur_attname) { /* error is relevant to a particular column, value is NULL */ ! errcontext("COPY %s, line %d, column %s: null input", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname); } --- 1706,1712 ---- else if (cstate->cur_attname) { /* error is relevant to a particular column, value is NULL */ ! errcontext("relation %s, line %d, column %s: null input", cstate->cur_relname, cstate->cur_lineno, cstate->cur_attname); } *************** copy_in_error_callback(void *arg) *** 1610,1616 **** char *lineval; lineval = limit_printout_length(cstate->line_buf.data); ! errcontext("COPY %s, line %d: \"%s\"", cstate->cur_relname, cstate->cur_lineno, lineval); pfree(lineval); } --- 1718,1724 ---- char *lineval; lineval = limit_printout_length(cstate->line_buf.data); ! errcontext("relation %s, line %d: \"%s\"", cstate->cur_relname, cstate->cur_lineno, lineval); pfree(lineval); } *************** copy_in_error_callback(void *arg) *** 1624,1630 **** * regurgitate it without conversion. So we have to punt and * just report the line number. */ ! errcontext("COPY %s, line %d", cstate->cur_relname, cstate->cur_lineno); } } --- 1732,1738 ---- * regurgitate it without conversion. So we have to punt and * just report the line number. */ ! errcontext("relation %s, line %d", cstate->cur_relname, cstate->cur_lineno); } } *************** limit_printout_length(const char *str) *** 1669,1709 **** /* * Copy FROM file to relation. */ ! static void CopyFrom(CopyState cstate) { - bool pipe = (cstate->filename == NULL); HeapTuple tuple; TupleDesc tupDesc; - Form_pg_attribute *attr; - AttrNumber num_phys_attrs, - attr_count, - num_defaults; - FmgrInfo *in_functions; - FmgrInfo oid_in_function; - Oid *typioparams; - Oid oid_typioparam; - int attnum; - int i; - Oid in_func_oid; Datum *values; bool *nulls; - int nfields; - char **field_strings; - bool done = false; - bool isnull; ResultRelInfo *resultRelInfo; ! EState *estate = CreateExecutorState(); /* for ExecConstraints() */ TupleTableSlot *slot; - bool file_has_oids; - int *defmap; - ExprState **defexprs; /* array of default att expressions */ - ExprContext *econtext; /* used for ExecEvalExpr for default atts */ MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; Assert(cstate->rel); --- 1777,1798 ---- /* * Copy FROM file to relation. */ ! static uint64 CopyFrom(CopyState cstate) { HeapTuple tuple; TupleDesc tupDesc; Datum *values; bool *nulls; ResultRelInfo *resultRelInfo; ! EState *estate = cstate->estate; /* for ExecConstraints() */ TupleTableSlot *slot; MemoryContext oldcontext = CurrentMemoryContext; ErrorContextCallback errcontext; CommandId mycid = GetCurrentCommandId(true); int hi_options = 0; /* start with default heap_insert options */ BulkInsertState bistate; + uint64 processed = 0; Assert(cstate->rel); *************** CopyFrom(CopyState cstate) *** 1731,1736 **** --- 1820,1827 ---- RelationGetRelationName(cstate->rel)))); } + tupDesc = RelationGetDescr(cstate->rel); + /*---------- * Check to see if we can avoid writing WAL * *************** CopyFrom(CopyState cstate) *** 1766,1803 **** hi_options |= HEAP_INSERT_SKIP_WAL; } - if (pipe) - { - if (whereToSendOutput == DestRemote) - ReceiveCopyBegin(cstate); - else - cstate->copy_file = stdin; - } - else - { - struct stat st; - - cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); - - if (cstate->copy_file == NULL) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\" for reading: %m", - cstate->filename))); - - fstat(fileno(cstate->copy_file), &st); - if (S_ISDIR(st.st_mode)) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is a directory", cstate->filename))); - } - - tupDesc = RelationGetDescr(cstate->rel); - attr = tupDesc->attrs; - num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); - num_defaults = 0; - /* * We need a ResultRelInfo so we can use the regular executor's * index-entry-making machinery. (There used to be a huge amount of code --- 1857,1862 ---- *************** CopyFrom(CopyState cstate) *** 1826,1832 **** slot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(slot, tupDesc); ! econtext = GetPerTupleExprContext(estate); /* * Pick up the required catalog information for each attribute in the --- 1885,2071 ---- slot = ExecInitExtraTupleSlot(estate); ExecSetSlotDescriptor(slot, tupDesc); ! /* Prepare to catch AFTER triggers. */ ! AfterTriggerBeginQuery(); ! ! /* ! * Check BEFORE STATEMENT insertion triggers. It's debateable whether we ! * should do this for COPY, since it's not really an "INSERT" statement as ! * such. However, executing these triggers maintains consistency with the ! * EACH ROW triggers that we already fire on COPY. ! */ ! ExecBSInsertTriggers(estate, resultRelInfo); ! ! values = (Datum *) palloc(tupDesc->natts * sizeof(Datum)); ! nulls = (bool *) palloc(tupDesc->natts * sizeof(bool)); ! ! bistate = GetBulkInsertState(); ! ! /* Set up callback to identify error line number */ ! errcontext.callback = CopyFromErrorCallback; ! errcontext.arg = (void *) cstate; ! errcontext.previous = error_context_stack; ! error_context_stack = &errcontext; ! ! for (;;) ! { ! bool skip_tuple; ! Oid loaded_oid = InvalidOid; ! ! CHECK_FOR_INTERRUPTS(); ! ! /* Reset the per-tuple exprcontext */ ! ResetPerTupleExprContext(estate); ! ! /* Switch into its memory context */ ! MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); ! ! if (!NextCopyFrom(cstate, values, nulls, &loaded_oid)) ! break; ! ! /* And now we can form the input tuple. */ ! tuple = heap_form_tuple(tupDesc, values, nulls); ! ! if (loaded_oid != InvalidOid) ! HeapTupleSetOid(tuple, loaded_oid); ! ! /* Triggers and stuff need to be invoked in query context. */ ! MemoryContextSwitchTo(oldcontext); ! ! skip_tuple = false; ! ! /* BEFORE ROW INSERT Triggers */ ! if (resultRelInfo->ri_TrigDesc && ! resultRelInfo->ri_TrigDesc->trig_insert_before_row) ! { ! HeapTuple newtuple; ! ! newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple); ! ! if (newtuple == NULL) /* "do nothing" */ ! skip_tuple = true; ! else if (newtuple != tuple) /* modified by Trigger(s) */ ! { ! heap_freetuple(tuple); ! tuple = newtuple; ! } ! } ! ! if (!skip_tuple) ! { ! List *recheckIndexes = NIL; ! ! /* Place tuple in tuple slot */ ! ExecStoreTuple(tuple, slot, InvalidBuffer, false); ! ! /* Check the constraints of the tuple */ ! if (cstate->rel->rd_att->constr) ! ExecConstraints(resultRelInfo, slot, estate); ! ! /* OK, store the tuple and create index entries for it */ ! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); ! ! if (resultRelInfo->ri_NumIndices > 0) ! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), ! estate); ! ! /* AFTER ROW INSERT Triggers */ ! ExecARInsertTriggers(estate, resultRelInfo, tuple, ! recheckIndexes); ! ! list_free(recheckIndexes); ! ! /* ! * We count only tuples not suppressed by a BEFORE INSERT trigger; ! * this is the same definition used by execMain.c for counting ! * tuples inserted by an INSERT command. ! */ ! processed++; ! } ! } ! ! /* Done, clean up */ ! error_context_stack = errcontext.previous; ! ! FreeBulkInsertState(bistate); ! ! MemoryContextSwitchTo(oldcontext); ! ! /* Execute AFTER STATEMENT insertion triggers */ ! ExecASInsertTriggers(estate, resultRelInfo); ! ! /* Handle queued AFTER triggers */ ! AfterTriggerEndQuery(estate); ! ! pfree(values); ! pfree(nulls); ! ! ExecResetTupleTable(estate->es_tupleTable, false); ! ! ExecCloseIndices(resultRelInfo); ! ! /* ! * If we skipped writing WAL, then we need to sync the heap (but not ! * indexes since those use WAL anyway) ! */ ! if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel); ! ! return processed; ! } ! ! /* ! * CopyGetAttnums - build an integer list of attnums to be copied ! * ! * The input attnamelist is either the user-specified column list, ! * or NIL if there was none (in which case we want all the non-dropped ! * columns). ! * ! * rel can be NULL ... it's only used for error reports. ! */ ! CopyState ! BeginCopyFrom(Relation rel, ! const char *filename, ! List *attnamelist, ! List *options) ! { ! CopyState cstate; ! bool pipe = (filename == NULL); ! TupleDesc tupDesc; ! Form_pg_attribute *attr; ! AttrNumber num_phys_attrs, ! num_defaults; ! FmgrInfo *in_functions; ! Oid *typioparams; ! int attnum; ! Oid in_func_oid; ! EState *estate = CreateExecutorState(); /* for ExecPrepareExpr() */ ! int *defmap; ! ExprState **defexprs; ! MemoryContext oldcontext; ! ! cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options); ! oldcontext = MemoryContextSwitchTo(cstate->copycontext); ! ! /* Initialize state variables */ ! cstate->fe_eof = false; ! cstate->eol_type = EOL_UNKNOWN; ! cstate->cur_relname = RelationGetRelationName(cstate->rel); ! cstate->cur_lineno = 0; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! ! /* Set up variables to avoid per-attribute overhead. */ ! initStringInfo(&cstate->attribute_buf); ! initStringInfo(&cstate->line_buf); ! cstate->line_buf_converted = false; ! cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); ! cstate->raw_buf_index = cstate->raw_buf_len = 0; ! ! tupDesc = RelationGetDescr(cstate->rel); ! attr = tupDesc->attrs; ! num_phys_attrs = tupDesc->natts; ! num_defaults = 0; /* * Pick up the required catalog information for each attribute in the *************** CopyFrom(CopyState cstate) *** 1871,1889 **** } } ! /* Prepare to catch AFTER triggers. */ ! AfterTriggerBeginQuery(); ! /* ! * Check BEFORE STATEMENT insertion triggers. It's debateable whether we ! * should do this for COPY, since it's not really an "INSERT" statement as ! * such. However, executing these triggers maintains consistency with the ! * EACH ROW triggers that we already fire on COPY. ! */ ! ExecBSInsertTriggers(estate, resultRelInfo); if (!cstate->binary) ! file_has_oids = cstate->oids; /* must rely on user to tell us... */ else { /* Read and verify binary header */ --- 2110,2155 ---- } } ! /* We keep those variables in cstate. */ ! cstate->estate = estate; ! cstate->in_functions = in_functions; ! cstate->typioparams = typioparams; ! cstate->defmap = defmap; ! cstate->defexprs = defexprs; ! cstate->num_defaults = num_defaults; ! if (pipe) ! { ! if (whereToSendOutput == DestRemote) ! ReceiveCopyBegin(cstate); ! else ! cstate->copy_file = stdin; ! } ! else ! { ! struct stat st; ! ! cstate->filename = pstrdup(filename); ! cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); ! ! if (cstate->copy_file == NULL) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not open file \"%s\" for reading: %m", ! cstate->filename))); ! ! fstat(fileno(cstate->copy_file), &st); ! if (S_ISDIR(st.st_mode)) ! ereport(ERROR, ! (errcode(ERRCODE_WRONG_OBJECT_TYPE), ! errmsg("\"%s\" is a directory", cstate->filename))); ! } if (!cstate->binary) ! { ! /* must rely on user to tell us... */ ! cstate->file_has_oids = cstate->oids; ! } else { /* Read and verify binary header */ *************** CopyFrom(CopyState cstate) *** 1901,1907 **** ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); ! file_has_oids = (tmp & (1 << 16)) != 0; tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, --- 2167,2173 ---- ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); ! cstate->file_has_oids = (tmp & (1 << 16)) != 0; tmp &= ~(1 << 16); if ((tmp >> 16) != 0) ereport(ERROR, *************** CopyFrom(CopyState cstate) *** 1923,1984 **** } } ! if (file_has_oids && cstate->binary) { getTypeBinaryInputInfo(OIDOID, ! &in_func_oid, &oid_typioparam); ! fmgr_info(in_func_oid, &oid_in_function); } - values = (Datum *) palloc(num_phys_attrs * sizeof(Datum)); - nulls = (bool *) palloc(num_phys_attrs * sizeof(bool)); - /* create workspace for CopyReadAttributes results */ ! nfields = file_has_oids ? (attr_count + 1) : attr_count; ! if (! cstate->binary) { cstate->max_fields = nfields; cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); } ! /* Initialize state variables */ ! cstate->fe_eof = false; ! cstate->eol_type = EOL_UNKNOWN; ! cstate->cur_relname = RelationGetRelationName(cstate->rel); ! cstate->cur_lineno = 0; ! cstate->cur_attname = NULL; ! cstate->cur_attval = NULL; ! bistate = GetBulkInsertState(); ! /* Set up callback to identify error line number */ ! errcontext.callback = copy_in_error_callback; ! errcontext.arg = (void *) cstate; ! errcontext.previous = error_context_stack; ! error_context_stack = &errcontext; /* on input just throw the header line away */ ! if (cstate->header_line) { cstate->cur_lineno++; ! done = CopyReadLine(cstate); } ! while (!done) ! { ! bool skip_tuple; ! Oid loaded_oid = InvalidOid; ! ! CHECK_FOR_INTERRUPTS(); cstate->cur_lineno++; - /* Reset the per-tuple exprcontext */ - ResetPerTupleExprContext(estate); - - /* Switch into its memory context */ - MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); --- 2189,2259 ---- } } ! if (cstate->file_has_oids && cstate->binary) { getTypeBinaryInputInfo(OIDOID, ! &in_func_oid, &cstate->oid_typioparam); ! fmgr_info(in_func_oid, &cstate->oid_in_function); } /* create workspace for CopyReadAttributes results */ ! if (!cstate->binary) { + AttrNumber attr_count = list_length(cstate->attnumlist); + int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count; + cstate->max_fields = nfields; cstate->raw_fields = (char **) palloc(nfields * sizeof(char *)); } ! MemoryContextSwitchTo(oldcontext); ! return cstate; ! } ! /* ! * Read next tuple from file for COPY FROM. Return false if no more tuples. ! * ! * values and nulls arrays must be the same length as columns of the ! * relation passed to BeginCopyFrom. Oid of the tuple is returned with ! * tupleOid separately. ! */ ! bool ! NextCopyFrom(CopyState cstate, Datum *values, bool *nulls, Oid *tupleOid) ! { ! TupleDesc tupDesc; ! Form_pg_attribute *attr; ! AttrNumber num_phys_attrs, ! attr_count, ! num_defaults = cstate->num_defaults; ! FmgrInfo *in_functions = cstate->in_functions; ! Oid *typioparams = cstate->typioparams; ! int i; ! int nfields; ! char **field_strings; ! bool isnull; ! bool file_has_oids = cstate->file_has_oids; ! int *defmap = cstate->defmap; ! ExprState **defexprs = cstate->defexprs; ! ExprContext *econtext; /* used for ExecEvalExpr for default atts */ /* on input just throw the header line away */ ! if (cstate->cur_lineno == 0 && cstate->header_line) { cstate->cur_lineno++; ! if (CopyReadLine(cstate)) ! return false; /* done */ } ! tupDesc = RelationGetDescr(cstate->rel); ! attr = tupDesc->attrs; ! num_phys_attrs = tupDesc->natts; ! attr_count = list_length(cstate->attnumlist); ! nfields = file_has_oids ? (attr_count + 1) : attr_count; + /* XXX: Indentation is not adjusted to keep the patch small. */ cstate->cur_lineno++; /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); *************** CopyFrom(CopyState cstate) *** 1989,1994 **** --- 2264,2270 ---- int fldct; int fieldno; char *string; + bool done; /* Actually read the line into memory here */ done = CopyReadLine(cstate); *************** CopyFrom(CopyState cstate) *** 1999,2005 **** * EOF, ie, process the line and then exit loop on next iteration. */ if (done && cstate->line_buf.len == 0) ! break; /* Parse the line into de-escaped field values */ if (cstate->csv_mode) --- 2275,2281 ---- * EOF, ie, process the line and then exit loop on next iteration. */ if (done && cstate->line_buf.len == 0) ! return false; /* Parse the line into de-escaped field values */ if (cstate->csv_mode) *************** CopyFrom(CopyState cstate) *** 2029,2041 **** ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("null OID in COPY data"))); ! else { cstate->cur_attname = "oid"; cstate->cur_attval = string; ! loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin, ! CStringGetDatum(string))); ! if (loaded_oid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); --- 2305,2317 ---- ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("null OID in COPY data"))); ! else if (cstate->oids && tupleOid != NULL) { cstate->cur_attname = "oid"; cstate->cur_attval = string; ! *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin, ! CStringGetDatum(string))); ! if (*tupleOid == InvalidOid) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); *************** CopyFrom(CopyState cstate) *** 2087,2094 **** if (!CopyGetInt16(cstate, &fld_count)) { /* EOF detected (end of file, or protocol-level EOF) */ ! done = true; ! break; } if (fld_count == -1) --- 2363,2369 ---- if (!CopyGetInt16(cstate, &fld_count)) { /* EOF detected (end of file, or protocol-level EOF) */ ! return false; } if (fld_count == -1) *************** CopyFrom(CopyState cstate) *** 2112,2119 **** ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); ! done = true; ! break; } if (fld_count != attr_count) --- 2387,2393 ---- ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); ! return false; } if (fld_count != attr_count) *************** CopyFrom(CopyState cstate) *** 2124,2135 **** if (file_has_oids) { cstate->cur_attname = "oid"; loaded_oid = DatumGetObjectId(CopyReadBinaryAttribute(cstate, 0, ! &oid_in_function, ! oid_typioparam, -1, &isnull)); if (isnull || loaded_oid == InvalidOid) --- 2398,2411 ---- if (file_has_oids) { + Oid loaded_oid; + cstate->cur_attname = "oid"; loaded_oid = DatumGetObjectId(CopyReadBinaryAttribute(cstate, 0, ! &cstate->oid_in_function, ! cstate->oid_typioparam, -1, &isnull)); if (isnull || loaded_oid == InvalidOid) *************** CopyFrom(CopyState cstate) *** 2137,2142 **** --- 2413,2420 ---- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid OID in COPY data"))); cstate->cur_attname = NULL; + if (cstate->oids && tupleOid != NULL) + *tupleOid = loaded_oid; } i = 0; *************** CopyFrom(CopyState cstate) *** 2162,2278 **** * provided by the input data. Anything not processed here or above * will remain NULL. */ for (i = 0; i < num_defaults; i++) { values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, &nulls[defmap[i]], NULL); } ! /* And now we can form the input tuple. */ ! tuple = heap_form_tuple(tupDesc, values, nulls); ! ! if (cstate->oids && file_has_oids) ! HeapTupleSetOid(tuple, loaded_oid); ! ! /* Triggers and stuff need to be invoked in query context. */ ! MemoryContextSwitchTo(oldcontext); ! ! skip_tuple = false; ! ! /* BEFORE ROW INSERT Triggers */ ! if (resultRelInfo->ri_TrigDesc && ! resultRelInfo->ri_TrigDesc->trig_insert_before_row) ! { ! HeapTuple newtuple; ! ! newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple); ! ! if (newtuple == NULL) /* "do nothing" */ ! skip_tuple = true; ! else if (newtuple != tuple) /* modified by Trigger(s) */ ! { ! heap_freetuple(tuple); ! tuple = newtuple; ! } ! } ! ! if (!skip_tuple) ! { ! List *recheckIndexes = NIL; ! ! /* Place tuple in tuple slot */ ! ExecStoreTuple(tuple, slot, InvalidBuffer, false); ! ! /* Check the constraints of the tuple */ ! if (cstate->rel->rd_att->constr) ! ExecConstraints(resultRelInfo, slot, estate); ! ! /* OK, store the tuple and create index entries for it */ ! heap_insert(cstate->rel, tuple, mycid, hi_options, bistate); ! ! if (resultRelInfo->ri_NumIndices > 0) ! recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self), ! estate); ! ! /* AFTER ROW INSERT Triggers */ ! ExecARInsertTriggers(estate, resultRelInfo, tuple, ! recheckIndexes); ! ! list_free(recheckIndexes); ! ! /* ! * We count only tuples not suppressed by a BEFORE INSERT trigger; ! * this is the same definition used by execMain.c for counting ! * tuples inserted by an INSERT command. ! */ ! cstate->processed++; ! } ! } ! ! /* Done, clean up */ ! error_context_stack = errcontext.previous; ! ! FreeBulkInsertState(bistate); ! ! MemoryContextSwitchTo(oldcontext); ! ! /* Execute AFTER STATEMENT insertion triggers */ ! ExecASInsertTriggers(estate, resultRelInfo); ! ! /* Handle queued AFTER triggers */ ! AfterTriggerEndQuery(estate); ! ! pfree(values); ! pfree(nulls); ! if (! cstate->binary) ! pfree(cstate->raw_fields); ! ! pfree(in_functions); ! pfree(typioparams); ! pfree(defmap); ! pfree(defexprs); ! ! ExecResetTupleTable(estate->es_tupleTable, false); ! ! ExecCloseIndices(resultRelInfo); ! ! FreeExecutorState(estate); ! if (!pipe) ! { ! if (FreeFile(cstate->copy_file)) ! ereport(ERROR, ! (errcode_for_file_access(), ! errmsg("could not close file \"%s\": %m", ! cstate->filename))); ! } ! /* ! * If we skipped writing WAL, then we need to sync the heap (but not ! * indexes since those use WAL anyway) ! */ ! if (hi_options & HEAP_INSERT_SKIP_WAL) ! heap_sync(cstate->rel); } --- 2440,2466 ---- * provided by the input data. Anything not processed here or above * will remain NULL. */ + econtext = GetPerTupleExprContext(cstate->estate); for (i = 0; i < num_defaults; i++) { values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, &nulls[defmap[i]], NULL); } + /* XXX: End of only-indentation changes. */ ! return true; ! } ! /* ! * Clean up storage and release resources for COPY FROM. ! */ ! void ! EndCopyFrom(CopyState cstate) ! { ! FreeExecutorState(cstate->estate); ! /* Clean up storage */ ! EndCopy(cstate); } *************** copy_dest_receive(TupleTableSlot *slot, *** 3537,3542 **** --- 3725,3731 ---- /* And send the data */ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); + myState->processed++; } /* diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 9e2bbe8..8340e3d 100644 *** a/src/include/commands/copy.h --- b/src/include/commands/copy.h *************** *** 14,25 **** --- 14,35 ---- #ifndef COPY_H #define COPY_H + #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" + typedef struct CopyStateData *CopyState; + extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString); + extern CopyState BeginCopyFrom(Relation rel, const char *filename, + List *attnamelist, List *options); + extern void EndCopyFrom(CopyState cstate); + extern bool NextCopyFrom(CopyState cstate, + Datum *values, bool *nulls, Oid *tupleOid); + extern void CopyFromErrorCallback(void *arg); + extern DestReceiver *CreateCopyDestReceiver(void); #endif /* COPY_H */