Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers
From | Sutou Kouhei |
---|---|
Subject | Re: Make COPY format extendable: Extract COPY TO format implementations |
Date | |
Msg-id | 20241105.174328.1705956947135248653.kou@clear-code.com Whole thread Raw |
In response to | Make COPY format extendable: Extract COPY TO format implementations (Sutou Kouhei <kou@clear-code.com>) |
List | pgsql-hackers |
Hi, In <CAD21AoAdj-EJOH1o2fTLke-uskSvuenT--fKW9nkLzYcLwU_eg@mail.gmail.com> "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 4 Nov 2024 22:19:07 -0800, Masahiko Sawada <sawada.mshk@gmail.com> wrote: > I've further investigated the performance regression, and found out it > might be relevant that the compiler doesn't inline the > CopyFromTextLikeOneRow() function. It might be worth testing with > pg_attribute_always_inline instead of 'inline' as below: Wow! Good catch! I've rebased on the current master and updated the v20 and v21 patch sets to use "pg_attribute_always_inline" instead of "inline". The v22 patch set corresponds to the v20 patch set. (The TO/FROM changes are in one commit.) The v23 patch set corresponds to the v21 patch set. (The TO/FROM changes are in separate commits, to make it easy to merge only the FROM or the TO part.) I'll run the benchmark in my environment again. Thanks, -- kou From 960414f4d256b0d250a70156aac50f88e07de19a Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Mon, 4 Mar 2024 13:52:34 +0900 Subject: [PATCH v22 1/5] Add CopyToRoutine/CopyFromRoutine They are for implementing custom COPY TO/FROM formats. However, this alone is not yet enough to implement a custom COPY TO/FROM format. We'll export some APIs to receive/send data and add a "format" option to COPY TO/FROM later. The existing text/csv/binary format implementations don't use CopyToRoutine/CopyFromRoutine for now. We have a patch for that, but we are deferring it because there are some mysterious profiling results even though the runtimes are faster. See [1] for details. [1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz Note that this doesn't change the existing text/csv/binary format implementations. 
--- src/backend/commands/copyfrom.c | 24 +++++- src/backend/commands/copyfromparse.c | 5 ++ src/backend/commands/copyto.c | 31 ++++++- src/include/commands/copyapi.h | 101 +++++++++++++++++++++++ src/include/commands/copyfrom_internal.h | 4 + src/tools/pgindent/typedefs.list | 2 + 6 files changed, 159 insertions(+), 8 deletions(-) create mode 100644 src/include/commands/copyapi.h diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 07cbd5d22b8..909375e81b7 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1635,12 +1635,22 @@ BeginCopyFrom(ParseState *pstate, /* Fetch the input function and typioparam info */ if (cstate->opts.binary) + { getTypeBinaryInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + else if (cstate->routine) + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); + else + { getTypeInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1780,10 +1790,13 @@ BeginCopyFrom(ParseState *pstate, /* Read and verify binary header */ ReceiveCopyBinaryHeader(cstate); } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) + else if (cstate->routine) { + cstate->routine->CopyFromStart(cstate, tupDesc); + } + else + { + /* create workspace for CopyReadAttributes results */ AttrNumber attr_count = list_length(cstate->attnumlist); cstate->max_fields = attr_count; @@ -1801,6 +1814,9 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + if (cstate->routine) + cstate->routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. 
*/ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index d1d43b53d83..b104e4a9114 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -1003,6 +1003,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Assert(fieldno == attr_count); } + else if (cstate->routine) + { + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; + } else { /* binary */ diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index f55e6d96751..405e1782685 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -20,6 +20,7 @@ #include "access/tableam.h" #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -64,6 +65,9 @@ typedef enum CopyDest */ typedef struct CopyToStateData { + /* format routine */ + const CopyToRoutine *routine; + /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ @@ -776,14 +780,22 @@ DoCopyTo(CopyToState cstate) Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (cstate->opts.binary) + { getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + else if (cstate->routine) + cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); else + { getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } } /* @@ -810,6 +822,8 @@ DoCopyTo(CopyToState cstate) tmp = 0; CopySendInt32(cstate, tmp); } + else if (cstate->routine) + cstate->routine->CopyToStart(cstate, tupDesc); else { /* @@ -891,6 +905,8 @@ DoCopyTo(CopyToState cstate) /* Need to 
flush out the trailer */ CopySendEndOfRow(cstate); } + else if (cstate->routine) + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -912,15 +928,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + if (cstate->routine) + { + cstate->routine->CopyToOneRow(cstate, slot); + MemoryContextSwitchTo(oldcontext); + return; + } + if (cstate->opts.binary) { /* Binary per-tuple header */ CopySendInt16(cstate, list_length(cstate->attnumlist)); } - /* Make sure the tuple is fully deconstructed */ - slot_getallattrs(slot); - if (!cstate->opts.binary) { bool need_delim = false; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h new file mode 100644 index 00000000000..d1289424c67 --- /dev/null +++ b/src/include/commands/copyapi.h @@ -0,0 +1,101 @@ +/*------------------------------------------------------------------------- + * + * copyapi.h + * API for COPY TO/FROM handlers + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyapi.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYAPI_H +#define COPYAPI_H + +#include "executor/tuptable.h" +#include "nodes/execnodes.h" + +/* These are private in commands/copy[from|to].c */ +typedef struct CopyFromStateData *CopyFromState; +typedef struct CopyToStateData *CopyToState; + +/* + * API structure for a COPY FROM format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyFromRoutine +{ + /* + * Called when COPY FROM is started to set up the input functions + * associated with the relation's attributes writing to. 
`finfo` can be + * optionally filled to provide the catalog information of the input + * function. `typioparam` can be optionally filled to define the OID of + * the type to pass to the input function. `atttypid` is the OID of data + * type used by the relation's attribute. + */ + void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); + + /* + * Called when COPY FROM is started. + * + * `tupDesc` is the tuple descriptor of the relation where the data needs + * to be copied. This can be used for any initialization steps required + * by a format. + */ + void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); + + /* + * Copy one row to a set of `values` and `nulls` of size tupDesc->natts. + * + * 'econtext' is used to evaluate default expression for each column that + * is either not read from the file or is using the DEFAULT option of COPY + * FROM. It is NULL if no default values are used. + * + * Returns false if there are no more tuples to copy. + */ + bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + + /* Called when COPY FROM has ended. */ + void (*CopyFromEnd) (CopyFromState cstate); +} CopyFromRoutine; + +/* + * API structure for a COPY TO format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyToRoutine +{ + /* + * Called when COPY TO is started to set up the output functions + * associated with the relation's attributes reading from. `finfo` can be + * optionally filled to provide the catalog information of the output + * function. `atttypid` is the OID of data type used by the relation's + * attribute. + */ + void (*CopyToOutFunc) (CopyToState cstate, Oid atttypid, + FmgrInfo *finfo); + + /* + * Called when COPY TO is started. + * + * `tupDesc` is the tuple descriptor of the relation from where the data + * is read. 
+ */ + void (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc); + + /* + * Copy one row for COPY TO. + * + * `slot` is the tuple slot where the data is emitted. + */ + void (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO has ended */ + void (*CopyToEnd) (CopyToState cstate); +} CopyToRoutine; + +#endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc783..509b9e92a18 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -15,6 +15,7 @@ #define COPYFROM_INTERNAL_H #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/trigger.h" #include "nodes/miscnodes.h" @@ -58,6 +59,9 @@ typedef enum CopyInsertMethod */ typedef struct CopyFromStateData { + /* format routine */ + const CopyFromRoutine *routine; + /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1847bbfa95c..a8422fa4d35 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -492,6 +492,7 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyFormatOptions +CopyFromRoutine CopyFromState CopyFromStateData CopyHeaderChoice @@ -503,6 +504,7 @@ CopyMultiInsertInfo CopyOnErrorChoice CopySource CopyStmt +CopyToRoutine CopyToState CopyToStateData Cost -- 2.45.2 From 78ed1bf847051f09f417980931c031cfa5d93e4c Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Tue, 23 Jul 2024 16:44:44 +0900 Subject: [PATCH v22 2/5] Use CopyToRoutine/CopyFromRountine for the existing formats The existing formats are text, csv and binary. If we find any performance regression by this, we will not merge this to master. 
This will increase indirect function call costs, but it will reduce the runtime cost of the "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)" branches. This uses an optimization based on a static inline function called with a constant argument in place of cstate->opts.csv_mode. For example, CopyFromTextLikeOneRow() uses this optimization: it accepts a "bool is_csv" argument instead of reading cstate->opts.csv_mode. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with a constant false for "bool is_csv", so the compiler can remove the "if (is_csv)" branches from the inlined body. This doesn't change the existing logic; it just moves existing code. --- src/backend/commands/copyfrom.c | 215 ++++++--- src/backend/commands/copyfromparse.c | 530 +++++++++++++---------- src/backend/commands/copyto.c | 477 +++++++++++++------- src/include/commands/copy.h | 2 - src/include/commands/copyfrom_internal.h | 8 + 5 files changed, 790 insertions(+), 442 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 909375e81b7..e6ea9ce1602 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations for text and CSV. + */ + +/* + * CopyFromTextLikeInFunc + * + * Assign input function data for a relation's attribute in text/CSV format. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromTextLikeStart + * + * Start of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data.
Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. + */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * CopyFromTextLikeEnd + * + * End of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* + * CopyFromBinaryInFunc + * + * Assign input function data for a relation's attribute in binary format. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromBinaryStart + * + * Start of COPY FROM for binary format. + */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * CopyFromBinaryEnd + * + * End of COPY FROM for binary format. + */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * Routines assigned to each format. ++ + * CSV and text share the same implementation, at the exception of the + * per-row callback. 
+ */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* + * Define the COPY FROM routines to use for a format. + */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + + /* * error context callback for COPY FROM * @@ -1396,7 +1547,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1428,6 +1578,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1583,25 +1736,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. 
- */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1634,23 +1768,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - { - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyFromInFunc(cstate, att->atttypid, - &in_functions[attnum - 1], - &typioparams[attnum - 1]); - - else - { - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1785,23 +1905,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - else if (cstate->routine) - { - cstate->routine->CopyFromStart(cstate, tupDesc); - } - else - { - /* create workspace for CopyReadAttributes results */ - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1814,8 +1918,7 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { - if (cstate->routine) - cstate->routine->CopyFromEnd(cstate); + 
cstate->routine->CopyFromEnd(cstate); /* No COPY FROM related resources except memory. */ if (cstate->is_program) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index b104e4a9114..0447c4df7e0 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -140,8 +140,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static pg_attribute_always_inline bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, @@ -741,8 +741,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * * NOTE: force_not_null option are not applied to the returned fields. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static pg_attribute_always_inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -759,13 +759,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. 
+ */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -809,7 +813,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -819,8 +823,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) if (done && cstate->line_buf.len == 0) return false; - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + /* + * Parse the line into de-escaped field values + * + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -830,6 +839,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } +/* + * CopyFromTextLikeOneRow + * + * Copy one row to a set of `values` and `nulls` for the text and CSV + * formats. + * + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). 
+ */ +static pg_attribute_always_inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. 
+ */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + + +/* + * CopyFromTextOneRow + * + * Per-row callback for COPY FROM with text format. 
+ */ +bool +CopyFromTextOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* + * CopyFromCSVOneRow + * + * Per-row callback for COPY FROM with CSV format. + */ +bool +CopyFromCSVOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* + * CopyFromBinaryOneRow + * + * Copy one row to a set of `values` and `nulls` for the binary format. + */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. 
+ */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * @@ -847,221 +1117,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on 
the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. 
- */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": nullinput", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else if (cstate->routine) - { - if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) - return false; - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. 
In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1092,7 +1162,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. 
*/ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1100,7 +1170,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1167,8 +1237,8 @@ CopyReadLine(CopyFromState cstate) /* * CopyReadLineText - inner loop of CopyReadLine for text mode */ -static bool -CopyReadLineText(CopyFromState cstate) +static pg_attribute_always_inline bool +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1183,7 +1253,11 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1260,7 +1334,11 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant + * at caller. + */ + if (is_csv) { /* * If character is '\r', we may need to look ahead below. Force @@ -1299,7 +1377,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1327,10 +1405,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? 
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1344,10 +1422,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1355,15 +1433,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1375,7 +1453,7 @@ CopyReadLineText(CopyFromState cstate) * Process backslash, except in CSV mode where backslash is a normal * character. */ - if (c == '\\' && !cstate->opts.csv_mode) + if (c == '\\' && !is_csv) { char c2; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 405e1782685..46f3507a8b5 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. 
+ */ + +/* + * CopyToTextLikeSendEndOfRow + * + * Apply line terminations for a line sent in text or CSV format depending + * on the destination, then send the end of a row. + */ +static pg_attribute_always_inline void +CopyToTextLikeSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + + /* Now take the actions related to the end of a row */ + CopySendEndOfRow(cstate); +} + +/* + * CopyToTextLikeStart + * + * Start of COPY TO for text and CSV format. + */ +static void +CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + ListCell *cur; + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextLikeSendEndOfRow(cstate); + } +} + +/* + * CopyToTextLikeOutFunc + * + * Assign output function data for a relation's attribute in text/CSV format. 
+ */ +static void +CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + + +/* + * CopyToTextLikeOneRow + * + * Process one row for text/CSV format. + * + * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow(). + */ +static pg_attribute_always_inline void +CopyToTextLikeOneRow(CopyToState cstate, + TupleTableSlot *slot, + bool is_csv) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], + value); + + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. + */ + if (is_csv) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1]); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextLikeSendEndOfRow(cstate); +} + +/* + * CopyToTextOneRow + * + * Per-row callback for COPY TO with text format. + */ +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, false); +} + +/* + * CopyToCSVOneRow + * + * Per-row callback for COPY TO with CSV format. + */ +static void +CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, true); +} + +/* + * CopyToTextLikeEnd + * + * End of COPY TO for text/CSV format. + */ +static void +CopyToTextLikeEnd(CopyToState cstate) +{ + /* Nothing to do here */ +} + +/* + * CopyToRoutine implementation for "binary".
+ */ + +/* + * CopyToBinaryStart + * + * Start of COPY TO for binary format. + */ +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); +} + +/* + * CopyToBinaryOutFunc + * + * Assign output function data for a relation's attribute in binary format. + */ +static void +CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + +/* + * CopyToBinaryOneRow + * + * Process one row for binary format. + */ +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], + value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +/* + * CopyToBinaryEnd + * + * End of COPY TO for binary format. + */ +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} + +/* + * CSV and text share the same implementation, with the exception of the + * output representation and per-row callbacks.
+ */ +static const CopyToRoutine CopyToRoutineText = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineCSV = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToCSVOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineBinary = { + .CopyToStart = CopyToBinaryStart, + .CopyToOutFunc = CopyToBinaryOutFunc, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; + +/* + * Define the COPY TO routines to use for a format. This should be called + * after options are parsed. + */ +static const CopyToRoutine * +CopyToGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyToRoutineCSV; + else if (opts.binary) + return &CopyToRoutineBinary; + + /* default is text */ + return &CopyToRoutineText; +} /* * Send copy start/stop messages for frontend copies. 
These have changed @@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyToGetRoutine(cstate->opts); + /* Process the source/target relation or query */ if (rel) { @@ -775,27 +1075,10 @@ DoCopyTo(CopyToState cstate) foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - if (cstate->opts.binary) - { - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyToOutFunc(cstate, attr->atttypid, - &cstate->out_functions[attnum - 1]); - else - { - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } + cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); } /* @@ -808,58 +1091,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 
tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else if (cstate->routine) - cstate->routine->CopyToStart(cstate, tupDesc); - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -898,15 +1130,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - else if (cstate->routine) - cstate->routine->CopyToEnd(cstate); + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -922,7 +1146,6 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; MemoryContextReset(cstate->rowcontext); @@ -931,69 +1154,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) /* Make sure the tuple is fully deconstructed */ 
slot_getallattrs(slot); - if (cstate->routine) - { - cstate->routine->CopyToOneRow(cstate, slot); - MemoryContextSwitchTo(oldcontext); - return; - } - - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - - if (!cstate->opts.binary) - { - bool need_delim = false; - - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - char *string; - - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - - if (isnull) - CopySendString(cstate, cstate->opts.null_print_client); - else - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1]); - else - CopyAttributeOutText(cstate, string); - } - } - } - else - { - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - bytea *outputbytes; - - if (isnull) - CopySendInt32(cstate, -1); - else - { - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 4002a7f5382..f2409013fba 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void 
CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 509b9e92a18..c11b5ff3cc0 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -187,4 +187,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +/* Callbacks for CopyFromRoutine->CopyFromOneRow */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ -- 2.45.2 From 328bb34d626fdbcc2cb2e2013c46e24e6123faef Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Tue, 23 Jul 2024 17:39:41 +0900 Subject: [PATCH v22 3/5] Add support for adding custom COPY TO/FROM format This uses the same handler approach as tablesample: the approach creates an internal function that returns an internal struct. In this case, the handler returns a CopyToRoutine for COPY TO and a CopyFromRoutine for COPY FROM; a single handler function serves both COPY TO and COPY FROM. PostgreSQL calls a COPY TO/FROM handler with an "is_from" argument, which is true for COPY FROM and false for COPY TO: copy_handler(true) returns CopyFromRoutine copy_handler(false) returns CopyToRoutine This also adds a test module for custom COPY TO/FROM handlers.
--- src/backend/commands/copy.c | 96 ++++++++++++++--- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyto.c | 4 +- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copy.h | 2 + src/include/commands/copyapi.h | 4 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 ++++ .../expected/test_copy_format.out | 21 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++ .../test_copy_format/sql/test_copy_format.sql | 6 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 100 ++++++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 21 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3485ba8663f..c8643b2dee7 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -462,6 +463,87 @@ 
defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine (for + * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY + * format handler is found, this function reports an error. + */ +static void +ProcessCopyOptionFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + Node *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + return; + } + + /* custom format */ + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + /* + * Refuse to call a function that is not declared to return copy_handler. + */ + if (get_func_rettype(handlerOid) != COPY_HANDLEROID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must return type %s", + format, "copy_handler"), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = (Node *) DatumGetPointer(datum); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function %s(%u) did not return a CopyFromRoutine struct", + format, handlerOid), + parser_errposition(pstate, defel->location))); + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function %s(%u) did not return a CopyToRoutine struct", + format, handlerOid), + parser_errposition(pstate, defel->location))); + } + + opts_out->routine = routine; +}
+ /* * Process the statement option list for COPY. * @@ -505,22 +587,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionFormat(pstate, opts_out, is_from, defel); } else if (strcmp(defel->defname, "freeze") == 0) { diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index e6ea9ce1602..932f1ff4f6e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = { static const CopyFromRoutine * CopyFromGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyFromRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyFromRoutineCSV; else if (opts.binary) return &CopyFromRoutineBinary; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 46f3507a8b5..1f1d2baf9be 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = { static const CopyToRoutine * CopyToGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyToRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyToRoutineCSV; else if (opts.binary) return &CopyToRoutineBinary; diff
--git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e0..173ee11811c 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 81df3bdf95f..428ab4f0d93 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ my @all_input_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ my @nodetag_only_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index e189e9b79d2..25f24ab95d2 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f23321a41f1..6af90a26374 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7761,6 +7761,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 
'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index ceff66ccde1..37ebfa0908f 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -633,6 +633,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to/from method function', + typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', descr => 'pseudo-type for the result of a table AM handler function', typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2409013fba..63f3e8e1af7 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -87,6 +87,8 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ } CopyFormatOptions; /* These are private in commands/copy[from|to].c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index d1289424c67..e049a45a4b1 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -27,6 +27,8 @@ typedef struct CopyToStateData *CopyToState; */ typedef struct 
CopyFromRoutine { + NodeTag type; + /* * Called when COPY FROM is started to set up the input functions * associated with the relation's attributes writing to. `finfo` can be @@ -69,6 +71,8 @@ typedef struct CopyFromRoutine */ typedef struct CopyToRoutine { + NodeTag type; + /* * Called when COPY TO is started to set up the output functions * associated with the relation's attributes reading from. `finfo` can be diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b657..103df1a7873 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index c0d3cf0e14b..33e3a49a4fb 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index c829b619530..75b6ab1b6a9 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -14,6 +14,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 00000000000..8497f91624d --- 
/dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 00000000000..4ed7c0b12db --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,21 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd +COPY public.test TO stdout WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToOutFunc: atttypid=21 +NOTICE: CopyToOutFunc: atttypid=23 +NOTICE: CopyToOutFunc: atttypid=20 +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 00000000000..4cefe7b709a --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ 
+# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + +test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 00000000000..e805f7cb011 --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,6 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +\. +COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 00000000000..d24ea03ce99 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. 
\quit
+
+CREATE FUNCTION test_copy_format(internal)
+	RETURNS copy_handler
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..f6b105659ab
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,133 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *		Code for testing custom COPY format.
+ *
+ * Every callback only reports its invocation (with a little context) as a
+ * NOTICE, so the regression test can check which format routines the COPY
+ * machinery calls and in what order.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+/*
+ * COPY FROM per-attribute callback.  A real format would presumably fill in
+ * *finfo and *typioparam here; this test only reports the column's type OID.
+ * Oid is unsigned, so it is printed with %u (with %d, OIDs above INT_MAX
+ * would be shown as negative numbers).
+ */
+static void
+CopyFromInFunc(CopyFromState cstate, Oid atttypid,
+			   FmgrInfo *finfo, Oid *typioparam)
+{
+	ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%u", atttypid)));
+}
+
+/* COPY FROM start callback: reports the number of attributes. */
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+/*
+ * COPY FROM per-row callback.  It never fills values/nulls and always
+ * returns false, which ends COPY FROM after the first call.
+ */
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+			   bool *nulls)
+{
+	ereport(NOTICE, (errmsg("CopyFromOneRow")));
+	return false;
+}
+
+/* COPY FROM end callback. */
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+	ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+	.type = T_CopyFromRoutine,
+	.CopyFromInFunc = CopyFromInFunc,
+	.CopyFromStart = CopyFromStart,
+	.CopyFromOneRow = CopyFromOneRow,
+	.CopyFromEnd = CopyFromEnd,
+};
+
+/*
+ * COPY TO per-attribute callback: reports the column's type OID (printed
+ * with %u, see CopyFromInFunc) and leaves *finfo untouched.
+ */
+static void
+CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+	ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%u", atttypid)));
+}
+
+/* COPY TO start callback: reports the number of attributes. */
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+/*
+ * COPY TO per-row callback: reports how many slot attributes are valid
+ * without sending any data.  tts_nvalid is a signed AttrNumber, hence %d.
+ */
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%d", slot->tts_nvalid)));
+}
+
+/* COPY TO end callback. */
+static void
+CopyToEnd(CopyToState cstate)
+{
+	ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+	.type = T_CopyToRoutine,
+	.CopyToOutFunc = CopyToOutFunc,
+	.CopyToStart = CopyToStart,
+	.CopyToOneRow = CopyToOneRow,
+	.CopyToEnd = CopyToEnd,
+};
+
+/*
+ * Handler for the "test_copy_format" COPY format.
+ *
+ * The first argument tells whether the handler is invoked for COPY FROM
+ * (true) or COPY TO (false); the matching static routine table is returned.
+ */
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+	bool		is_from = PG_GETARG_BOOL(0);
+
+	ereport(NOTICE,
+			(errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+	if (is_from)
+		PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+	else
+		PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 00000000000..f05a6362358
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.45.2


From 682b868c825409d44aec0d3ad32ece63aaed309f Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 14:54:10 +0900
Subject: [PATCH v22 4/5] Export CopyToStateData and CopyFromStateData

It's for custom COPY TO/FROM format handlers implemented as extensions.

This just moves code. It doesn't change any code except the
CopyDest/CopySource enum values: both enums define values such as
COPY_FILE, which would conflict with each other once both enums are in
the same header. So the COPY_DEST_ prefix instead of the COPY_ prefix is
used for CopyDest enum values, and the COPY_SOURCE_ prefix instead of the
COPY_ prefix is used for CopySource enum values. For example, COPY_FILE
in CopyDest is renamed to COPY_DEST_FILE and COPY_FILE in CopySource is
renamed to COPY_SOURCE_FILE.
Note that this isn't enough to implement custom COPY TO/FROM format handlers as extension. We'll do the followings in a subsequent commit: For custom COPY TO format handler: 1. Add an opaque space for custom COPY TO format handler 2. Export CopySendEndOfRow() to flush buffer For custom COPY FROM format handler: 1. Add an opaque space for custom COPY FROM format handler 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/backend/commands/copyto.c | 77 +----- src/include/commands/copy.h | 81 +----- src/include/commands/copyapi.h | 309 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 165 ------------ 6 files changed, 323 insertions(+), 323 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 932f1ff4f6e..d758e66c6a1 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1716,7 +1716,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1844,7 +1844,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 0447c4df7e0..ccfbacb4a37 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -171,7 +171,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows 
it can send. */ pq_flush(); @@ -239,7 +239,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -331,7 +331,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1179,7 +1179,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 1f1d2baf9be..fb68f42ce1e 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -37,67 +37,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. 
When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. - */ -typedef struct CopyToStateData -{ - /* format routine */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? 
*/ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -929,12 +868,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ 
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 63f3e8e1af7..e2411848e9f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,90 +14,11 @@ #ifndef COPY_H #define COPY_H -#include "nodes/execnodes.h" +#include "commands/copyapi.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. - */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * Represents verbosity of logged messages by COPY command. - */ -typedef enum CopyLogVerbosityChoice -{ - COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */ - COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is - * the default, assign 0 */ - COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ -} CopyLogVerbosityChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. 
- */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? */ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? 
*/ - CopyOnErrorChoice on_error; /* what to do when error happened */ - CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ - int64 reject_limit; /* maximum tolerable number of errors */ - List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine or CopyFromRoutine (can be - * NULL) */ -} CopyFormatOptions; - -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index e049a45a4b1..206d4c9fac9 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,12 +14,84 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" +#include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" -/* These are private in commands/copy[from|to].c */ +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * Represents verbosity of logged messages by COPY command. + */ +typedef enum CopyLogVerbosityChoice +{ + COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */ + COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. 
As this is + * the default, assign 0 */ + COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ +} CopyLogVerbosityChoice; + +/* + * A struct to hold COPY options, in a parsed form. All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? 
*/ + CopyOnErrorChoice on_error; /* what to do when error happened */ + CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ + int64 reject_limit; /* maximum tolerable number of errors */ + List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ +} CopyFormatOptions; + typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; /* * API structure for a COPY FROM format implementation. Note this must be @@ -65,6 +137,176 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +/* + * Represents the insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ +} CopyInsertMethod; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. 
+ */ +typedef struct CopyFromStateData +{ + /* format routine */ + const CopyFromRoutine *routine; + + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_SOURCE_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc.
*/ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. + * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) 
+ */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
+ */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + + +typedef struct CopyToStateData *CopyToState; + /* * API structure for a COPY TO format implementation. Note this must be * allocated in a server-lifetime manner, typically as a static const struct. @@ -102,4 +344,67 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. 
+ */ +typedef struct CopyToStateData +{ + /* format routine */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYAPI_H */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c11b5ff3cc0..3863d26d5b7 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -19,171 +19,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL,
- EOL_CR, - EOL_CRNL, -} EolType; - -/* - * Represents the insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or - * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or - * ExecForeignBatchInsert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. - */ -typedef struct CopyFromStateData -{ - /* format routine */ - const CopyFromRoutine *routine; - - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. 
*/ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) 
- */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.45.2 From 3f9b4a8caa33960fe11512883177a96939186373 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Tue, 23 Jan 2024 15:12:43 +0900 Subject: [PATCH v22 5/5] Add support for implementing custom COPY TO/FROM format as extension For custom COPY TO format implementation: * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf as CopyToStateFlush() For custom COPY FROM format implementation: * Add CopyFromStateData::opaque that can be used to keep data for custom COPY From format implementation * Export CopyReadBinaryData() to read the next data as CopyFromStateRead() --- src/backend/commands/copyfromparse.c | 14 ++++++++++++++ src/backend/commands/copyto.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 10 ++++++++++ 3 files changed, 38 insertions(+) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index ccfbacb4a37..4fa23d992f5 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -730,6 +730,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) return copied_bytes; } +/* + * CopyFromStateRead + * + * Export CopyReadBinaryData() for extensions. We want to keep + * CopyReadBinaryData() as a static function for + * optimization. 
CopyReadBinaryData() calls in this file may be optimized by + * a compiler. + */ +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) +{ + return CopyReadBinaryData(cstate, dest, nbytes); +} + /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines. diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index fb68f42ce1e..93b041352c5 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate) resetStringInfo(fe_msgbuf); } +/* + * CopyToStateFlush + * + * Export CopySendEndOfRow() for extensions. We want to keep + * CopySendEndOfRow() as a static function for + * optimization. CopySendEndOfRow() calls in this file may be optimized by a + * compiler. + */ +void +CopyToStateFlush(CopyToState cstate) +{ + CopySendEndOfRow(cstate); +} + /* * These functions do apply some data conversion */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 206d4c9fac9..2de610ef729 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -302,8 +302,13 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + typedef struct CopyToStateData *CopyToState; @@ -405,6 +410,11 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + #endif /* COPYAPI_H */ -- 2.45.2 From 
79470eab70ba8df417796cc9e66eca41b97e74b5 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:24:49 +0900 Subject: [PATCH v23 01/10] Add CopyToRoutine It's for implementing a custom COPY TO format. But this is not yet enough to implement a custom COPY TO format. We'll export some APIs to send data and add a "format" option to COPY TO later. Existing text/csv/binary format implementations don't use CopyToRoutine for now. We have a patch for it, but we defer it because there are some mysterious profile results even though we get faster runtimes. See [1] for details. [1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz Note that this doesn't change existing text/csv/binary format implementations. --- src/backend/commands/copyto.c | 31 ++++++++++++++--- src/include/commands/copyapi.h | 58 ++++++++++++++++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 src/include/commands/copyapi.h diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index f55e6d96751..405e1782685 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -20,6 +20,7 @@ #include "access/tableam.h" #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/progress.h" #include "executor/execdesc.h" #include "executor/executor.h" @@ -64,6 +65,9 @@ typedef enum CopyDest */ typedef struct CopyToStateData { + /* format routine */ + const CopyToRoutine *routine; + /* low-level state data */ CopyDest copy_dest; /* type of copy source/destination */ FILE *copy_file; /* used if copy_dest == COPY_FILE */ @@ -776,14 +780,22 @@ DoCopyTo(CopyToState cstate) Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); if (cstate->opts.binary) + { getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + else if (cstate->routine) 
cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); else + { getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } } /* @@ -810,6 +822,8 @@ DoCopyTo(CopyToState cstate) tmp = 0; CopySendInt32(cstate, tmp); } + else if (cstate->routine) + cstate->routine->CopyToStart(cstate, tupDesc); else { /* @@ -891,6 +905,8 @@ DoCopyTo(CopyToState cstate) /* Need to flush out the trailer */ CopySendEndOfRow(cstate); } + else if (cstate->routine) + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -912,15 +928,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); + /* Make sure the tuple is fully deconstructed */ + slot_getallattrs(slot); + + if (cstate->routine) + { + cstate->routine->CopyToOneRow(cstate, slot); + MemoryContextSwitchTo(oldcontext); + return; + } + if (cstate->opts.binary) { /* Binary per-tuple header */ CopySendInt16(cstate, list_length(cstate->attnumlist)); } - /* Make sure the tuple is fully deconstructed */ - slot_getallattrs(slot); - if (!cstate->opts.binary) { bool need_delim = false; diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h new file mode 100644 index 00000000000..5ce24f195dc --- /dev/null +++ b/src/include/commands/copyapi.h @@ -0,0 +1,58 @@ +/*------------------------------------------------------------------------- + * + * copyapi.h + * API for COPY TO handlers + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/commands/copyapi.h + * + *------------------------------------------------------------------------- + */ +#ifndef COPYAPI_H +#define COPYAPI_H + +#include "executor/tuptable.h" 
+#include "nodes/execnodes.h" + +/* This is private in commands/copyto.c */ +typedef struct CopyToStateData *CopyToState; + +/* + * API structure for a COPY TO format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyToRoutine +{ + /* + * Called when COPY TO is started to set up the output functions + * associated with the attributes of the relation being read from. `finfo` + * can optionally be filled to provide the catalog information of the + * output function. `atttypid` is the OID of the data type used by the + * relation's attribute. + */ + void (*CopyToOutFunc) (CopyToState cstate, Oid atttypid, + FmgrInfo *finfo); + + /* + * Called when COPY TO is started. + * + * `tupDesc` is the tuple descriptor of the relation from which the data + * is read. + */ + void (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc); + + /* + * Copy one row for COPY TO. + * + * `slot` is the tuple slot holding the row to be emitted. + */ + void (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO has ended */ + void (*CopyToEnd) (CopyToState cstate); +} CopyToRoutine; + +#endif /* COPYAPI_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1847bbfa95c..098e7023486 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -503,6 +503,7 @@ CopyMultiInsertInfo CopyOnErrorChoice CopySource CopyStmt +CopyToRoutine CopyToState CopyToStateData Cost -- 2.45.2 From 8743273a660c75b49862dbb18991c74131ed776e Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:26:29 +0900 Subject: [PATCH v23 02/10] Use CopyToRoutine for the existing formats The existing formats are text, csv and binary. If this causes any performance regression, we will not merge it into master.
This will increase indirect function call costs, but it will reduce the runtime cost of the "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)" branches. This uses an optimization based on a static, always-inlined function called with a constant argument in place of cstate->opts.csv_mode. For example, CopyToTextLikeOneRow() uses this optimization. It accepts a "bool is_csv" argument instead of reading cstate->opts.csv_mode. CopyToTextOneRow() calls CopyToTextLikeOneRow() with a constant false for "bool is_csv", so the compiler can remove the "if (is_csv)" branch from the inlined copy. This doesn't change existing logic. This just moves existing code. --- src/backend/commands/copyto.c | 477 +++++++++++++++++++++++----------- 1 file changed, 319 insertions(+), 158 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 405e1782685..46f3507a8b5 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToRoutine implementations. + */ + +/* + * CopyToTextLikeSendEndOfRow + * + * Apply line terminations for a line sent in text or CSV format depending + * on the destination, then send the end of a row. + */ +static pg_attribute_always_inline void +CopyToTextLikeSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + + /* Now take the actions related to the end of a row */ + CopySendEndOfRow(cstate); +} + +/* + * CopyToTextLikeStart + * + * Start of COPY TO for text and CSV format.
+ */ +static void +CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* + * For non-binary copy, we need to convert null_print to file encoding, + * because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + ListCell *cur; + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToTextLikeSendEndOfRow(cstate); + } +} + +/* + * CopyToTextLikeOutFunc + * + * Assign output function data for a relation's attribute in text/CSV format. + */ +static void +CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + + +/* + * CopyToTextLikeOneRow + * + * Process one row for text/CSV format. + * + * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow(). 
+ */ +static pg_attribute_always_inline void +CopyToTextLikeOneRow(CopyToState cstate, + TupleTableSlot *slot, + bool is_csv) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + { + CopySendString(cstate, cstate->opts.null_print_client); + } + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], + value); + + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. + */ + if (is_csv) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1]); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToTextLikeSendEndOfRow(cstate); +} + +/* + * CopyToTextOneRow + * + * Per-row callback for COPY TO with text format. + */ +static void +CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, false); +} + +/* + * CopyToCSVOneRow + * + * Per-row callback for COPY TO with CSV format. + */ +static void +CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + CopyToTextLikeOneRow(cstate, slot, true); +} + +/* + * CopyToTextLikeEnd + * + * End of COPY TO for text/CSV format. + */ +static void +CopyToTextLikeEnd(CopyToState cstate) +{ + /* Nothing to do here */ +} + +/* + * CopyToRoutine implementation for "binary". + */ + +/* + * CopyToBinaryStart + * + * Start of COPY TO for binary format.
+ */ +static void +CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + /* Generate header for a binary copy */ + int32 tmp; + + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + tmp = 0; + CopySendInt32(cstate, tmp); + /* No header extension */ + tmp = 0; + CopySendInt32(cstate, tmp); +} + +/* + * CopyToBinaryOutFunc + * + * Assign output function data for a relation's attribute in binary format. + */ +static void +CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + Oid func_oid; + bool is_varlena; + + /* Set output function for an attribute */ + getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena); + fmgr_info(func_oid, finfo); +} + +/* + * CopyToBinaryOneRow + * + * Process one row for binary format. + */ +static void +CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach_int(attnum, cstate->attnumlist) + { + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + { + CopySendInt32(cstate, -1); + } + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], + value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +/* + * CopyToBinaryEnd + * + * End of COPY TO for binary format. + */ +static void +CopyToBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} + +/* + * CSV and text share the same implementation, with the exception of the + * output representation and per-row callbacks.
+ */ +static const CopyToRoutine CopyToRoutineText = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToTextOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineCSV = { + .CopyToStart = CopyToTextLikeStart, + .CopyToOutFunc = CopyToTextLikeOutFunc, + .CopyToOneRow = CopyToCSVOneRow, + .CopyToEnd = CopyToTextLikeEnd, +}; + +static const CopyToRoutine CopyToRoutineBinary = { + .CopyToStart = CopyToBinaryStart, + .CopyToOutFunc = CopyToBinaryOutFunc, + .CopyToOneRow = CopyToBinaryOneRow, + .CopyToEnd = CopyToBinaryEnd, +}; + +/* + * Define the COPY TO routines to use for a format. This should be called + * after options are parsed. + */ +static const CopyToRoutine * +CopyToGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyToRoutineCSV; + else if (opts.binary) + return &CopyToRoutineBinary; + + /* default is text */ + return &CopyToRoutineText; +} /* * Send copy start/stop messages for frontend copies. 
These have changed @@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyToGetRoutine(cstate->opts); + /* Process the source/target relation or query */ if (rel) { @@ -775,27 +1075,10 @@ DoCopyTo(CopyToState cstate) foreach(cur, cstate->attnumlist) { int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - if (cstate->opts.binary) - { - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyToOutFunc(cstate, attr->atttypid, - &cstate->out_functions[attnum - 1]); - else - { - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } + cstate->routine->CopyToOutFunc(cstate, attr->atttypid, + &cstate->out_functions[attnum - 1]); } /* @@ -808,58 +1091,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 
tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else if (cstate->routine) - cstate->routine->CopyToStart(cstate, tupDesc); - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->routine->CopyToStart(cstate, tupDesc); if (cstate->rel) { @@ -898,15 +1130,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } - else if (cstate->routine) - cstate->routine->CopyToEnd(cstate); + cstate->routine->CopyToEnd(cstate); MemoryContextDelete(cstate->rowcontext); @@ -922,7 +1146,6 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; MemoryContextReset(cstate->rowcontext); @@ -931,69 +1154,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) /* Make sure the tuple is fully deconstructed */ 
slot_getallattrs(slot); - if (cstate->routine) - { - cstate->routine->CopyToOneRow(cstate, slot); - MemoryContextSwitchTo(oldcontext); - return; - } - - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - - if (!cstate->opts.binary) - { - bool need_delim = false; - - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - char *string; - - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - - if (isnull) - CopySendString(cstate, cstate->opts.null_print_client); - else - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1]); - else - CopyAttributeOutText(cstate, string); - } - } - } - else - { - foreach_int(attnum, cstate->attnumlist) - { - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - bytea *outputbytes; - - if (isnull) - CopySendInt32(cstate, -1); - else - { - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->routine->CopyToOneRow(cstate, slot); MemoryContextSwitchTo(oldcontext); } -- 2.45.2 From aadcd4bb260d14fc40ab69178fcc85271307b4a9 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:40:54 +0900 Subject: [PATCH v23 03/10] Add support for adding custom COPY TO format This uses the same handler approach as tablesample. The approach creates an internal function that returns an internal struct. In this case, a COPY TO handler returns a CopyToRoutine. This also adds a test module for a custom COPY TO handler.
--- src/backend/commands/copy.c | 82 ++++++++++++++++--- src/backend/commands/copyto.c | 4 +- src/backend/nodes/Makefile | 1 + src/backend/nodes/gen_node_support.pl | 2 + src/backend/utils/adt/pseudotypes.c | 1 + src/include/catalog/pg_proc.dat | 6 ++ src/include/catalog/pg_type.dat | 6 ++ src/include/commands/copy.h | 1 + src/include/commands/copyapi.h | 2 + src/include/nodes/meson.build | 1 + src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + src/test/modules/test_copy_format/.gitignore | 4 + src/test/modules/test_copy_format/Makefile | 23 ++++++ .../expected/test_copy_format.out | 17 ++++ src/test/modules/test_copy_format/meson.build | 33 ++++++++ .../test_copy_format/sql/test_copy_format.sql | 5 ++ .../test_copy_format--1.0.sql | 8 ++ .../test_copy_format/test_copy_format.c | 63 ++++++++++++++ .../test_copy_format/test_copy_format.control | 4 + 20 files changed, 251 insertions(+), 14 deletions(-) create mode 100644 src/test/modules/test_copy_format/.gitignore create mode 100644 src/test/modules/test_copy_format/Makefile create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out create mode 100644 src/test/modules/test_copy_format/meson.build create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql create mode 100644 src/test/modules/test_copy_format/test_copy_format.c create mode 100644 src/test/modules/test_copy_format/test_copy_format.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3485ba8663f..02528fbcc1f 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -32,6 +32,7 @@ #include "parser/parse_coerce.h" #include "parser/parse_collate.h" #include "parser/parse_expr.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -462,6 +463,73 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState 
*pstate) return COPY_LOG_VERBOSITY_DEFAULT; /* keep compiler quiet */ } +/* + * Process the "format" option. + * + * This function checks whether the option value is a built-in format such as + * "text" and "csv" or not. If the option value isn't a built-in format, this + * function finds a COPY format handler that returns a CopyToRoutine (for + * is_from == false). If no COPY format handler is found, this function + * reports an error. + */ +static void +ProcessCopyOptionFormat(ParseState *pstate, + CopyFormatOptions *opts_out, + bool is_from, + DefElem *defel) +{ + char *format; + Oid funcargtypes[1]; + Oid handlerOid = InvalidOid; + Datum datum; + Node *routine; + + format = defGetString(defel); + + /* built-in formats */ + if (strcmp(format, "text") == 0) + /* default format */ return; + else if (strcmp(format, "csv") == 0) + { + opts_out->csv_mode = true; + return; + } + else if (strcmp(format, "binary") == 0) + { + opts_out->binary = true; + return; + } + + /* custom format */ + if (!is_from) + { + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); + } + if (!OidIsValid(handlerOid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY format \"%s\" not recognized", format), + parser_errposition(pstate, defel->location))); + + datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); + routine = (Node *) DatumGetPointer(datum); + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + + opts_out->routine = routine; +} + /* * Process the statement option list for COPY. 
* @@ -505,22 +573,10 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(defel->defname, "format") == 0) { - char *fmt = defGetString(defel); - if (format_specified) errorConflictingDefElem(defel, pstate); format_specified = true; - if (strcmp(fmt, "text") == 0) - /* default format */ ; - else if (strcmp(fmt, "csv") == 0) - opts_out->csv_mode = true; - else if (strcmp(fmt, "binary") == 0) - opts_out->binary = true; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY format \"%s\" not recognized", fmt), - parser_errposition(pstate, defel->location))); + ProcessCopyOptionFormat(pstate, opts_out, is_from, defel); } else if (strcmp(defel->defname, "freeze") == 0) { diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 46f3507a8b5..1f1d2baf9be 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = { static const CopyToRoutine * CopyToGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return (const CopyToRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyToRoutineCSV; else if (opts.binary) return &CopyToRoutineBinary; diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile index 66bbad8e6e0..173ee11811c 100644 --- a/src/backend/nodes/Makefile +++ b/src/backend/nodes/Makefile @@ -49,6 +49,7 @@ node_headers = \ access/sdir.h \ access/tableam.h \ access/tsmapi.h \ + commands/copyapi.h \ commands/event_trigger.h \ commands/trigger.h \ executor/tuptable.h \ diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 81df3bdf95f..428ab4f0d93 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -61,6 +61,7 @@ my @all_input_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h @@ -85,6 +86,7 @@ my 
@nodetag_only_files = qw( access/sdir.h access/tableam.h access/tsmapi.h + commands/copyapi.h commands/event_trigger.h commands/trigger.h executor/tuptable.h diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c index e189e9b79d2..25f24ab95d2 100644 --- a/src/backend/utils/adt/pseudotypes.c +++ b/src/backend/utils/adt/pseudotypes.c @@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler); +PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler); PSEUDOTYPE_DUMMY_IO_FUNCS(internal); PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement); PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f23321a41f1..6af90a26374 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -7761,6 +7761,12 @@ { oid => '3312', descr => 'I/O', proname => 'tsm_handler_out', prorettype => 'cstring', proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' }, +{ oid => '8753', descr => 'I/O', + proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler', + proargtypes => 'cstring', prosrc => 'copy_handler_in' }, +{ oid => '8754', descr => 'I/O', + proname => 'copy_handler_out', prorettype => 'cstring', + proargtypes => 'copy_handler', prosrc => 'copy_handler_out' }, { oid => '267', descr => 'I/O', proname => 'table_am_handler_in', proisstrict => 'f', prorettype => 'table_am_handler', proargtypes => 'cstring', diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index ceff66ccde1..793dd671935 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -633,6 +633,12 @@ typcategory => 'P', typinput => 'tsm_handler_in', typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, +{ oid => '8752', + descr => 'pseudo-type for the result of a copy to method function', + 
typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', + typcategory => 'P', typinput => 'copy_handler_in', + typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', + typalign => 'i' }, { oid => '269', descr => 'pseudo-type for the result of a table AM handler function', typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p', diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 4002a7f5382..7659d8ae32f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -87,6 +87,7 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; /* These are private in commands/copy[from|to].c */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 5ce24f195dc..05b7d92ddba 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -26,6 +26,8 @@ typedef struct CopyToStateData *CopyToState; */ typedef struct CopyToRoutine { + NodeTag type; + /* * Called when COPY TO is started to set up the output functions * associated with the relation's attributes reading from. 
`finfo` can be diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build index b665e55b657..103df1a7873 100644 --- a/src/include/nodes/meson.build +++ b/src/include/nodes/meson.build @@ -11,6 +11,7 @@ node_support_input_i = [ 'access/sdir.h', 'access/tableam.h', 'access/tsmapi.h', + 'commands/copyapi.h', 'commands/event_trigger.h', 'commands/trigger.h', 'executor/tuptable.h', diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index c0d3cf0e14b..33e3a49a4fb 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ spgist_name_ops \ test_bloomfilter \ test_copy_callbacks \ + test_copy_format \ test_custom_rmgrs \ test_ddl_deparse \ test_dsa \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index c829b619530..75b6ab1b6a9 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -14,6 +14,7 @@ subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') subdir('test_copy_callbacks') +subdir('test_copy_format') subdir('test_custom_rmgrs') subdir('test_ddl_deparse') subdir('test_dsa') diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_copy_format/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile new file mode 100644 index 00000000000..8497f91624d --- /dev/null +++ b/src/test/modules/test_copy_format/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_format/Makefile + +MODULE_big = test_copy_format +OBJS = \ + $(WIN32RES) \ + test_copy_format.o +PGFILEDESC = "test_copy_format - test custom COPY FORMAT" + +EXTENSION = test_copy_format +DATA = test_copy_format--1.0.sql + +REGRESS = test_copy_format + +ifdef USE_PGXS +PG_CONFIG = pg_config 
+PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_format +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out new file mode 100644 index 00000000000..606c78f6878 --- /dev/null +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -0,0 +1,17 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +ERROR: COPY format "test_copy_format" not recognized +LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')... + ^ +COPY public.test TO stdout WITH (format 'test_copy_format'); +NOTICE: test_copy_format: is_from=false +NOTICE: CopyToOutFunc: atttypid=21 +NOTICE: CopyToOutFunc: atttypid=23 +NOTICE: CopyToOutFunc: atttypid=20 +NOTICE: CopyToStart: natts=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToOneRow: tts_nvalid=3 +NOTICE: CopyToEnd diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build new file mode 100644 index 00000000000..4cefe7b709a --- /dev/null +++ b/src/test/modules/test_copy_format/meson.build @@ -0,0 +1,33 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +test_copy_format_sources = files( + 'test_copy_format.c', +) + +if host_system == 'windows' + test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_format', + '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',]) +endif + +test_copy_format = shared_module('test_copy_format', + test_copy_format_sources, + kwargs: pg_test_mod_args, +) +test_install_libs += test_copy_format + 
+test_install_data += files( + 'test_copy_format.control', + 'test_copy_format--1.0.sql', +) + +tests += { + 'name': 'test_copy_format', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_format', + ], + }, +} diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql new file mode 100644 index 00000000000..9406b3be3d4 --- /dev/null +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -0,0 +1,5 @@ +CREATE EXTENSION test_copy_format; +CREATE TABLE public.test (a smallint, b integer, c bigint); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +COPY public.test FROM stdin WITH (format 'test_copy_format'); +COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql b/src/test/modules/test_copy_format/test_copy_format--1.0.sql new file mode 100644 index 00000000000..d24ea03ce99 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit + +CREATE FUNCTION test_copy_format(internal) + RETURNS copy_handler + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c new file mode 100644 index 00000000000..e064f40473b --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -0,0 +1,63 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_format.c + * Code for testing custom COPY format. 
+ * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_format/test_copy_format.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "commands/copyapi.h" +#include "commands/defrem.h" + +PG_MODULE_MAGIC; + +static void +CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) +{ + ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%u", atttypid))); +} + +static void +CopyToStart(CopyToState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts))); +} + +static void +CopyToOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%d", slot->tts_nvalid))); +} + +static void +CopyToEnd(CopyToState cstate) +{ + ereport(NOTICE, (errmsg("CopyToEnd"))); +} + +static const CopyToRoutine CopyToRoutineTestCopyFormat = { + .type = T_CopyToRoutine, + .CopyToOutFunc = CopyToOutFunc, + .CopyToStart = CopyToStart, + .CopyToOneRow = CopyToOneRow, + .CopyToEnd = CopyToEnd, +}; + +PG_FUNCTION_INFO_V1(test_copy_format); +Datum +test_copy_format(PG_FUNCTION_ARGS) +{ + bool is_from = PG_GETARG_BOOL(0); + + ereport(NOTICE, + (errmsg("test_copy_format: is_from=%s", is_from ?
"true" : "false"))); + + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); +} diff --git a/src/test/modules/test_copy_format/test_copy_format.control b/src/test/modules/test_copy_format/test_copy_format.control new file mode 100644 index 00000000000..f05a6362358 --- /dev/null +++ b/src/test/modules/test_copy_format/test_copy_format.control @@ -0,0 +1,4 @@ +comment = 'Test code for custom COPY format' +default_version = '1.0' +module_pathname = '$libdir/test_copy_format' +relocatable = true -- 2.45.2 From eedabed5e0e1a9f8f9e5591b806c6f606229874a Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:56:36 +0900 Subject: [PATCH v23 04/10] Export CopyToStateData It's for custom COPY TO format handlers implemented as extension. This just moves codes. This doesn't change codes except CopyDest enum values. CopyDest/CopyFrom enum values such as COPY_FILE are conflicted each other. So COPY_DEST_ prefix instead of COPY_ prefix is used for CopyDest enum values. For example, COPY_FILE in CopyDest is renamed to COPY_DEST_FILE. Note that this isn't enough to implement custom COPY TO format handlers as extension. We'll do the followings in a subsequent commit: 1. Add an opaque space for custom COPY TO format handler 2. 
Export CopySendEndOfRow() to flush buffer --- src/backend/commands/copyto.c | 77 ++---------------- src/include/commands/copy.h | 77 +----------------- src/include/commands/copyapi.h | 137 ++++++++++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 145 deletions(-) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index 1f1d2baf9be..fb68f42ce1e 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -37,67 +37,6 @@ #include "utils/rel.h" #include "utils/snapmgr.h" -/* - * Represents the different dest cases we need to worry about at - * the bottom level - */ -typedef enum CopyDest -{ - COPY_FILE, /* to file (or a piped program) */ - COPY_FRONTEND, /* to frontend */ - COPY_CALLBACK, /* to callback function */ -} CopyDest; - -/* - * This struct contains all the state variables used throughout a COPY TO - * operation. - * - * Multi-byte encodings: all supported client-side encodings encode multi-byte - * characters by having the first byte's high bit set. Subsequent bytes of the - * character can have the high bit not set. When scanning data in such an - * encoding to look for a match to a single-byte (ie ASCII) character, we must - * use the full pg_encoding_mblen() machinery to skip over multibyte - * characters, else we might find a false match to a trailing byte. In - * supported server encodings, there is no possibility of a false match, and - * it's faster to make useless comparisons to trailing bytes than it is to - * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true - * when we have to do it the hard way. 
- */ -typedef struct CopyToStateData -{ - /* format routine */ - const CopyToRoutine *routine; - - /* low-level state data */ - CopyDest copy_dest; /* type of copy source/destination */ - FILE *copy_file; /* used if copy_dest == COPY_FILE */ - StringInfo fe_msgbuf; /* used for all dests during COPY TO */ - - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy to */ - QueryDesc *queryDesc; /* executable query to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDOUT */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_dest_cb data_dest_cb; /* function for writing data */ - - CopyFormatOptions opts; - Node *whereClause; /* WHERE condition (or NULL) */ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - FmgrInfo *out_functions; /* lookup info for output functions */ - MemoryContext rowcontext; /* per-row evaluation context */ - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyToStateData; - /* DestReceiver for COPY (query) TO */ typedef struct { @@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) { switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: /* Default line termination depends on platform */ #ifndef WIN32 CopySendChar(cstate, '\n'); @@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate) CopySendString(cstate, "\r\n"); #endif break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ CopySendChar(cstate, '\n'); break; @@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = 
COPY_FRONTEND; + cstate->copy_dest = COPY_DEST_FRONTEND; } static void @@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { - case COPY_FILE: + case COPY_DEST_FILE: if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate) errmsg("could not write to COPY file: %m"))); } break; - case COPY_FRONTEND: + case COPY_DEST_FRONTEND: /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; - case COPY_CALLBACK: + case COPY_DEST_CALLBACK: cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); break; } @@ -929,12 +868,12 @@ BeginCopyTo(ParseState *pstate, /* See Multibyte encoding comment above */ cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); - cstate->copy_dest = COPY_FILE; /* default */ + cstate->copy_dest = COPY_DEST_FILE; /* default */ if (data_dest_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_dest = COPY_CALLBACK; + cstate->copy_dest = COPY_DEST_CALLBACK; cstate->data_dest_cb = data_dest_cb; } else if (pipe) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 7659d8ae32f..dd645eaa030 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -14,88 +14,15 @@ #ifndef COPY_H #define COPY_H -#include "nodes/execnodes.h" +#include "commands/copyapi.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" -/* - * Represents whether a header line should be present, and whether it must - * match the actual names (which implies "true"). - */ -typedef enum CopyHeaderChoice -{ - COPY_HEADER_FALSE = 0, - COPY_HEADER_TRUE, - COPY_HEADER_MATCH, -} CopyHeaderChoice; - -/* - * Represents where to save input processing errors. More values to be added - * in the future. 
- */ -typedef enum CopyOnErrorChoice -{ - COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ - COPY_ON_ERROR_IGNORE, /* ignore errors */ -} CopyOnErrorChoice; - -/* - * Represents verbosity of logged messages by COPY command. - */ -typedef enum CopyLogVerbosityChoice -{ - COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */ - COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is - * the default, assign 0 */ - COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ -} CopyLogVerbosityChoice; - -/* - * A struct to hold COPY options, in a parsed form. All of these are related - * to formatting, except for 'freeze', which doesn't really belong here, but - * it's expedient to parse it along with all the other options. - */ -typedef struct CopyFormatOptions -{ - /* parameters from the COPY command */ - int file_encoding; /* file or remote side's character encoding, - * -1 if not specified */ - bool binary; /* binary format? */ - bool freeze; /* freeze rows on loading? */ - bool csv_mode; /* Comma Separated Value format? */ - CopyHeaderChoice header_line; /* header line? */ - char *null_print; /* NULL marker string (server encoding!) */ - int null_print_len; /* length of same */ - char *null_print_client; /* same converted to file encoding */ - char *default_print; /* DEFAULT marker string */ - int default_print_len; /* length of same */ - char *delim; /* column delimiter (must be 1 byte) */ - char *quote; /* CSV quote char (must be 1 byte) */ - char *escape; /* CSV escape char (must be 1 byte) */ - List *force_quote; /* list of column names */ - bool force_quote_all; /* FORCE_QUOTE *? */ - bool *force_quote_flags; /* per-column CSV FQ flags */ - List *force_notnull; /* list of column names */ - bool force_notnull_all; /* FORCE_NOT_NULL *? */ - bool *force_notnull_flags; /* per-column CSV FNN flags */ - List *force_null; /* list of column names */ - bool force_null_all; /* FORCE_NULL *? 
*/ - bool *force_null_flags; /* per-column CSV FN flags */ - bool convert_selectively; /* do selective binary conversion? */ - CopyOnErrorChoice on_error; /* what to do when error happened */ - CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ - int64 reject_limit; /* maximum tolerable number of errors */ - List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine (can be NULL) */ -} CopyFormatOptions; - -/* These are private in commands/copy[from|to].c */ +/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); -typedef void (*copy_data_dest_cb) (void *data, int len); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 05b7d92ddba..b6ddb5f6216 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -14,10 +14,82 @@ #ifndef COPYAPI_H #define COPYAPI_H +#include "commands/trigger.h" +#include "executor/execdesc.h" #include "executor/tuptable.h" #include "nodes/execnodes.h" -/* This is private in commands/copyto.c */ +/* + * Represents whether a header line should be present, and whether it must + * match the actual names (which implies "true"). + */ +typedef enum CopyHeaderChoice +{ + COPY_HEADER_FALSE = 0, + COPY_HEADER_TRUE, + COPY_HEADER_MATCH, +} CopyHeaderChoice; + +/* + * Represents where to save input processing errors. More values to be added + * in the future. + */ +typedef enum CopyOnErrorChoice +{ + COPY_ON_ERROR_STOP = 0, /* immediately throw errors, default */ + COPY_ON_ERROR_IGNORE, /* ignore errors */ +} CopyOnErrorChoice; + +/* + * Represents verbosity of logged messages by COPY command. 
+ */ +typedef enum CopyLogVerbosityChoice +{ + COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */ + COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is + * the default, assign 0 */ + COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */ +} CopyLogVerbosityChoice; + +/* + * A struct to hold COPY options, in a parsed form. All of these are related + * to formatting, except for 'freeze', which doesn't really belong here, but + * it's expedient to parse it along with all the other options. + */ +typedef struct CopyFormatOptions +{ + /* parameters from the COPY command */ + int file_encoding; /* file or remote side's character encoding, + * -1 if not specified */ + bool binary; /* binary format? */ + bool freeze; /* freeze rows on loading? */ + bool csv_mode; /* Comma Separated Value format? */ + CopyHeaderChoice header_line; /* header line? */ + char *null_print; /* NULL marker string (server encoding!) */ + int null_print_len; /* length of same */ + char *null_print_client; /* same converted to file encoding */ + char *default_print; /* DEFAULT marker string */ + int default_print_len; /* length of same */ + char *delim; /* column delimiter (must be 1 byte) */ + char *quote; /* CSV quote char (must be 1 byte) */ + char *escape; /* CSV escape char (must be 1 byte) */ + List *force_quote; /* list of column names */ + bool force_quote_all; /* FORCE_QUOTE *? */ + bool *force_quote_flags; /* per-column CSV FQ flags */ + List *force_notnull; /* list of column names */ + bool force_notnull_all; /* FORCE_NOT_NULL *? */ + bool *force_notnull_flags; /* per-column CSV FNN flags */ + List *force_null; /* list of column names */ + bool force_null_all; /* FORCE_NULL *? */ + bool *force_null_flags; /* per-column CSV FN flags */ + bool convert_selectively; /* do selective binary conversion? 
*/ + CopyOnErrorChoice on_error; /* what to do when error happened */ + CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ + int64 reject_limit; /* maximum tolerable number of errors */ + List *convert_select; /* list of column names (can be NIL) */ + Node *routine; /* CopyToRoutine (can be NULL) */ +} CopyFormatOptions; + typedef struct CopyToStateData *CopyToState; /* @@ -57,4 +129,67 @@ typedef struct CopyToRoutine void (*CopyToEnd) (CopyToState cstate); } CopyToRoutine; +/* + * Represents the different dest cases we need to worry about at + * the bottom level + */ +typedef enum CopyDest +{ + COPY_DEST_FILE, /* to file (or a piped program) */ + COPY_DEST_FRONTEND, /* to frontend */ + COPY_DEST_CALLBACK, /* to callback function */ +} CopyDest; + +typedef void (*copy_data_dest_cb) (void *data, int len); + +/* + * This struct contains all the state variables used throughout a COPY TO + * operation. + * + * Multi-byte encodings: all supported client-side encodings encode multi-byte + * characters by having the first byte's high bit set. Subsequent bytes of the + * character can have the high bit not set. When scanning data in such an + * encoding to look for a match to a single-byte (ie ASCII) character, we must + * use the full pg_encoding_mblen() machinery to skip over multibyte + * characters, else we might find a false match to a trailing byte. In + * supported server encodings, there is no possibility of a false match, and + * it's faster to make useless comparisons to trailing bytes than it is to + * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true + * when we have to do it the hard way. 
+ */ +typedef struct CopyToStateData +{ + /* format routine */ + const CopyToRoutine *routine; + + /* low-level state data */ + CopyDest copy_dest; /* type of copy source/destination */ + FILE *copy_file; /* used if copy_dest == COPY_DEST_FILE */ + StringInfo fe_msgbuf; /* used for all dests during COPY TO */ + + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy to */ + QueryDesc *queryDesc; /* executable query to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDOUT */ + bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ + + CopyFormatOptions opts; + Node *whereClause; /* WHERE condition (or NULL) */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + FmgrInfo *out_functions; /* lookup info for output functions */ + MemoryContext rowcontext; /* per-row evaluation context */ + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyToStateData; + #endif /* COPYAPI_H */ -- 2.45.2 From 90174da9f00b736b7c4900f046fcceae8cce74d5 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sat, 28 Sep 2024 23:59:34 +0900 Subject: [PATCH v23 05/10] Add support for implementing custom COPY TO format as extension * Add CopyToStateData::opaque that can be used to keep data for custom COPY TO format implementation * Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf as CopyToStateFlush() --- src/backend/commands/copyto.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 5 +++++ 2 files changed, 19 insertions(+) diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index fb68f42ce1e..93b041352c5 100644 --- 
a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate) resetStringInfo(fe_msgbuf); } +/* + * CopyToStateFlush + * + * Export CopySendEndOfRow() for extensions. We want to keep + * CopySendEndOfRow() as a static function for + * optimization. CopySendEndOfRow() calls in this file may be optimized by a + * compiler. + */ +void +CopyToStateFlush(CopyToState cstate) +{ + CopySendEndOfRow(cstate); +} + /* * These functions do apply some data conversion */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index b6ddb5f6216..310a37ba728 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -190,6 +190,11 @@ typedef struct CopyToStateData FmgrInfo *out_functions; /* lookup info for output functions */ MemoryContext rowcontext; /* per-row evaluation context */ uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyToStateData; +extern void CopyToStateFlush(CopyToState cstate); + #endif /* COPYAPI_H */ -- 2.45.2 From e5a3351614bea13c8e5857c5f05bbc656ce5f34d Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:06:20 +0900 Subject: [PATCH v23 06/10] Add CopyFromRoutine This is for implementing custom COPY FROM formats, but it is not enough to implement a custom COPY FROM format yet. We'll export some APIs to receive data and add the "format" option to COPY FROM later. Existing text/csv/binary format implementations don't use CopyFromRoutine for now. We have a patch for that, but we are deferring it because there are some mysterious profiling results even though we get faster runtimes. See [1] for details. [1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz Note that this doesn't change existing text/csv/binary format implementations.
--- src/backend/commands/copyfrom.c | 24 ++++++++++-- src/backend/commands/copyfromparse.c | 5 +++ src/include/commands/copyapi.h | 47 +++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 4 ++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 07cbd5d22b8..909375e81b7 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1635,12 +1635,22 @@ BeginCopyFrom(ParseState *pstate, /* Fetch the input function and typioparam info */ if (cstate->opts.binary) + { getTypeBinaryInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + else if (cstate->routine) + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); + else + { getTypeInputInfo(att->atttypid, &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1780,10 +1790,13 @@ BeginCopyFrom(ParseState *pstate, /* Read and verify binary header */ ReceiveCopyBinaryHeader(cstate); } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) + else if (cstate->routine) { + cstate->routine->CopyFromStart(cstate, tupDesc); + } + else + { + /* create workspace for CopyReadAttributes results */ AttrNumber attr_count = list_length(cstate->attnumlist); cstate->max_fields = attr_count; @@ -1801,6 +1814,9 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + if (cstate->routine) + cstate->routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. 
*/ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index d1d43b53d83..b104e4a9114 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -1003,6 +1003,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Assert(fieldno == attr_count); } + else if (cstate->routine) + { + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; + } else { /* binary */ diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 310a37ba728..81b2f4e5c1f 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * copyapi.h - * API for COPY TO handlers + * API for COPY TO/FROM handlers * * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group @@ -90,6 +90,51 @@ typedef struct CopyFormatOptions Node *routine; /* CopyToRoutine (can be NULL) */ } CopyFormatOptions; +/* This is private in commands/copyfrom.c */ +typedef struct CopyFromStateData *CopyFromState; + +/* + * API structure for a COPY FROM format implementation. Note this must be + * allocated in a server-lifetime manner, typically as a static const struct. + */ +typedef struct CopyFromRoutine +{ + /* + * Called when COPY FROM is started to set up the input functions + * associated with the relation's attributes writing to. `finfo` can be + * optionally filled to provide the catalog information of the input + * function. `typioparam` can be optionally filled to define the OID of + * the type to pass to the input function. `atttypid` is the OID of data + * type used by the relation's attribute. + */ + void (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); + + /* + * Called when COPY FROM is started. 
+ * + * `tupDesc` is the tuple descriptor of the relation where the data needs + * to be copied. This can be used for any initialization steps required + * by a format. + */ + void (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc); + + /* + * Copy one row to a set of `values` and `nulls` of size tupDesc->natts. + * + * 'econtext' is used to evaluate default expression for each column that + * is either not read from the file or is using the DEFAULT option of COPY + * FROM. It is NULL if no default values are used. + * + * Returns false if there are no more tuples to copy. + */ + bool (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + + /* Called when COPY FROM has ended. */ + void (*CopyFromEnd) (CopyFromState cstate); +} CopyFromRoutine; + typedef struct CopyToStateData *CopyToState; /* diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index cad52fcc783..509b9e92a18 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -15,6 +15,7 @@ #define COPYFROM_INTERNAL_H #include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/trigger.h" #include "nodes/miscnodes.h" @@ -58,6 +59,9 @@ typedef enum CopyInsertMethod */ typedef struct CopyFromStateData { + /* format routine */ + const CopyFromRoutine *routine; + /* low-level state data */ CopySource copy_src; /* type of copy source */ FILE *copy_file; /* used if copy_src == COPY_FILE */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 098e7023486..a8422fa4d35 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -492,6 +492,7 @@ ConvertRowtypeExpr CookedConstraint CopyDest CopyFormatOptions +CopyFromRoutine CopyFromState CopyFromStateData CopyHeaderChoice -- 2.45.2 From 89a406fde3ea0dc013d8747b2130353dd911c4c0 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 
Sep 2024 00:09:29 +0900 Subject: [PATCH v23 07/10] Use CopyFromRoutine for the existing formats The existing formats are text, csv and binary. If we find any performance regression caused by this, we will not merge this to master. This will increase indirect function call costs but this will reduce runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)" branch costs. This uses an optimization based on a static inline function and a constant argument call for cstate->opts.csv_mode. For example, CopyFromTextLikeOneRow() uses this optimization. It accepts the "bool is_csv" argument instead of using cstate->opts.csv_mode in it. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with false (constant) for "bool is_csv". The compiler will remove the "if (is_csv)" branch in it by this optimization. This doesn't change existing logic. This just moves existing code. --- src/backend/commands/copyfrom.c | 215 ++++++--- src/backend/commands/copyfromparse.c | 530 +++++++++++++---------- src/include/commands/copy.h | 2 - src/include/commands/copyfrom_internal.h | 8 + 4 files changed, 471 insertions(+), 284 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 909375e81b7..e6ea9ce1602 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo /* non-export function prototypes */ static void ClosePipeFromProgram(CopyFromState cstate); + +/* + * CopyFromRoutine implementations for text and CSV. + */ + +/* + * CopyFromTextLikeInFunc + * + * Assign input function data for a relation's attribute in text/CSV format. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromTextLikeStart + * + * Start of COPY FROM for text/CSV format.
+ */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. + */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * CopyFromTextLikeEnd + * + * End of COPY FROM for text/CSV format. + */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * CopyFromRoutine implementation for "binary". + */ + +/* + * CopyFromBinaryInFunc + * + * Assign input function data for a relation's attribute in binary format. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* + * CopyFromBinaryStart + * + * Start of COPY FROM for binary format. + */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * CopyFromBinaryEnd + * + * End of COPY FROM for binary format. + */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* + * Routines assigned to each format. + * + * CSV and text share the same implementation, with the exception of the + * per-row callback.
+ */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* + * Define the COPY FROM routines to use for a format. + */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + + /* * error context callback for COPY FROM * @@ -1396,7 +1547,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1428,6 +1578,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1583,25 +1736,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. 
- */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1634,23 +1768,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - { - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } - else if (cstate->routine) - cstate->routine->CopyFromInFunc(cstate, att->atttypid, - &in_functions[attnum - 1], - &typioparams[attnum - 1]); - - else - { - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - } + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1785,23 +1905,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - else if (cstate->routine) - { - cstate->routine->CopyFromStart(cstate, tupDesc); - } - else - { - /* create workspace for CopyReadAttributes results */ - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1814,8 +1918,7 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { - if (cstate->routine) - cstate->routine->CopyFromEnd(cstate); + 
cstate->routine->CopyFromEnd(cstate); /* No COPY FROM related resources except memory. */ if (cstate->is_program) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index b104e4a9114..0447c4df7e0 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -140,8 +140,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static pg_attribute_always_inline bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, @@ -741,8 +741,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * * NOTE: force_not_null option are not applied to the returned fields. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static pg_attribute_always_inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -759,13 +759,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is + * constant at caller. 
+ */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -809,7 +813,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -819,8 +823,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) if (done && cstate->line_buf.len == 0) return false; - /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + /* + * Parse the line into de-escaped field values + * + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -830,6 +839,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } +/* + * CopyFromTextLikeOneRow + * + * Copy one row to a set of `values` and `nulls` for the text and CSV + * formats. + * + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). 
+ */ +static pg_attribute_always_inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; + + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) + { + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. 
+ */ + string = NULL; + } + } + + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; + + if (string != NULL) + nulls[m] = false; + + if (cstate->defaults[m]) + { + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } + + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + + cstate->num_errors++; + + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; + + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); + } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); + + /* reset relname_only */ + cstate->relname_only = false; + } + + return true; + } + + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } + + Assert(fieldno == attr_count); + + return true; +} + + +/* + * CopyFromTextOneRow + * + * Per-row callback for COPY FROM with text format. 
+ */ +bool +CopyFromTextOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* + * CopyFromCSVOneRow + * + * Per-row callback for COPY FROM with CSV format. + */ +bool +CopyFromCSVOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} + +/* + * CopyFromBinaryOneRow + * + * Copy one row to a set of `values` and `nulls` for the binary format. + */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + cstate->cur_lineno++; + + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } + + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. 
+ */ + char dummy; + + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return true; +} + /* * Read next tuple from file for COPY FROM. Return false if no more tuples. * @@ -847,221 +1117,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) - { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; - - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; - - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; - - /* Loop to read the user attributes on 
the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; - - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } - - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } - - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; - - if (string != NULL) - nulls[m] = false; - - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. 
- */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } - - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - - cstate->num_errors++; - - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; - - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; - } - - return true; - } - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; - } - - Assert(fieldno == attr_count); - } - else if (cstate->routine) - { - if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) - return false; - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - - cstate->cur_lineno++; - - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately.
In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; - - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } - - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } - } + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; /* * Now compute and insert any defaults available for the columns not @@ -1092,7 +1162,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. 
*/ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1100,7 +1170,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1167,8 +1237,8 @@ CopyReadLine(CopyFromState cstate) /* * CopyReadLineText - inner loop of CopyReadLine for text mode */ -static bool -CopyReadLineText(CopyFromState cstate) +static pg_attribute_always_inline bool +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1183,7 +1253,11 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant at + * caller. + */ + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1260,7 +1334,11 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + /* + * is_csv will be optimized away by compiler, as argument is constant + * at caller. + */ + if (is_csv) { /* * If character is '\r', we may need to look ahead below. Force @@ -1299,7 +1377,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1327,10 +1405,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? 
errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1344,10 +1422,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1355,15 +1433,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1375,7 +1453,7 @@ CopyReadLineText(CopyFromState cstate) * Process backslash, except in CSV mode where backslash is a normal * character. 
*/ - if (c == '\\' && !cstate->opts.csv_mode) + if (c == '\\' && !is_csv) { char c2; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index dd645eaa030..e5696839637 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -35,8 +35,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); -extern bool NextCopyFromRawFields(CopyFromState cstate, - char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); extern char *CopyLimitPrintoutLength(const char *str); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 509b9e92a18..c11b5ff3cc0 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -187,4 +187,12 @@ typedef struct CopyFromStateData extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); +/* Callbacks for CopyFromRoutine->CopyFromOneRow */ +extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + #endif /* COPYFROM_INTERNAL_H */ -- 2.45.2 From de95d4bd606519df51c8d68b5ab2f3009dfd0cf7 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:17:53 +0900 Subject: [PATCH v23 08/10] Add support for adding custom COPY FROM format This uses the same handler for COPY TO and COPY FROM but uses different routine. This uses CopyToRoutine for COPY TO and CopyFromRoutine for COPY FROM. PostgreSQL calls a COPY TO/FROM handler with "is_from" argument. 
It's true for COPY FROM and false for COPY TO: copy_handler(true) returns CopyFromRoutine; copy_handler(false) returns CopyToRoutine. This also adds a test module for a custom COPY FROM handler. --- src/backend/commands/copy.c | 52 ++++++++++++------- src/backend/commands/copyfrom.c | 4 +- src/include/catalog/pg_type.dat | 2 +- src/include/commands/copyapi.h | 5 +- .../expected/test_copy_format.out | 10 ++-- .../test_copy_format/sql/test_copy_format.sql | 1 + .../test_copy_format/test_copy_format.c | 39 +++++++++++++- 7 files changed, 87 insertions(+), 26 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 02528fbcc1f..c8643b2dee7 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -469,8 +469,8 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate) * This function checks whether the option value is a built-in format such as * "text" and "csv" or not. If the option value isn't a built-in format, this * function finds a COPY format handler that returns a CopyToRoutine (for - * is_from == false). If no COPY format handler is found, this function - * reports an error. + * is_from == false) or CopyFromRoutine (for is_from == true). If no COPY + * format handler is found, this function reports an error.
*/ static void ProcessCopyOptionFormat(ParseState *pstate, @@ -501,12 +501,9 @@ ProcessCopyOptionFormat(ParseState *pstate, } /* custom format */ - if (!is_from) - { - funcargtypes[0] = INTERNALOID; - handlerOid = LookupFuncName(list_make1(makeString(format)), 1, - funcargtypes, true); - } + funcargtypes[0] = INTERNALOID; + handlerOid = LookupFuncName(list_make1(makeString(format)), 1, + funcargtypes, true); if (!OidIsValid(handlerOid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -515,17 +512,34 @@ ProcessCopyOptionFormat(ParseState *pstate, datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from)); routine = (Node *) DatumGetPointer(datum); - if (routine == NULL || !IsA(routine, CopyToRoutine)) - ereport( - ERROR, - (errcode( - ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("COPY handler function " - "%s(%u) did not return a " - "CopyToRoutine struct", - format, handlerOid), - parser_errposition( - pstate, defel->location))); + if (is_from) + { + if (routine == NULL || !IsA(routine, CopyFromRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyFromRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } + else + { + if (routine == NULL || !IsA(routine, CopyToRoutine)) + ereport( + ERROR, + (errcode( + ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("COPY handler function " + "%s(%u) did not return a " + "CopyToRoutine struct", + format, handlerOid), + parser_errposition( + pstate, defel->location))); + } opts_out->routine = routine; } diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index e6ea9ce1602..932f1ff4f6e 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = { static const CopyFromRoutine * CopyFromGetRoutine(CopyFormatOptions opts) { - if (opts.csv_mode) + if (opts.routine) + return 
(const CopyFromRoutine *) opts.routine; + else if (opts.csv_mode) return &CopyFromRoutineCSV; else if (opts.binary) return &CopyFromRoutineBinary; diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat index 793dd671935..37ebfa0908f 100644 --- a/src/include/catalog/pg_type.dat +++ b/src/include/catalog/pg_type.dat @@ -634,7 +634,7 @@ typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-', typalign => 'i' }, { oid => '8752', - descr => 'pseudo-type for the result of a copy to method function', + descr => 'pseudo-type for the result of a copy to/from method function', typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p', typcategory => 'P', typinput => 'copy_handler_in', typoutput => 'copy_handler_out', typreceive => '-', typsend => '-', diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 81b2f4e5c1f..e9c01492797 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -87,7 +87,8 @@ typedef struct CopyFormatOptions CopyLogVerbosityChoice log_verbosity; /* verbosity of logged messages */ int64 reject_limit; /* maximum tolerable number of errors */ List *convert_select; /* list of column names (can be NIL) */ - Node *routine; /* CopyToRoutine (can be NULL) */ + Node *routine; /* CopyToRoutine or CopyFromRoutine (can be + * NULL) */ } CopyFormatOptions; /* This is private in commands/copyfrom.c */ @@ -99,6 +100,8 @@ typedef struct CopyFromStateData *CopyFromState; */ typedef struct CopyFromRoutine { + NodeTag type; + /* * Called when COPY FROM is started to set up the input functions * associated with the relation's attributes writing to. 
`finfo` can be diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out b/src/test/modules/test_copy_format/expected/test_copy_format.out index 606c78f6878..4ed7c0b12db 100644 --- a/src/test/modules/test_copy_format/expected/test_copy_format.out +++ b/src/test/modules/test_copy_format/expected/test_copy_format.out @@ -2,9 +2,13 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (format 'test_copy_format'); -ERROR: COPY format "test_copy_format" not recognized -LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')... - ^ +NOTICE: test_copy_format: is_from=true +NOTICE: CopyFromInFunc: atttypid=21 +NOTICE: CopyFromInFunc: atttypid=23 +NOTICE: CopyFromInFunc: atttypid=20 +NOTICE: CopyFromStart: natts=3 +NOTICE: CopyFromOneRow +NOTICE: CopyFromEnd COPY public.test TO stdout WITH (format 'test_copy_format'); NOTICE: test_copy_format: is_from=false NOTICE: CopyToOutFunc: atttypid=21 diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql b/src/test/modules/test_copy_format/sql/test_copy_format.sql index 9406b3be3d4..e805f7cb011 100644 --- a/src/test/modules/test_copy_format/sql/test_copy_format.sql +++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql @@ -2,4 +2,5 @@ CREATE EXTENSION test_copy_format; CREATE TABLE public.test (a smallint, b integer, c bigint); INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); COPY public.test FROM stdin WITH (format 'test_copy_format'); +\. 
COPY public.test TO stdout WITH (format 'test_copy_format'); diff --git a/src/test/modules/test_copy_format/test_copy_format.c b/src/test/modules/test_copy_format/test_copy_format.c index e064f40473b..f6b105659ab 100644 --- a/src/test/modules/test_copy_format/test_copy_format.c +++ b/src/test/modules/test_copy_format/test_copy_format.c @@ -18,6 +18,40 @@ PG_MODULE_MAGIC; +static void +CopyFromInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid))); +} + +static void +CopyFromStart(CopyFromState cstate, TupleDesc tupDesc) +{ + ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts))); +} + +static bool +CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + ereport(NOTICE, (errmsg("CopyFromOneRow"))); + return false; +} + +static void +CopyFromEnd(CopyFromState cstate) +{ + ereport(NOTICE, (errmsg("CopyFromEnd"))); +} + +static const CopyFromRoutine CopyFromRoutineTestCopyFormat = { + .type = T_CopyFromRoutine, + .CopyFromInFunc = CopyFromInFunc, + .CopyFromStart = CopyFromStart, + .CopyFromOneRow = CopyFromOneRow, + .CopyFromEnd = CopyFromEnd, +}; + static void CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo) { @@ -59,5 +93,8 @@ test_copy_format(PG_FUNCTION_ARGS) ereport(NOTICE, (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false"))); - PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); + if (is_from) + PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat); + else + PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat); } -- 2.45.2 From 0f4d74f1750467f4ac3b87b24e03271579793651 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:28:02 +0900 Subject: [PATCH v23 09/10] Export CopyFromStateData It's for custom COPY FROM format handlers implemented as extension. This just moves codes. This doesn't change codes except CopySource enum values. 
This changes the COPY_ prefix of the CopySource enum values to COPY_SOURCE_, matching the earlier prefix change of the CopyDest enum values. For example, COPY_FILE in CopySource is renamed to COPY_SOURCE_FILE. Note that this isn't enough to implement custom COPY FROM format handlers as an extension. We'll do the following in a subsequent commit: 1. Add an opaque space for custom COPY FROM format handlers 2. Export CopyReadBinaryData() to read the next data --- src/backend/commands/copyfrom.c | 4 +- src/backend/commands/copyfromparse.c | 10 +- src/include/commands/copy.h | 5 - src/include/commands/copyapi.h | 168 ++++++++++++++++++++++- src/include/commands/copyfrom_internal.h | 165 ---------------------- 5 files changed, 174 insertions(+), 178 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 932f1ff4f6e..d758e66c6a1 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -1716,7 +1716,7 @@ BeginCopyFrom(ParseState *pstate, pg_encoding_to_char(GetDatabaseEncoding())))); } - cstate->copy_src = COPY_FILE; /* default */ + cstate->copy_src = COPY_SOURCE_FILE; /* default */ cstate->whereClause = whereClause; @@ -1844,7 +1844,7 @@ BeginCopyFrom(ParseState *pstate, if (data_source_cb) { progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; - cstate->copy_src = COPY_CALLBACK; + cstate->copy_src = COPY_SOURCE_CALLBACK; cstate->data_source_cb = data_source_cb; } else if (pipe) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 0447c4df7e0..ccfbacb4a37 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -171,7 +171,7 @@ ReceiveCopyBegin(CopyFromState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_src = COPY_FRONTEND; + cstate->copy_src = COPY_SOURCE_FRONTEND; cstate->fe_msgbuf = makeStringInfo(); /* We *must* flush here to ensure FE knows it can send.
*/ pq_flush(); @@ -239,7 +239,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) switch (cstate->copy_src) { - case COPY_FILE: + case COPY_SOURCE_FILE: bytesread = fread(databuf, 1, maxread, cstate->copy_file); if (ferror(cstate->copy_file)) ereport(ERROR, @@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) if (bytesread == 0) cstate->raw_reached_eof = true; break; - case COPY_FRONTEND: + case COPY_SOURCE_FRONTEND: while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof) { int avail; @@ -331,7 +331,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; - case COPY_CALLBACK: + case COPY_SOURCE_CALLBACK: bytesread = cstate->data_source_cb(databuf, minread, maxread); break; } @@ -1179,7 +1179,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv) * after \. up to the protocol end of copy data. (XXX maybe better * not to treat \. as special?) */ - if (cstate->copy_src == COPY_FRONTEND) + if (cstate->copy_src == COPY_SOURCE_FRONTEND) { int inbytes; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index e5696839637..e2411848e9f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -19,11 +19,6 @@ #include "parser/parse_node.h" #include "tcop/dest.h" -/* This is private in commands/copyfrom.c */ -typedef struct CopyFromStateData *CopyFromState; - -typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); - extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, uint64 *processed); diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index e9c01492797..0274e3487c3 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -91,7 +91,6 @@ typedef struct CopyFormatOptions * NULL) */ } CopyFormatOptions; -/* This is private in commands/copyfrom.c */ typedef struct CopyFromStateData 
*CopyFromState; /* @@ -138,6 +137,173 @@ typedef struct CopyFromRoutine void (*CopyFromEnd) (CopyFromState cstate); } CopyFromRoutine; +/* + * Represents the different source cases we need to worry about at + * the bottom level + */ +typedef enum CopySource +{ + COPY_SOURCE_FILE, /* from file (or a piped program) */ + COPY_SOURCE_FRONTEND, /* from frontend */ + COPY_SOURCE_CALLBACK, /* from callback function */ +} CopySource; + +/* + * Represents the end-of-line terminator type of the input + */ +typedef enum EolType +{ + EOL_UNKNOWN, + EOL_NL, + EOL_CR, + EOL_CRNL, +} EolType; + +/* + * Represents the insert method to be used during COPY FROM. + */ +typedef enum CopyInsertMethod +{ + CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ + CIM_MULTI, /* always use table_multi_insert or + * ExecForeignBatchInsert */ + CIM_MULTI_CONDITIONAL, /* use table_multi_insert or + * ExecForeignBatchInsert only if valid */ +} CopyInsertMethod; + +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); + +/* + * This struct contains all the state variables used throughout a COPY FROM + * operation. + */ +typedef struct CopyFromStateData +{ + /* format routine */ + const CopyFromRoutine *routine; + + /* low-level state data */ + CopySource copy_src; /* type of copy source */ + FILE *copy_file; /* used if copy_src == COPY_SOURCE_FILE */ + StringInfo fe_msgbuf; /* used if copy_src == COPY_SOURCE_FRONTEND */ + + EolType eol_type; /* EOL type of input */ + int file_encoding; /* file or remote side's character encoding */ + bool need_transcoding; /* file encoding diff from server? */ + Oid conversion_proc; /* encoding conversion function */ + + /* parameters from the COPY command */ + Relation rel; /* relation to copy from */ + List *attnumlist; /* integer list of attnums to copy */ + char *filename; /* filename, or NULL for STDIN */ + bool is_program; /* is 'filename' a program to popen?
*/ + copy_data_source_cb data_source_cb; /* function for reading data */ + + CopyFormatOptions opts; + bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ + Node *whereClause; /* WHERE condition (or NULL) */ + + /* these are just for error messages, see CopyFromErrorCallback */ + const char *cur_relname; /* table name for error messages */ + uint64 cur_lineno; /* line number for error messages */ + const char *cur_attname; /* current att for error messages */ + const char *cur_attval; /* current att value for error messages */ + bool relname_only; /* don't output line number, att, etc. */ + + /* + * Working state + */ + MemoryContext copycontext; /* per-copy execution context */ + + AttrNumber num_defaults; /* count of att that are missing and have + * default value */ + FmgrInfo *in_functions; /* array of input functions for each attrs */ + Oid *typioparams; /* array of element types for in_functions */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions + * execution */ + uint64 num_errors; /* total number of rows which contained soft + * errors */ + int *defmap; /* array of default att numbers related to + * missing att */ + ExprState **defexprs; /* array of default att expressions for all + * att */ + bool *defaults; /* if DEFAULT marker was found for + * corresponding att */ + bool volatile_defexprs; /* is any of defexprs volatile? */ + List *range_table; /* single element list of RangeTblEntry */ + List *rteperminfos; /* single element list of RTEPermissionInfo */ + ExprState *qualexpr; + + TransitionCaptureState *transition_capture; + + /* + * These variables are used to reduce overhead in COPY FROM. + * + * attribute_buf holds the separated, de-escaped text for each field of + * the current line. The CopyReadAttributes functions return arrays of + * pointers into this buffer. We avoid palloc/pfree overhead by re-using + * the buffer on each cycle. 
+ * + * In binary COPY FROM, attribute_buf holds the binary data for the + * current field, but the usage is otherwise similar. + */ + StringInfoData attribute_buf; + + /* field raw data pointers found by COPY FROM */ + + int max_fields; + char **raw_fields; + + /* + * Similarly, line_buf holds the whole input line being processed. The + * input cycle is first to read the whole line into line_buf, and then + * extract the individual attribute fields into attribute_buf. line_buf + * is preserved unmodified so that we can display it in error messages if + * appropriate. (In binary mode, line_buf is not used.) + */ + StringInfoData line_buf; + bool line_buf_valid; /* contains the row being processed? */ + + /* + * input_buf holds input data, already converted to database encoding. + * + * In text mode, CopyReadLine parses this data sufficiently to locate line + * boundaries, then transfers the data to line_buf. We guarantee that + * there is a \0 at input_buf[input_buf_len] at all times. (In binary + * mode, input_buf is not used.) + * + * If encoding conversion is not required, input_buf is not a separate + * buffer but points directly to raw_buf. In that case, input_buf_len + * tracks the number of bytes that have been verified as valid in the + * database encoding, and raw_buf_len is the total number of bytes stored + * in the buffer. + */ +#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ + char *input_buf; + int input_buf_index; /* next byte to process */ + int input_buf_len; /* total # of bytes stored */ + bool input_reached_eof; /* true if we reached EOF */ + bool input_reached_error; /* true if a conversion error happened */ + /* Shorthand for number of unconsumed bytes available in input_buf */ +#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) + + /* + * raw_buf holds raw input data read from the data source (file or client + * connection), not yet converted to the database encoding. 
Like with + * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. + */ +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ + char *raw_buf; + int raw_buf_index; /* next byte to process */ + int raw_buf_len; /* total # of bytes stored */ + bool raw_reached_eof; /* true if we reached EOF */ + + /* Shorthand for number of unconsumed bytes available in raw_buf */ +#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) + + uint64 bytes_processed; /* number of bytes processed so far */ +} CopyFromStateData; + typedef struct CopyToStateData *CopyToState; /* diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index c11b5ff3cc0..3863d26d5b7 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -19,171 +19,6 @@ #include "commands/trigger.h" #include "nodes/miscnodes.h" -/* - * Represents the different source cases we need to worry about at - * the bottom level - */ -typedef enum CopySource -{ - COPY_FILE, /* from file (or a piped program) */ - COPY_FRONTEND, /* from frontend */ - COPY_CALLBACK, /* from callback function */ -} CopySource; - -/* - * Represents the end-of-line terminator type of the input - */ -typedef enum EolType -{ - EOL_UNKNOWN, - EOL_NL, - EOL_CR, - EOL_CRNL, -} EolType; - -/* - * Represents the insert method to be used during COPY FROM. - */ -typedef enum CopyInsertMethod -{ - CIM_SINGLE, /* use table_tuple_insert or ExecForeignInsert */ - CIM_MULTI, /* always use table_multi_insert or - * ExecForeignBatchInsert */ - CIM_MULTI_CONDITIONAL, /* use table_multi_insert or - * ExecForeignBatchInsert only if valid */ -} CopyInsertMethod; - -/* - * This struct contains all the state variables used throughout a COPY FROM - * operation. 
- */ -typedef struct CopyFromStateData -{ - /* format routine */ - const CopyFromRoutine *routine; - - /* low-level state data */ - CopySource copy_src; /* type of copy source */ - FILE *copy_file; /* used if copy_src == COPY_FILE */ - StringInfo fe_msgbuf; /* used if copy_src == COPY_FRONTEND */ - - EolType eol_type; /* EOL type of input */ - int file_encoding; /* file or remote side's character encoding */ - bool need_transcoding; /* file encoding diff from server? */ - Oid conversion_proc; /* encoding conversion function */ - - /* parameters from the COPY command */ - Relation rel; /* relation to copy from */ - List *attnumlist; /* integer list of attnums to copy */ - char *filename; /* filename, or NULL for STDIN */ - bool is_program; /* is 'filename' a program to popen? */ - copy_data_source_cb data_source_cb; /* function for reading data */ - - CopyFormatOptions opts; - bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ - Node *whereClause; /* WHERE condition (or NULL) */ - - /* these are just for error messages, see CopyFromErrorCallback */ - const char *cur_relname; /* table name for error messages */ - uint64 cur_lineno; /* line number for error messages */ - const char *cur_attname; /* current att for error messages */ - const char *cur_attval; /* current att value for error messages */ - bool relname_only; /* don't output line number, att, etc. 
*/ - - /* - * Working state - */ - MemoryContext copycontext; /* per-copy execution context */ - - AttrNumber num_defaults; /* count of att that are missing and have - * default value */ - FmgrInfo *in_functions; /* array of input functions for each attrs */ - Oid *typioparams; /* array of element types for in_functions */ - ErrorSaveContext *escontext; /* soft error trapper during in_functions - * execution */ - uint64 num_errors; /* total number of rows which contained soft - * errors */ - int *defmap; /* array of default att numbers related to - * missing att */ - ExprState **defexprs; /* array of default att expressions for all - * att */ - bool *defaults; /* if DEFAULT marker was found for - * corresponding att */ - bool volatile_defexprs; /* is any of defexprs volatile? */ - List *range_table; /* single element list of RangeTblEntry */ - List *rteperminfos; /* single element list of RTEPermissionInfo */ - ExprState *qualexpr; - - TransitionCaptureState *transition_capture; - - /* - * These variables are used to reduce overhead in COPY FROM. - * - * attribute_buf holds the separated, de-escaped text for each field of - * the current line. The CopyReadAttributes functions return arrays of - * pointers into this buffer. We avoid palloc/pfree overhead by re-using - * the buffer on each cycle. - * - * In binary COPY FROM, attribute_buf holds the binary data for the - * current field, but the usage is otherwise similar. - */ - StringInfoData attribute_buf; - - /* field raw data pointers found by COPY FROM */ - - int max_fields; - char **raw_fields; - - /* - * Similarly, line_buf holds the whole input line being processed. The - * input cycle is first to read the whole line into line_buf, and then - * extract the individual attribute fields into attribute_buf. line_buf - * is preserved unmodified so that we can display it in error messages if - * appropriate. (In binary mode, line_buf is not used.) 
- */ - StringInfoData line_buf; - bool line_buf_valid; /* contains the row being processed? */ - - /* - * input_buf holds input data, already converted to database encoding. - * - * In text mode, CopyReadLine parses this data sufficiently to locate line - * boundaries, then transfers the data to line_buf. We guarantee that - * there is a \0 at input_buf[input_buf_len] at all times. (In binary - * mode, input_buf is not used.) - * - * If encoding conversion is not required, input_buf is not a separate - * buffer but points directly to raw_buf. In that case, input_buf_len - * tracks the number of bytes that have been verified as valid in the - * database encoding, and raw_buf_len is the total number of bytes stored - * in the buffer. - */ -#define INPUT_BUF_SIZE 65536 /* we palloc INPUT_BUF_SIZE+1 bytes */ - char *input_buf; - int input_buf_index; /* next byte to process */ - int input_buf_len; /* total # of bytes stored */ - bool input_reached_eof; /* true if we reached EOF */ - bool input_reached_error; /* true if a conversion error happened */ - /* Shorthand for number of unconsumed bytes available in input_buf */ -#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index) - - /* - * raw_buf holds raw input data read from the data source (file or client - * connection), not yet converted to the database encoding. Like with - * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
- */ -#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ - char *raw_buf; - int raw_buf_index; /* next byte to process */ - int raw_buf_len; /* total # of bytes stored */ - bool raw_reached_eof; /* true if we reached EOF */ - - /* Shorthand for number of unconsumed bytes available in raw_buf */ -#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) - - uint64 bytes_processed; /* number of bytes processed so far */ -} CopyFromStateData; - extern void ReceiveCopyBegin(CopyFromState cstate); extern void ReceiveCopyBinaryHeader(CopyFromState cstate); -- 2.45.2 From f9908c010646fdf182615a2e3632395ae9d9c4f3 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <kou@clear-code.com> Date: Sun, 29 Sep 2024 00:32:31 +0900 Subject: [PATCH v23 10/10] Add support for implementing custom COPY FROM format as extension * Add CopyFromStateData::opaque that can be used to keep data for custom COPY FROM format implementation * Export CopyReadBinaryData() to read the next data as CopyFromStateRead() --- src/backend/commands/copyfromparse.c | 14 ++++++++++++++ src/include/commands/copyapi.h | 6 ++++++ 2 files changed, 20 insertions(+) diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index ccfbacb4a37..4fa23d992f5 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -730,6 +730,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) return copied_bytes; } +/* + * CopyFromStateRead + * + * Exported wrapper of CopyReadBinaryData() for extensions. We keep + * CopyReadBinaryData() itself static so that the compiler can optimize + * (e.g. inline) the calls to it within this file; extensions call this + * wrapper instead. + */ +int +CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes) +{ + return CopyReadBinaryData(cstate, dest, nbytes); +} + /* * Read raw fields in the next line for COPY FROM in text or csv mode. * Return false if no more lines.
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h index 0274e3487c3..2de610ef729 100644 --- a/src/include/commands/copyapi.h +++ b/src/include/commands/copyapi.h @@ -302,8 +302,14 @@ typedef struct CopyFromStateData #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) uint64 bytes_processed; /* number of bytes processed so far */ + + /* For custom format implementation */ + void *opaque; /* private space */ } CopyFromStateData; +extern int CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes); + + typedef struct CopyToStateData *CopyToState; /* -- 2.45.2
pgsql-hackers by date: