diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 13a8b68d95..bf21abd8e0 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -364,6 +364,17 @@ COPY { table_name [ (
+
+ IGNORE_CONFLICTS
+
+
+ Specifies that up to the given number of errors are ignored.
+ Instead, each failing record is written to the failed record file and
+ processing proceeds to the next record.
+
+
+
+
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce60f..ffa6aecbd5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -43,6 +43,7 @@
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -118,6 +119,7 @@ typedef struct CopyStateData
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ FILE *failed_rec_file;
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
@@ -147,6 +149,9 @@ typedef struct CopyStateData
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
+ char *failed_rec_filename;
+ bool ignore_conflict;
+ int error_limit; /* total # of error to log */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
@@ -766,6 +771,21 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
+/*
+ * LogCopyError
+ *		Record a rejected input line in the failed-record file.
+ *
+ * Appends the explanation "str" and a line terminator to cstate->line_buf
+ * (presumably still holding the offending input line; confirm against the
+ * callers), writes the whole buffer to cstate->failed_rec_file, and then
+ * decrements cstate->error_limit.
+ *
+ * The line terminator is spelled out per platform because the file is
+ * opened in binary mode, so no newline translation happens on output.
+ *
+ * Raises an ERROR if the write fails, rather than silently losing the
+ * rejected record.
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+
+	/* A short count or a stream error means the record was not saved. */
+	if (fwrite(cstate->line_buf.data, 1, cstate->line_buf.len,
+			   cstate->failed_rec_file) != (size_t) cstate->line_buf.len ||
+		ferror(cstate->failed_rec_file))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						cstate->failed_rec_filename)));
+
+	/* Only count the error once it has actually been logged. */
+	cstate->error_limit--;
+}
/*
* DoCopy executes the SQL COPY statement
@@ -1226,6 +1246,19 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+ else if (strcmp(defel->defname, "ignore_conflicts") == 0)
+ {
+ List *conflictOption;
+ if (cstate->ignore_conflict)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->ignore_conflict = true;
+ conflictOption = (List *) defel->arg;
+ cstate->error_limit = intVal(list_nth(conflictOption, 0));
+ cstate->failed_rec_filename = strVal(list_nth(conflictOption, 1));
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1749,6 +1782,11 @@ EndCopy(CopyState cstate)
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
+ if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->failed_rec_filename)));
}
MemoryContextDelete(cstate->copycontext);
@@ -2461,6 +2499,8 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_FROZEN;
}
+ if (!cstate->ignore_conflict)
+ cstate->error_limit = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
@@ -2579,6 +2619,10 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (cstate->ignore_conflict)
+ {
+ insertMethod = CIM_SINGLE;
+ }
else
{
/*
@@ -2968,12 +3012,59 @@ CopyFrom(CopyState cstate)
*/
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
+ else if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ bool specConflict;
+ uint32 specToken;
+ specConflict = false;
+
+ specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+ HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+ /* insert the tuple, with the speculative token */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+ estate->es_output_cid,
+ HEAP_INSERT_SPECULATIVE,
+ NULL);
+
+ /* insert index entries for tuple */
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate, true, &specConflict,
+ NIL);
+
+ /* adjust the tuple's state accordingly */
+ if (!specConflict)
+ {
+ heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+ processed++;
+ }
+ else
+ {
+ heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+#ifndef WIN32
+ appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+ appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+ fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+ cstate->error_limit--;
+
+ }
+
+ /*
+ * Wake up anyone waiting for our decision. They will re-check
+ * the tuple, see that it's no longer speculative, and wait on our
+ * XID as if this was a regularly inserted tuple all along.
+ */
+ SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+ }
else
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
mycid, hi_options, bistate);
/* And create index entries for it */
- if (resultRelInfo->ri_NumIndices > 0)
+ if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
estate,
@@ -2994,7 +3085,8 @@ CopyFrom(CopyState cstate)
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
*/
- processed++;
+ if(!cstate->ignore_conflict)
+ processed++;
}
}
@@ -3286,6 +3378,48 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
+ if (cstate->failed_rec_filename)
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(cstate->failed_rec_filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for failed record file")));
+ oumask = umask(S_IWGRP | S_IWOTH);
+ PG_TRY();
+ {
+ cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+ }
+ PG_CATCH();
+ {
+ umask(oumask);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ umask(oumask);
+ if (cstate->failed_rec_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->failed_rec_filename)));
+
+ if (fstat(fileno(cstate->failed_rec_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->failed_rec_filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+ }
+
if (data_source_cb)
{
cstate->copy_dest = COPY_CALLBACK;
@@ -3498,7 +3632,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
if (!cstate->binary)
{
char **field_strings;
@@ -3513,9 +3647,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* check for overflowing fields */
if (nfields > 0 && fldct > nfields)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " extra data after last expected column");
+ goto next_line;
+ }else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
+ }
fieldno = 0;
@@ -3523,15 +3664,29 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
if (file_has_oids)
{
if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for OID column")));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " missing data for OID column");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for OID column")));
+ }
string = field_strings[fieldno++];
if (string == NULL)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " null OID in COPY data");
+ goto next_line;
+ }else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data")));
+ }
else if (cstate->oids && tupleOid != NULL)
{
cstate->cur_attname = "oid";
@@ -3539,9 +3694,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(string)));
if (*tupleOid == InvalidOid)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " invalid OID in COPY data");
+ goto next_line;
+ }else
+
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
+ }
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
@@ -3555,10 +3718,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, " missing data for column %s",
+ NameStr(att->attname));
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(att->attname))));
+ }
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
@@ -3645,10 +3818,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
}
if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+ (int) fld_count, attr_count);
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
+ }
if (file_has_oids)
{
@@ -3663,9 +3845,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
-1,
&isnull));
if (isnull || loaded_oid == InvalidOid)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("invalid OID in COPY data")));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " invalid OID in COPY data");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("invalid OID in COPY data")));
+ }
cstate->cur_attname = NULL;
if (cstate->oids && tupleOid != NULL)
*tupleOid = loaded_oid;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 87f5e95827..c1084f71bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -3107,6 +3107,10 @@ copy_opt_item:
{
$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
}
+ | ON CONFLICT LOG_P Iconst ',' LOG_P FILE_P NAME_P Sconst
+ {
+ $$ = makeDefElem("ignore_conflicts", (Node *)list_make2(makeInteger($4), makeString($9)), @1);
+ }
;
/* The following exist for backward compatibility with very old versions */
@@ -15086,6 +15090,7 @@ unreserved_keyword:
| EXTENSION
| EXTERNAL
| FAMILY
+ | FILE_P
| FILTER
| FIRST_P
| FOLLOWING
@@ -15134,6 +15139,7 @@ unreserved_keyword:
| LOCATION
| LOCK_P
| LOCKED
+ | LOG_P
| LOGGED
| MAPPING
| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)