From df9e25b8517ab5dc6dd9f73ed7ad91fc20b1f938 Mon Sep 17 00:00:00 2001 From: Zhao Junwang Date: Wed, 6 Dec 2023 19:13:22 +0800 Subject: [PATCH v4] Extract COPY handlers --- src/backend/commands/copy.c | 44 ++++ src/backend/commands/copyfrom.c | 275 ++++++++++++--------- src/backend/commands/copyfromparse.c | 309 ++++++++++++----------- src/backend/commands/copyto.c | 354 +++++++++++++++------------ src/include/commands/copy.h | 49 +++- 5 files changed, 619 insertions(+), 412 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfad47b562..6ae904c1b8 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; + /* Text is the default format. */ + opts_out->handler = CopyHandlerOpsText; /* Extract options from the statement node tree */ foreach(option, options) { @@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) + { opts_out->csv_mode = true; + opts_out->handler = CopyHandlerOpsCSV; + } else if (strcmp(fmt, "binary") == 0) + { opts_out->binary = true; + opts_out->handler = CopyHandlerOpsBinary; + } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -864,3 +872,39 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) return attnums; } + +const CopyHandlerOps CopyHandlerOpsText = { + .copy_to_start = CopyToFormatTextStart, + .copy_to_one_row = CopyToFormatTextOneRow, + .copy_to_end = CopyToFormatTextEnd, + .copy_from_start = CopyFromFormatTextStart, + .copy_from_next = CopyFromFormatTextNext, + .copy_from_error_callback = CopyFromFormatTextErrorCallback, + .copy_from_end = NULL, +}; + +/* + * We can use the same CopyHandlerOps for both of "text" and "csv" because + * CopyToFormatText*() refer cstate->opts.csv_mode and change their + * behavior. We can split the implementations and stop referring + * cstate->opts.csv_mode later. + */ +const CopyHandlerOps CopyHandlerOpsCSV = { + .copy_to_start = CopyToFormatTextStart, + .copy_to_one_row = CopyToFormatTextOneRow, + .copy_to_end = CopyToFormatTextEnd, + .copy_from_start = CopyFromFormatTextStart, + .copy_from_next = CopyFromFormatTextNext, + .copy_from_error_callback = CopyFromFormatTextErrorCallback, + .copy_from_end = NULL, +}; + +const CopyHandlerOps CopyHandlerOpsBinary = { + .copy_to_start = CopyToFormatBinaryStart, + .copy_to_one_row = CopyToFormatBinaryOneRow, + .copy_to_end = CopyToFormatBinaryEnd, + .copy_from_start = CopyFromFormatBinaryStart, + .copy_from_next = CopyFromFormatBinaryNext, + .copy_from_error_callback = CopyFromFormatBinaryErrorCallback, + .copy_from_end = NULL, +}; diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index f4861652a9..01c3c1c84f 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -107,6 +107,71 @@ static char *limit_printout_length(const char *str); static void ClosePipeFromProgram(CopyFromState cstate); +void +CopyFromFormatBinaryErrorCallback(CopyFromState cstate) +{ + /* can't usefully display the data */ + if (cstate->cur_attname) + errcontext("COPY %s, line %llu, column %s", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname); + else + errcontext("COPY %s, line %llu", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno); +} + +void +CopyFromFormatTextErrorCallback(CopyFromState cstate) +{ + if (cstate->cur_attname && cstate->cur_attval) + { + /* error is relevant to a particular column */ + char *attval; + + attval = limit_printout_length(cstate->cur_attval); + errcontext("COPY %s, line %llu, column %s: \"%s\"", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval); + pfree(attval); + } + else if (cstate->cur_attname) + { + /* error is relevant to a particular column, value is NULL */ + errcontext("COPY %s, line %llu, column %s: null input", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname); + } + else + { + /* + * Error is relevant to a particular line. + * + * If line_buf still contains the correct line, print it. + */ + if (cstate->line_buf_valid) + { + char *lineval; + + lineval = limit_printout_length(cstate->line_buf.data); + errcontext("COPY %s, line %llu: \"%s\"", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno, lineval); + pfree(lineval); + } + else + { + errcontext("COPY %s, line %llu", + cstate->cur_relname, + (unsigned long long) cstate->cur_lineno); + } + } +} + /* * error context callback for COPY FROM * @@ -123,67 +188,7 @@ CopyFromErrorCallback(void *arg) cstate->cur_relname); return; } - if (cstate->opts.binary) - { - /* can't usefully display the data */ - if (cstate->cur_attname) - errcontext("COPY %s, line %llu, column %s", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname); - else - errcontext("COPY %s, line %llu", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno); - } - else - { - if (cstate->cur_attname && cstate->cur_attval) - { - /* error is relevant to a particular column */ - char *attval; - - attval = limit_printout_length(cstate->cur_attval); - errcontext("COPY %s, line %llu, column %s: \"%s\"", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval); - pfree(attval); - } - else if (cstate->cur_attname) - { - /* error is relevant to a particular column, value is NULL */ - errcontext("COPY %s, line %llu, column %s: null input", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname); - } - else - { - /* - * Error is relevant to a particular line. - * - * If line_buf still contains the correct line, print it. - */ - if (cstate->line_buf_valid) - { - char *lineval; - - lineval = limit_printout_length(cstate->line_buf.data); - errcontext("COPY %s, line %llu: \"%s\"", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno, lineval); - pfree(lineval); - } - else - { - errcontext("COPY %s, line %llu", - cstate->cur_relname, - (unsigned long long) cstate->cur_lineno); - } - } - } + cstate->opts.handler.copy_from_error_callback(cstate); } /* @@ -1320,6 +1325,101 @@ CopyFrom(CopyFromState cstate) return processed; } +void +CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + AttrNumber num_phys_attrs; + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function), and info about defaults and constraints. (Which + * input function we use depends on text/binary format choice.) + */ + num_phys_attrs = tupDesc->natts; + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeBinaryInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; +} + +void +CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc) +{ + FmgrInfo *in_functions; + Oid *typioparams; + Oid in_func_oid; + AttrNumber attr_count, + num_phys_attrs; + + num_phys_attrs = tupDesc->natts; + + /* + * If encoding conversion is needed, we need another buffer to hold + * the converted input data. Otherwise, we can just point input_buf + * to the same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* create workspace for CopyReadAttributes results */ + attr_count = list_length(cstate->attnumlist); + + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); + + /* + * Pick up the required catalog information for each attribute in the + * relation, including the input function, the element type (to pass to + * the input function), and info about defaults and constraints. (Which + * input function we use depends on text/binary format choice.) + */ + in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); + + for (int attnum = 1; attnum <= num_phys_attrs; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); + + /* We don't need info for dropped attributes */ + if (att->attisdropped) + continue; + + /* Fetch the input function and typioparam info */ + getTypeInputInfo(att->atttypid, + &in_func_oid, &typioparams[attnum - 1]); + fmgr_info(in_func_oid, &in_functions[attnum - 1]); + } + cstate->in_functions = in_functions; + cstate->typioparams = typioparams; +} + /* * Setup to read tuples from a file for COPY FROM. * @@ -1348,9 +1448,6 @@ BeginCopyFrom(ParseState *pstate, TupleDesc tupDesc; AttrNumber num_phys_attrs, num_defaults; - FmgrInfo *in_functions; - Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1518,25 +1615,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1546,17 +1624,10 @@ BeginCopyFrom(ParseState *pstate, cstate->rteperminfos = pstate->p_rteperminfos; } + cstate->opts.handler.copy_from_start(cstate, tupDesc); + num_defaults = 0; volatile_defexprs = false; - - /* - * Pick up the required catalog information for each attribute in the - * relation, including the input function, the element type (to pass to - * the input function), and info about defaults and constraints. (Which - * input function we use depends on text/binary format choice.) - */ - in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); defmap = (int *) palloc(num_phys_attrs * sizeof(int)); defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); @@ -1568,15 +1639,6 @@ BeginCopyFrom(ParseState *pstate, if (att->attisdropped) continue; - /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); - /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1636,8 +1698,6 @@ BeginCopyFrom(ParseState *pstate, cstate->bytes_processed = 0; /* We keep those variables in cstate. */ - cstate->in_functions = in_functions; - cstate->typioparams = typioparams; cstate->defmap = defmap; cstate->defexprs = defexprs; cstate->volatile_defexprs = volatile_defexprs; @@ -1716,15 +1776,6 @@ BeginCopyFrom(ParseState *pstate, ReceiveCopyBinaryHeader(cstate); } - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } - MemoryContextSwitchTo(oldcontext); return cstate; diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f553734582..e840ebb108 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return true; } -/* - * Read next tuple from file for COPY FROM. Return false if no more tuples. - * - * 'econtext' is used to evaluate default expression for each column that is - * either not read from the file or is using the DEFAULT option of COPY FROM. - * It can be NULL when no default values are used, i.e. when all columns are - * read from the file, and DEFAULT option is unset. - * - * 'values' and 'nulls' arrays must be the same length as columns of the - * relation passed to BeginCopyFrom. This function fills the arrays. - */ bool -NextCopyFrom(CopyFromState cstate, ExprContext *econtext, - Datum *values, bool *nulls) +CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) { TupleDesc tupDesc; - AttrNumber num_phys_attrs, - attr_count, - num_defaults = cstate->num_defaults; + AttrNumber attr_count; + int16 fld_count; + ListCell *cur; FmgrInfo *in_functions = cstate->in_functions; Oid *typioparams = cstate->typioparams; - int i; - int *defmap = cstate->defmap; - ExprState **defexprs = cstate->defexprs; - tupDesc = RelationGetDescr(cstate->rel); - num_phys_attrs = tupDesc->natts; attr_count = list_length(cstate->attnumlist); - /* Initialize all values for row to NULL */ - MemSet(values, 0, num_phys_attrs * sizeof(Datum)); - MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + cstate->cur_lineno++; - if (!cstate->opts.binary) + if (!CopyGetInt16(cstate, &fld_count)) { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + if (fld_count == -1) + { + /* + * Received EOF marker. Wait for the protocol-level EOF, and + * complain if it doesn't come immediately. In COPY FROM STDIN, + * this ensures that we correctly handle CopyFail, if client + * chooses to send that now. When copying from file, we could + * ignore the rest of the file like in text mode, but we choose to + * be consistent with the COPY FROM STDIN case. + */ + char dummy; - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + errmsg("received copy data after EOF marker"))); + return false; + } - fieldno = 0; + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + tupDesc = RelationGetDescr(cstate->rel); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; + return true; +} - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) - { - /* ignore input field, leaving column as NULL */ - continue; - } +bool +CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno = 0; + char *string; - if (cstate->opts.csv_mode) - { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } - } + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; + tupDesc = RelationGetDescr(cstate->rel); + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); - if (string != NULL) - nulls[m] = false; + if (fieldno >= fldct) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; - if (cstate->defaults[m]) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) + { + /* ignore input field, leaving column as NULL */ + continue; + } + + if (cstate->opts.csv_mode) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) { /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. + * FORCE_NOT_NULL option is set and column is NULL - + * convert it to the NULL string. */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + string = cstate->opts.null_print; + } + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) + { + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the + * string would already have been set to NULL. Convert it + * to NULL as specified. + */ + string = NULL; } - else - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); - - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; } - Assert(fieldno == attr_count); - } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; - cstate->cur_lineno++; + if (string != NULL) + nulls[m] = false; - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) + if (cstate->defaults[m]) { /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. */ - char dummy; + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } + else + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; + } - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } + Assert(fieldno == attr_count); + return true; +} + +/* + * Read next tuple from file for COPY FROM. Return false if no more tuples. + * + * 'econtext' is used to evaluate default expression for each column that is + * either not read from the file or is using the DEFAULT option of COPY FROM. + * It can be NULL when no default values are used, i.e. when all columns are + * read from the file, and DEFAULT option is unset. + * + * 'values' and 'nulls' arrays must be the same length as columns of the + * relation passed to BeginCopyFrom. This function fills the arrays. + */ +bool +NextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber num_phys_attrs, + num_defaults = cstate->num_defaults; + int i; + int *defmap = cstate->defmap; + ExprState **defexprs = cstate->defexprs; + + tupDesc = RelationGetDescr(cstate->rel); + num_phys_attrs = tupDesc->natts; + + /* Initialize all values for row to NULL */ + MemSet(values, 0, num_phys_attrs * sizeof(Datum)); + MemSet(nulls, true, num_phys_attrs * sizeof(bool)); + MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); + + if (!cstate->opts.handler.copy_from_next(cstate, econtext, values, nulls)) + { + return false; } /* diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index c66a047c4a..4538bc6292 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyHandlerOps implementation of COPY TO for "text" and "csv". + * CopyToFormatText*() refer cstate->opts.csv_mode and change their behavior. + * We can split this implementation and stop referring cstate->opts.csv_mode + * later. + */ + +static void +CopyToFormatTextSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopySendEndOfRow(cstate); +} + +void +CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* + * For non-binary copy, we need to convert null_print to file + * encoding, because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false, + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToFormatTextSendEndOfRow(cstate); + } +} + +void +CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + CopySendString(cstate, cstate->opts.null_print_client); + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], value); + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1], + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToFormatTextSendEndOfRow(cstate); +} + +void +CopyToFormatTextEnd(CopyToState cstate) +{ +} + +/* + * CopyHandlerOps implementation for "binary" COPY TO. + */ + +void +CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* Generate header for a binary copy */ + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + CopySendInt32(cstate, 0); + /* No header extension */ + CopySendInt32(cstate, 0); +} + +void +CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + CopySendInt32(cstate, -1); + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +void +CopyToFormatBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + CopySendEndOfRow(cstate); +} /* * Send copy start/stop messages for frontend copies. These have changed @@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate) bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; uint64 processed; if (fe_copy) @@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; cstate->opts.null_print_client = cstate->opts.null_print; /* default */ /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); - /* Get info about the columns we need to process. */ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->opts.binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside @@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. - */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->opts.handler.copy_to_start(cstate, tupDesc); if (cstate->rel) { @@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } + cstate->opts.handler.copy_to_end(cstate); MemoryContextDelete(cstate->rowcontext); @@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; - ListCell *cur; - char *string; MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->opts.binary) - { - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = true; - } - - if (isnull) - { - if (!cstate->opts.binary) - CopySendString(cstate, cstate->opts.null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->opts.binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->opts.handler.copy_to_one_row(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2cca0b90b..5b3ffcd190 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -30,6 +30,34 @@ typedef enum CopyHeaderChoice COPY_HEADER_MATCH, } CopyHeaderChoice; +/* These are private in commands/copy[from|to].c */ +typedef struct CopyFromStateData *CopyFromState; +typedef struct CopyToStateData *CopyToState; + +/* Routines for a COPY HANDLER implementation. */ +typedef struct CopyHandlerOps +{ + /* Called when COPY TO is started. This will send a header. */ + void (*copy_to_start) (CopyToState cstate, TupleDesc tupDesc); + + /* Copy one row for COPY TO. */ + void (*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO is ended. This will send a trailer. */ + void (*copy_to_end) (CopyToState cstate); + + void (*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc); + bool (*copy_from_next) (CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); + void (*copy_from_error_callback) (CopyFromState cstate); + void (*copy_from_end) (CopyFromState cstate); +} CopyHandlerOps; + +/* Predefined CopyToFormatOps for "text", "csv" and "binary". */ +extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsText; +extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsCSV; +extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsBinary; + /* * A struct to hold COPY options, in a parsed form. All of these are related * to formatting, except for 'freeze', which doesn't really belong here, but @@ -63,12 +91,9 @@ typedef struct CopyFormatOptions bool *force_null_flags; /* per-column CSV FN flags */ bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ + CopyHandlerOps handler; /* copy handler operations */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); typedef void (*copy_data_dest_cb) (void *data, int len); @@ -102,4 +127,20 @@ extern uint64 DoCopyTo(CopyToState cstate); extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); +extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc); +extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot); +extern void CopyToFormatTextEnd(CopyToState cstate); +extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc); +extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern void CopyFromFormatTextErrorCallback(CopyFromState cstate); + +extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc); +extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot); +extern void CopyToFormatBinaryEnd(CopyToState cstate); +extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc); +extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); +extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate); + #endif /* COPY_H */ -- 2.41.0