Re: Make COPY format extendable: Extract COPY TO format implementations - Mailing list pgsql-hackers

From Sutou Kouhei
Subject Re: Make COPY format extendable: Extract COPY TO format implementations
Date
Msg-id 20241105.174328.1705956947135248653.kou@clear-code.com
Whole thread Raw
In response to Make COPY format extendable: Extract COPY TO format implementations  (Sutou Kouhei <kou@clear-code.com>)
List pgsql-hackers
Hi,

In <CAD21AoAdj-EJOH1o2fTLke-uskSvuenT--fKW9nkLzYcLwU_eg@mail.gmail.com>
  "Re: Make COPY format extendable: Extract COPY TO format implementations" on Mon, 4 Nov 2024 22:19:07 -0800,
  Masahiko Sawada <sawada.mshk@gmail.com> wrote:

> I've further investigated the performance regression, and found out it
> might be relevant that the compiler doesn't inline the
> CopyFromTextLikeOneRow() function. It might be worth testing with
> pg_attribute_always_inline instead of 'inline' as below:

Wow! Good catch!

I've rebased on the current master and updated the v20 and
v21 patch sets with "pg_attribute_always_inline" not
"inline".

The v22 patch set is for the v20 patch set.
(TO/FROM changes are in one commit.)

The v23 patch set is for the v21 patch set.
(TO/FROM changes are separated for easy to merge only FROM
or TO part.)


I'll run benchmark on my environment again.


Thanks,
-- 
kou
From 960414f4d256b0d250a70156aac50f88e07de19a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Mon, 4 Mar 2024 13:52:34 +0900
Subject: [PATCH v22 1/5] Add CopyToRoutine/CopyFromRountine

They are for implementing custom COPY TO/FROM format. But this is not
enough to implement custom COPY TO/FROM format yet. We'll export some
APIs to receive/send data and add "format" option to COPY TO/FROM
later.

Existing text/csv/binary format implementations don't use
CopyToRoutine/CopyFromRoutine for now. We have a patch for it but we
defer it. Because there are some mysterious profile results in spite
of we get faster runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyfrom.c          |  24 +++++-
 src/backend/commands/copyfromparse.c     |   5 ++
 src/backend/commands/copyto.c            |  31 ++++++-
 src/include/commands/copyapi.h           | 101 +++++++++++++++++++++++
 src/include/commands/copyfrom_internal.h |   4 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 159 insertions(+), 8 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b8..909375e81b7 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1635,12 +1635,22 @@ BeginCopyFrom(ParseState *pstate,
 
         /* Fetch the input function and typioparam info */
         if (cstate->opts.binary)
+        {
             getTypeBinaryInputInfo(att->atttypid,
                                    &in_func_oid, &typioparams[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                            &in_functions[attnum - 1],
+                                            &typioparams[attnum - 1]);
+
         else
+        {
             getTypeInputInfo(att->atttypid,
                              &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1780,10 +1790,13 @@ BeginCopyFrom(ParseState *pstate,
         /* Read and verify binary header */
         ReceiveCopyBinaryHeader(cstate);
     }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
+    else if (cstate->routine)
     {
+        cstate->routine->CopyFromStart(cstate, tupDesc);
+    }
+    else
+    {
+        /* create workspace for CopyReadAttributes results */
         AttrNumber    attr_count = list_length(cstate->attnumlist);
 
         cstate->max_fields = attr_count;
@@ -1801,6 +1814,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    if (cstate->routine)
+        cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index d1d43b53d83..b104e4a9114 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1003,6 +1003,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
         Assert(fieldno == attr_count);
     }
+    else if (cstate->routine)
+    {
+        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+            return false;
+    }
     else
     {
         /* binary */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index f55e6d96751..405e1782685 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -20,6 +20,7 @@
 
 #include "access/tableam.h"
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +65,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format routine */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest    copy_dest;        /* type of copy source/destination */
     FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
@@ -776,14 +780,22 @@ DoCopyTo(CopyToState cstate)
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
         if (cstate->opts.binary)
+        {
             getTypeBinaryOutputInfo(attr->atttypid,
                                     &out_func_oid,
                                     &isvarlena);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                           &cstate->out_functions[attnum - 1]);
         else
+        {
             getTypeOutputInfo(attr->atttypid,
                               &out_func_oid,
                               &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
     }
 
     /*
@@ -810,6 +822,8 @@ DoCopyTo(CopyToState cstate)
         tmp = 0;
         CopySendInt32(cstate, tmp);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToStart(cstate, tupDesc);
     else
     {
         /*
@@ -891,6 +905,8 @@ DoCopyTo(CopyToState cstate)
         /* Need to flush out the trailer */
         CopySendEndOfRow(cstate);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -912,15 +928,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
+    /* Make sure the tuple is fully deconstructed */
+    slot_getallattrs(slot);
+
+    if (cstate->routine)
+    {
+        cstate->routine->CopyToOneRow(cstate, slot);
+        MemoryContextSwitchTo(oldcontext);
+        return;
+    }
+
     if (cstate->opts.binary)
     {
         /* Binary per-tuple header */
         CopySendInt16(cstate, list_length(cstate->attnumlist));
     }
 
-    /* Make sure the tuple is fully deconstructed */
-    slot_getallattrs(slot);
-
     if (!cstate->opts.binary)
     {
         bool        need_delim = false;
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..d1289424c67
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,101 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO/FROM handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/*
+ * API structure for a COPY FROM format implementation.  Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called when COPY FROM is started to set up the input functions
+     * associated with the relation's attributes writing to.  `finfo` can be
+     * optionally filled to provide the catalog information of the input
+     * function.  `typioparam` can be optionally filled to define the OID of
+     * the type to pass to the input function.  `atttypid` is the OID of data
+     * type used by the relation's attribute.
+     */
+    void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                   FmgrInfo *finfo, Oid *typioparam);
+
+    /*
+     * Called when COPY FROM is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation where the data needs
+     * to be copied.  This can be used for any initialization steps required
+     * by a format.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row to a set of `values` and `nulls` of size tupDesc->natts.
+     *
+     * 'econtext' is used to evaluate default expression for each column that
+     * is either not read from the file or is using the DEFAULT option of COPY
+     * FROM.  It is NULL if no default values are used.
+     *
+     * Returns false if there are no more tuples to copy.
+     */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                   Datum *values, bool *nulls);
+
+    /* Called when COPY FROM has ended. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
+/*
+ * API structure for a COPY TO format implementation.   Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called when COPY TO is started to set up the output functions
+     * associated with the relation's attributes reading from.  `finfo` can be
+     * optionally filled to provide the catalog information of the output
+     * function.  `atttypid` is the OID of data type used by the relation's
+     * attribute.
+     */
+    void        (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                                  FmgrInfo *finfo);
+
+    /*
+     * Called when COPY TO is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row for COPY TO.
+     *
+     * `slot` is the tuple slot where the data is emitted.
+     */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO has ended */
+    void        (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc783..509b9e92a18 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +59,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+    /* format routine */
+    const CopyFromRoutine *routine;
+
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
     FILE       *copy_file;        /* used if copy_src == COPY_FILE */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1847bbfa95c..a8422fa4d35 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -492,6 +492,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
@@ -503,6 +504,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.45.2

From 78ed1bf847051f09f417980931c031cfa5d93e4c Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 16:44:44 +0900
Subject: [PATCH v22 2/5] Use CopyToRoutine/CopyFromRountine for the existing
 formats

The existing formats are text, csv and binary. If we find any
performance regression by this, we will not merge this to master.

This will increase indirect function call costs but this will reduce
runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)"
branch costs.

This uses an optimization based of static inline function and a
constant argument call for cstate->opts.csv_mode. For example,
CopyFromTextLikeOneRow() uses this optimization. It accepts the "bool
is_csv" argument instead of using cstate->opts.csv_mode in
it. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with
false (constant) for "bool is_csv". Compiler will remove "if (is_csv)"
branch in it by this optimization.

This doesn't change existing logic. This just moves existing codes.
---
 src/backend/commands/copyfrom.c          | 215 ++++++---
 src/backend/commands/copyfromparse.c     | 530 +++++++++++++----------
 src/backend/commands/copyto.c            | 477 +++++++++++++-------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyfrom_internal.h |   8 +
 5 files changed, 790 insertions(+), 442 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 909375e81b7..e6ea9ce1602 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations for text and CSV.
+ */
+
+/*
+ * CopyFromTextLikeInFunc
+ *
+ * Assign input function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid,
+                       FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromTextLikeStart
+ *
+ * Start of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Create workspace for CopyReadAttributes results; used by CSV and text
+     * format.
+     */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * CopyFromTextLikeEnd
+ *
+ * End of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+/*
+ * CopyFromBinaryInFunc
+ *
+ * Assign input function data for a relation's attribute in binary format.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                     FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromBinaryStart
+ *
+ * Start of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * CopyFromBinaryEnd
+ *
+ * End of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * Routines assigned to each format.
++
+ * CSV and text share the same implementation, at the exception of the
+ * per-row callback.
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInFunc = CopyFromBinaryInFunc,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Define the COPY FROM routines to use for a format.
+ */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyFromRoutineCSV;
+    else if (opts.binary)
+        return &CopyFromRoutineBinary;
+
+    /* default is text */
+    return &CopyFromRoutineText;
+}
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1396,7 +1547,6 @@ BeginCopyFrom(ParseState *pstate,
                 num_defaults;
     FmgrInfo   *in_functions;
     Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1428,6 +1578,9 @@ BeginCopyFrom(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyFromGetRoutine(cstate->opts);
+
     /* Process the target relation */
     cstate->rel = rel;
 
@@ -1583,25 +1736,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1634,23 +1768,9 @@ BeginCopyFrom(ParseState *pstate,
             continue;
 
         /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
-                                            &in_functions[attnum - 1],
-                                            &typioparams[attnum - 1]);
-
-        else
-        {
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
+        cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                        &in_functions[attnum - 1],
+                                        &typioparams[attnum - 1]);
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1785,23 +1905,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-    else if (cstate->routine)
-    {
-        cstate->routine->CopyFromStart(cstate, tupDesc);
-    }
-    else
-    {
-        /* create workspace for CopyReadAttributes results */
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1814,8 +1918,7 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
-    if (cstate->routine)
-        cstate->routine->CopyFromEnd(cstate);
+    cstate->routine->CopyFromEnd(cstate);
 
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index b104e4a9114..0447c4df7e0 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -140,8 +140,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static pg_attribute_always_inline bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int    CopyReadAttributesText(CopyFromState cstate);
 static int    CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
@@ -741,8 +741,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  *
  * NOTE: force_not_null option are not applied to the returned fields.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
     int            fldct;
     bool        done;
@@ -759,13 +759,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         tupDesc = RelationGetDescr(cstate->rel);
 
         cstate->cur_lineno++;
-        done = CopyReadLine(cstate);
+        done = CopyReadLine(cstate, is_csv);
 
         if (cstate->opts.header_line == COPY_HEADER_MATCH)
         {
             int            fldnum;
 
-            if (cstate->opts.csv_mode)
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
                 fldct = CopyReadAttributesCSV(cstate);
             else
                 fldct = CopyReadAttributesText(cstate);
@@ -809,7 +813,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     cstate->cur_lineno++;
 
     /* Actually read the line into memory here */
-    done = CopyReadLine(cstate);
+    done = CopyReadLine(cstate, is_csv);
 
     /*
      * EOF at start of line means we're done.  If we see EOF after some
@@ -819,8 +823,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     if (done && cstate->line_buf.len == 0)
         return false;
 
-    /* Parse the line into de-escaped field values */
-    if (cstate->opts.csv_mode)
+    /*
+     * Parse the line into de-escaped field values
+     *
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
         fldct = CopyReadAttributesCSV(cstate);
     else
         fldct = CopyReadAttributesText(cstate);
@@ -830,6 +839,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+/*
+ * CopyFromTextLikeOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the text and CSV
+ * formats.
+ *
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate,
+                       ExprContext *econtext,
+                       Datum *values,
+                       bool *nulls,
+                       bool is_csv)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        if (is_csv)
+        {
+            if (string == NULL &&
+                cstate->opts.force_notnull_flags[m])
+            {
+                /*
+                 * FORCE_NOT_NULL option is set and column is NULL - convert
+                 * it to the NULL string.
+                 */
+                string = cstate->opts.null_print;
+            }
+            else if (string != NULL && cstate->opts.force_null_flags[m]
+                     && strcmp(string, cstate->opts.null_print) == 0)
+            {
+                /*
+                 * FORCE_NULL option is set and column matches the NULL
+                 * string. It must have been quoted, or otherwise the string
+                 * would already have been set to NULL. Convert it to NULL as
+                 * specified.
+                 */
+                string = NULL;
+            }
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+
+            cstate->num_errors++;
+
+            if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+            {
+                /*
+                 * Since we emit line number and column info in the below
+                 * notice message, we suppress error context information other
+                 * than the relation name.
+                 */
+                Assert(!cstate->relname_only);
+                cstate->relname_only = true;
+
+                if (cstate->cur_attval)
+                {
+                    char       *attval;
+
+                    attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
\"%s\"",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname,
+                                   attval));
+                    pfree(attval);
+                }
+                else
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null
input",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname));
+
+                /* reset relname_only */
+                cstate->relname_only = false;
+            }
+
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+
+/*
+ * CopyFromTextOneRow
+ *
+ * Per-row callback for COPY FROM with text format.
+ */
+bool
+CopyFromTextOneRow(CopyFromState cstate,
+                   ExprContext *econtext,
+                   Datum *values,
+                   bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/*
+ * CopyFromCSVOneRow
+ *
+ * Per-row callback for COPY FROM with CSV format.
+ */
+bool
+CopyFromCSVOneRow(CopyFromState cstate,
+                  ExprContext *econtext,
+                  Datum *values,
+                  bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
+
+/*
+ * CopyFromBinaryOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the binary format.
+ */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                     Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -847,221 +1117,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
-
-                cstate->num_errors++;
-
-                if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-                {
-                    /*
-                     * Since we emit line number and column info in the below
-                     * notice message, we suppress error context information
-                     * other than the relation name.
-                     */
-                    Assert(!cstate->relname_only);
-                    cstate->relname_only = true;
-
-                    if (cstate->cur_attval)
-                    {
-                        char       *attval;
-
-                        attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
\"%s\"",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname,
-                                       attval));
-                        pfree(attval);
-                    }
-                    else
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
nullinput",
 
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname));
-
-                    /* reset relname_only */
-                    cstate->relname_only = false;
-                }
-
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else if (cstate->routine)
-    {
-        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
-            return false;
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
@@ -1092,7 +1162,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
     bool        result;
 
@@ -1100,7 +1170,7 @@ CopyReadLine(CopyFromState cstate)
     cstate->line_buf_valid = false;
 
     /* Parse data and transfer into line_buf */
-    result = CopyReadLineText(cstate);
+    result = CopyReadLineText(cstate, is_csv);
 
     if (result)
     {
@@ -1167,8 +1237,8 @@ CopyReadLine(CopyFromState cstate)
 /*
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
-static bool
-CopyReadLineText(CopyFromState cstate)
+static pg_attribute_always_inline bool
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
     char       *copy_input_buf;
     int            input_buf_ptr;
@@ -1183,7 +1253,11 @@ CopyReadLineText(CopyFromState cstate)
     char        quotec = '\0';
     char        escapec = '\0';
 
-    if (cstate->opts.csv_mode)
+    /*
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
     {
         quotec = cstate->opts.quote[0];
         escapec = cstate->opts.escape[0];
@@ -1260,7 +1334,11 @@ CopyReadLineText(CopyFromState cstate)
         prev_raw_ptr = input_buf_ptr;
         c = copy_input_buf[input_buf_ptr++];
 
-        if (cstate->opts.csv_mode)
+        /*
+         * is_csv will be optimized away by compiler, as argument is constant
+         * at caller.
+         */
+        if (is_csv)
         {
             /*
              * If character is '\r', we may need to look ahead below.  Force
@@ -1299,7 +1377,7 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \r */
-        if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\r' && (!is_csv || !in_quote))
         {
             /* Check for \r\n on first line, _and_ handle \r\n. */
             if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1327,10 +1405,10 @@ CopyReadLineText(CopyFromState cstate)
                     if (cstate->eol_type == EOL_CRNL)
                         ereport(ERROR,
                                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errmsg("literal carriage return found in data") :
                                  errmsg("unquoted carriage return found in data"),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errhint("Use \"\\r\" to represent carriage return.") :
                                  errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1344,10 +1422,10 @@ CopyReadLineText(CopyFromState cstate)
             else if (cstate->eol_type == EOL_NL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal carriage return found in data") :
                          errmsg("unquoted carriage return found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\r\" to represent carriage return.") :
                          errhint("Use quoted CSV field to represent carriage return.")));
             /* If reach here, we have found the line terminator */
@@ -1355,15 +1433,15 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \n */
-        if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\n' && (!is_csv || !in_quote))
         {
             if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal newline found in data") :
                          errmsg("unquoted newline found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\n\" to represent newline.") :
                          errhint("Use quoted CSV field to represent newline.")));
             cstate->eol_type = EOL_NL;    /* in case not set yet */
@@ -1375,7 +1453,7 @@ CopyReadLineText(CopyFromState cstate)
          * Process backslash, except in CSV mode where backslash is a normal
          * character.
          */
-        if (c == '\\' && !cstate->opts.csv_mode)
+        if (c == '\\' && !is_csv)
         {
             char        c2;
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 405e1782685..46f3507a8b5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToTextLikeSendEndOfRow
+ *
+ * Apply line terminations for a line sent in text or CSV format depending
+ * on the destination, then send the end of a row.
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+
+    /* Now take the actions related to the end of a row */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextLikeStart
+ *
+ * Start of COPY TO for text and CSV format.
+ */
+static void
+CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        ListCell   *cur;
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopyToTextLikeSendEndOfRow(cstate);
+    }
+}
+
+/*
+ * CopyToTextLikeOutFunc
+ *
+ * Assign output function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+
+/*
+ * CopyToTextLikeOneRow
+ *
+ * Process one row for text/CSV format.
+ *
+ * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeOneRow(CopyToState cstate,
+                     TupleTableSlot *slot,
+                     bool is_csv)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1],
+                                        value);
+
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1]);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopyToTextLikeSendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with text format.
+ */
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, false);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with CSV format.
+ */
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, true);
+}
+
+/*
+ * CopyToTextLikeEnd
+ *
+ * End of COPY TO for text/CSV format.
+ */
+static void
+CopyToTextLikeEnd(CopyToState cstate)
+{
+    /* Nothing to do here */
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+/*
+ * CopyToBinaryStart
+ *
+ * Start of COPY TO for binary format.
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /* Generate header for a binary copy */
+    int32        tmp;
+
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+    /* No header extension */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+}
+
+/*
+ * CopyToBinaryOutFunc
+ *
+ * Assign output function data for a relation's attribute in binary format.
+ */
+static void
+CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyToBinaryOneRow
+ *
+ * Process one row for binary format.
+ */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+                                           value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToBinaryEnd
+ *
+ * End of COPY TO for binary format.
+ */
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CSV and text share the same implementation, at the exception of the
+ * output representation and per-row callbacks.
+ */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOutFunc = CopyToBinaryOutFunc,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/*
+ * Define the COPY TO routines to use for a format.  This should be called
+ * after options are parsed.
+ */
+static const CopyToRoutine *
+CopyToGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyToRoutineCSV;
+    else if (opts.binary)
+        return &CopyToRoutineBinary;
+
+    /* default is text */
+    return &CopyToRoutineText;
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyToGetRoutine(cstate->opts);
+
     /* Process the source/target relation or query */
     if (rel)
     {
@@ -775,27 +1075,10 @@ DoCopyTo(CopyToState cstate)
     foreach(cur, cstate->attnumlist)
     {
         int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
-                                           &cstate->out_functions[attnum - 1]);
-        else
-        {
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
+        cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                       &cstate->out_functions[attnum - 1]);
     }
 
     /*
@@ -808,58 +1091,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToStart(cstate, tupDesc);
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -898,15 +1130,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToEnd(cstate);
+    cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -922,7 +1146,6 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
 
     MemoryContextReset(cstate->rowcontext);
@@ -931,69 +1154,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    if (cstate->routine)
-    {
-        cstate->routine->CopyToOneRow(cstate, slot);
-        MemoryContextSwitchTo(oldcontext);
-        return;
-    }
-
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
-    if (!cstate->opts.binary)
-    {
-        bool        need_delim = false;
-
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            char       *string;
-
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-
-            if (isnull)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1]);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-        }
-    }
-    else
-    {
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            bytea       *outputbytes;
-
-            if (isnull)
-                CopySendInt32(cstate, -1);
-            else
-            {
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 4002a7f5382..f2409013fba 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                          Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 509b9e92a18..c11b5ff3cc0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -187,4 +187,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* Callbacks for CopyFromRoutine->CopyFromOneRow */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                               Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                              Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                 Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.45.2

From 328bb34d626fdbcc2cb2e2013c46e24e6123faef Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jul 2024 17:39:41 +0900
Subject: [PATCH v22 3/5] Add support for adding custom COPY TO/FROM format

This uses the handler approach like tablesample. The approach creates
an internal function that returns an internal struct. In this case,
a COPY TO handler returns a CopyToRoutine and a COPY FROM handler
returns a CopyFromRoutine.

This uses the same handler for COPY TO and COPY FROM. PostgreSQL calls a
COPY TO/FROM handler with "is_from" argument. It's true for COPY FROM
and false for COPY TO:

    copy_handler(true) returns CopyToRoutine
    copy_handler(false) returns CopyFromRoutine

This also add a test module for custom COPY TO/FROM handler.
---
 src/backend/commands/copy.c                   |  96 ++++++++++++++---
 src/backend/commands/copyfrom.c               |   4 +-
 src/backend/commands/copyto.c                 |   4 +-
 src/backend/nodes/Makefile                    |   1 +
 src/backend/nodes/gen_node_support.pl         |   2 +
 src/backend/utils/adt/pseudotypes.c           |   1 +
 src/include/catalog/pg_proc.dat               |   6 ++
 src/include/catalog/pg_type.dat               |   6 ++
 src/include/commands/copy.h                   |   2 +
 src/include/commands/copyapi.h                |   4 +
 src/include/nodes/meson.build                 |   1 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 src/test/modules/test_copy_format/.gitignore  |   4 +
 src/test/modules/test_copy_format/Makefile    |  23 ++++
 .../expected/test_copy_format.out             |  21 ++++
 src/test/modules/test_copy_format/meson.build |  33 ++++++
 .../test_copy_format/sql/test_copy_format.sql |   6 ++
 .../test_copy_format--1.0.sql                 |   8 ++
 .../test_copy_format/test_copy_format.c       | 100 ++++++++++++++++++
 .../test_copy_format/test_copy_format.control |   4 +
 21 files changed, 313 insertions(+), 15 deletions(-)
 create mode 100644 src/test/modules/test_copy_format/.gitignore
 create mode 100644 src/test/modules/test_copy_format/Makefile
 create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out
 create mode 100644 src/test/modules/test_copy_format/meson.build
 create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.c
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3485ba8663f..c8643b2dee7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -32,6 +32,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -462,6 +463,87 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
     return COPY_LOG_VERBOSITY_DEFAULT;    /* keep compiler quiet */
 }
 
+/*
+ * Process the "format" option.
+ *
+ * This function checks whether the option value is a built-in format such as
+ * "text" and "csv" or not. If the option value isn't a built-in format, this
+ * function finds a COPY format handler that returns a CopyToRoutine (for
+ * is_from == false) or CopyFromRountine (for is_from == true). If no COPY
+ * format handler is found, this function reports an error.
+ */
+static void
+ProcessCopyOptionFormat(ParseState *pstate,
+                        CopyFormatOptions *opts_out,
+                        bool is_from,
+                        DefElem *defel)
+{
+    char       *format;
+    Oid            funcargtypes[1];
+    Oid            handlerOid = InvalidOid;
+    Datum        datum;
+    Node       *routine;
+
+    format = defGetString(defel);
+
+    /* built-in formats */
+    if (strcmp(format, "text") == 0)
+         /* default format */ return;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        return;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        return;
+    }
+
+    /* custom format */
+    funcargtypes[0] = INTERNALOID;
+    handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                funcargtypes, true);
+    if (!OidIsValid(handlerOid))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
+    routine = (Node *) DatumGetPointer(datum);
+    if (is_from)
+    {
+        if (routine == NULL || !IsA(routine, CopyFromRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyFromRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+    else
+    {
+        if (routine == NULL || !IsA(routine, CopyToRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyToRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+
+    opts_out->routine = routine;
+}
+
 /*
  * Process the statement option list for COPY.
  *
@@ -505,22 +587,10 @@ ProcessCopyOptions(ParseState *pstate,
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
-            else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+            ProcessCopyOptionFormat(pstate, opts_out, is_from, defel);
         }
         else if (strcmp(defel->defname, "freeze") == 0)
         {
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e6ea9ce1602..932f1ff4f6e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = {
 static const CopyFromRoutine *
 CopyFromGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyFromRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyFromRoutineCSV;
     else if (opts.binary)
         return &CopyFromRoutineBinary;
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 46f3507a8b5..1f1d2baf9be 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = {
 static const CopyToRoutine *
 CopyToGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyToRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyToRoutineCSV;
     else if (opts.binary)
         return &CopyToRoutineBinary;
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index 66bbad8e6e0..173ee11811c 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -49,6 +49,7 @@ node_headers = \
     access/sdir.h \
     access/tableam.h \
     access/tsmapi.h \
+    commands/copyapi.h \
     commands/event_trigger.h \
     commands/trigger.h \
     executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 81df3bdf95f..428ab4f0d93 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index e189e9b79d2..25f24ab95d2 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f23321a41f1..6af90a26374 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7761,6 +7761,12 @@
 { oid => '3312', descr => 'I/O',
   proname => 'tsm_handler_out', prorettype => 'cstring',
   proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' },
+{ oid => '8753', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler',
+  proargtypes => 'cstring', prosrc => 'copy_handler_in' },
+{ oid => '8754', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '267', descr => 'I/O',
   proname => 'table_am_handler_in', proisstrict => 'f',
   prorettype => 'table_am_handler', proargtypes => 'cstring',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index ceff66ccde1..37ebfa0908f 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -633,6 +633,12 @@
   typcategory => 'P', typinput => 'tsm_handler_in',
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '8752',
+  descr => 'pseudo-type for the result of a copy to/from method function',
+  typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '269',
   descr => 'pseudo-type for the result of a table AM handler function',
   typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2409013fba..63f3e8e1af7 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -87,6 +87,8 @@ typedef struct CopyFormatOptions
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     int64        reject_limit;    /* maximum tolerable number of errors */
     List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
+                                 * NULL) */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to].c */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index d1289424c67..e049a45a4b1 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -27,6 +27,8 @@ typedef struct CopyToStateData *CopyToState;
  */
 typedef struct CopyFromRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY FROM is started to set up the input functions
      * associated with the relation's attributes writing to.  `finfo` can be
@@ -69,6 +71,8 @@ typedef struct CopyFromRoutine
  */
 typedef struct CopyToRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY TO is started to set up the output functions
      * associated with the relation's attributes reading from.  `finfo` can be
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build
index b665e55b657..103df1a7873 100644
--- a/src/include/nodes/meson.build
+++ b/src/include/nodes/meson.build
@@ -11,6 +11,7 @@ node_support_input_i = [
   'access/sdir.h',
   'access/tableam.h',
   'access/tsmapi.h',
+  'commands/copyapi.h',
   'commands/event_trigger.h',
   'commands/trigger.h',
   'executor/tuptable.h',
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index c0d3cf0e14b..33e3a49a4fb 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
           spgist_name_ops \
           test_bloomfilter \
           test_copy_callbacks \
+          test_copy_format \
           test_custom_rmgrs \
           test_ddl_deparse \
           test_dsa \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index c829b619530..75b6ab1b6a9 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -14,6 +14,7 @@ subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
+subdir('test_copy_format')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 00000000000..8497f91624d
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+    $(WIN32RES) \
+    test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+EXTENSION = test_copy_format
+DATA = test_copy_format--1.0.sql
+
+REGRESS = test_copy_format
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out
b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 00000000000..4ed7c0b12db
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,21 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
+COPY public.test TO stdout WITH (format 'test_copy_format');
+NOTICE:  test_copy_format: is_from=false
+NOTICE:  CopyToOutFunc: atttypid=21
+NOTICE:  CopyToOutFunc: atttypid=23
+NOTICE:  CopyToOutFunc: atttypid=20
+NOTICE:  CopyToStart: natts=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 00000000000..4cefe7b709a
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+test_install_data += files(
+  'test_copy_format.control',
+  'test_copy_format--1.0.sql',
+)
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql
b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 00000000000..e805f7cb011
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,6 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+\.
+COPY public.test TO stdout WITH (format 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql
b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
new file mode 100644
index 00000000000..d24ea03ce99
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit
+
+CREATE FUNCTION test_copy_format(internal)
+    RETURNS copy_handler
+    AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c
b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..f6b105659ab
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,100 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *        Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *        src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+static void
+CopyFromInFunc(CopyFromState cstate, Oid atttypid,
+               FmgrInfo *finfo, Oid *typioparam)
+{
+    ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    ereport(NOTICE, (errmsg("CopyFromOneRow")));
+    return false;
+}
+
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+    .type = T_CopyFromRoutine,
+    .CopyFromInFunc = CopyFromInFunc,
+    .CopyFromStart = CopyFromStart,
+    .CopyFromOneRow = CopyFromOneRow,
+    .CopyFromEnd = CopyFromEnd,
+};
+
+static void
+CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid)));
+}
+
+static void
+CopyToEnd(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+    .type = T_CopyToRoutine,
+    .CopyToOutFunc = CopyToOutFunc,
+    .CopyToStart = CopyToStart,
+    .CopyToOneRow = CopyToOneRow,
+    .CopyToEnd = CopyToEnd,
+};
+
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+    bool        is_from = PG_GETARG_BOOL(0);
+
+    ereport(NOTICE,
+            (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+    if (is_from)
+        PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+    else
+        PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control
b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 00000000000..f05a6362358
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.45.2

From 682b868c825409d44aec0d3ad32ece63aaed309f Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 14:54:10 +0900
Subject: [PATCH v22 4/5] Export CopyToStateData and CopyFromStateData

It's for custom COPY TO/FROM format handlers implemented as extension.

This just moves codes. This doesn't change codes except
CopyDest/CopySource enum values. CopyDest/CopySource enum values such as
COPY_FILE are conflicted each other. So COPY_DEST_ prefix instead of
COPY_ prefix is used for CopyDest enum values and COPY_SOURCE_ prefix
instead of COPY_ prefix is used for CopySource enum values. For example,
COPY_FILE in CopyDest is renamed to COPY_DEST_FILE and COPY_FILE in
CopySource is renamed to COPY_SOURCE_FILE.

Note that this isn't enough to implement custom COPY TO/FROM format
handlers as extension. We'll do the followings in a subsequent commit:

For custom COPY TO format handler:

1. Add an opaque space for custom COPY TO format handler
2. Export CopySendEndOfRow() to flush buffer

For custom COPY FROM format handler:

1. Add an opaque space for custom COPY FROM format handler
2. Export CopyReadBinaryData() to read the next data
---
 src/backend/commands/copyfrom.c          |   4 +-
 src/backend/commands/copyfromparse.c     |  10 +-
 src/backend/commands/copyto.c            |  77 +-----
 src/include/commands/copy.h              |  81 +-----
 src/include/commands/copyapi.h           | 309 ++++++++++++++++++++++-
 src/include/commands/copyfrom_internal.h | 165 ------------
 6 files changed, 323 insertions(+), 323 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 932f1ff4f6e..d758e66c6a1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1716,7 +1716,7 @@ BeginCopyFrom(ParseState *pstate,
                             pg_encoding_to_char(GetDatabaseEncoding()))));
     }
 
-    cstate->copy_src = COPY_FILE;    /* default */
+    cstate->copy_src = COPY_SOURCE_FILE;    /* default */
 
     cstate->whereClause = whereClause;
 
@@ -1844,7 +1844,7 @@ BeginCopyFrom(ParseState *pstate,
     if (data_source_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_src = COPY_CALLBACK;
+        cstate->copy_src = COPY_SOURCE_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 0447c4df7e0..ccfbacb4a37 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -171,7 +171,7 @@ ReceiveCopyBegin(CopyFromState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_src = COPY_FRONTEND;
+    cstate->copy_src = COPY_SOURCE_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
     /* We *must* flush here to ensure FE knows it can send. */
     pq_flush();
@@ -239,7 +239,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
     switch (cstate->copy_src)
     {
-        case COPY_FILE:
+        case COPY_SOURCE_FILE:
             bytesread = fread(databuf, 1, maxread, cstate->copy_file);
             if (ferror(cstate->copy_file))
                 ereport(ERROR,
@@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
             if (bytesread == 0)
                 cstate->raw_reached_eof = true;
             break;
-        case COPY_FRONTEND:
+        case COPY_SOURCE_FRONTEND:
             while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
             {
                 int            avail;
@@ -331,7 +331,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
                 bytesread += avail;
             }
             break;
-        case COPY_CALLBACK:
+        case COPY_SOURCE_CALLBACK:
             bytesread = cstate->data_source_cb(databuf, minread, maxread);
             break;
     }
@@ -1179,7 +1179,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv)
          * after \. up to the protocol end of copy data.  (XXX maybe better
          * not to treat \. as special?)
          */
-        if (cstate->copy_src == COPY_FRONTEND)
+        if (cstate->copy_src == COPY_SOURCE_FRONTEND)
         {
             int            inbytes;
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 1f1d2baf9be..fb68f42ce1e 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -37,67 +37,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * Represents the different dest cases we need to worry about at
- * the bottom level
- */
-typedef enum CopyDest
-{
-    COPY_FILE,                    /* to file (or a piped program) */
-    COPY_FRONTEND,                /* to frontend */
-    COPY_CALLBACK,                /* to callback function */
-} CopyDest;
-
-/*
- * This struct contains all the state variables used throughout a COPY TO
- * operation.
- *
- * Multi-byte encodings: all supported client-side encodings encode multi-byte
- * characters by having the first byte's high bit set. Subsequent bytes of the
- * character can have the high bit not set. When scanning data in such an
- * encoding to look for a match to a single-byte (ie ASCII) character, we must
- * use the full pg_encoding_mblen() machinery to skip over multibyte
- * characters, else we might find a false match to a trailing byte. In
- * supported server encodings, there is no possibility of a false match, and
- * it's faster to make useless comparisons to trailing bytes than it is to
- * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
- * when we have to do it the hard way.
- */
-typedef struct CopyToStateData
-{
-    /* format routine */
-    const CopyToRoutine *routine;
-
-    /* low-level state data */
-    CopyDest    copy_dest;        /* type of copy source/destination */
-    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
-
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy to */
-    QueryDesc  *queryDesc;        /* executable query to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDOUT */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_dest_cb data_dest_cb; /* function for writing data */
-
-    CopyFormatOptions opts;
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    FmgrInfo   *out_functions;    /* lookup info for output functions */
-    MemoryContext rowcontext;    /* per-row evaluation context */
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyToStateData;
-
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
 {
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             /* Default line termination depends on platform */
 #ifndef WIN32
             CopySendChar(cstate, '\n');
@@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
             CopySendString(cstate, "\r\n");
 #endif
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* The FE/BE protocol uses \n as newline for all platforms */
             CopySendChar(cstate, '\n');
             break;
@@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_dest = COPY_FRONTEND;
+    cstate->copy_dest = COPY_DEST_FRONTEND;
 }
 
 static void
@@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate)
 
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate)
                              errmsg("could not write to COPY file: %m")));
             }
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
-        case COPY_CALLBACK:
+        case COPY_DEST_CALLBACK:
             cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
             break;
     }
@@ -929,12 +868,12 @@ BeginCopyTo(ParseState *pstate,
     /* See Multibyte encoding comment above */
     cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
 
-    cstate->copy_dest = COPY_FILE;    /* default */
+    cstate->copy_dest = COPY_DEST_FILE; /* default */
 
     if (data_dest_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_dest = COPY_CALLBACK;
+        cstate->copy_dest = COPY_DEST_CALLBACK;
         cstate->data_dest_cb = data_dest_cb;
     }
     else if (pipe)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 63f3e8e1af7..e2411848e9f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,90 +14,11 @@
 #ifndef COPY_H
 #define COPY_H
 
-#include "nodes/execnodes.h"
+#include "commands/copyapi.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/*
- * Represents whether a header line should be present, and whether it must
- * match the actual names (which implies "true").
- */
-typedef enum CopyHeaderChoice
-{
-    COPY_HEADER_FALSE = 0,
-    COPY_HEADER_TRUE,
-    COPY_HEADER_MATCH,
-} CopyHeaderChoice;
-
-/*
- * Represents where to save input processing errors.  More values to be added
- * in the future.
- */
-typedef enum CopyOnErrorChoice
-{
-    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
-    COPY_ON_ERROR_IGNORE,        /* ignore errors */
-} CopyOnErrorChoice;
-
-/*
- * Represents verbosity of logged messages by COPY command.
- */
-typedef enum CopyLogVerbosityChoice
-{
-    COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */
-    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is
-                                     * the default, assign 0 */
-    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
-} CopyLogVerbosityChoice;
-
-/*
- * A struct to hold COPY options, in a parsed form. All of these are related
- * to formatting, except for 'freeze', which doesn't really belong here, but
- * it's expedient to parse it along with all the other options.
- */
-typedef struct CopyFormatOptions
-{
-    /* parameters from the COPY command */
-    int            file_encoding;    /* file or remote side's character encoding,
-                                 * -1 if not specified */
-    bool        binary;            /* binary format? */
-    bool        freeze;            /* freeze rows on loading? */
-    bool        csv_mode;        /* Comma Separated Value format? */
-    CopyHeaderChoice header_line;    /* header line? */
-    char       *null_print;        /* NULL marker string (server encoding!) */
-    int            null_print_len; /* length of same */
-    char       *null_print_client;    /* same converted to file encoding */
-    char       *default_print;    /* DEFAULT marker string */
-    int            default_print_len;    /* length of same */
-    char       *delim;            /* column delimiter (must be 1 byte) */
-    char       *quote;            /* CSV quote char (must be 1 byte) */
-    char       *escape;            /* CSV escape char (must be 1 byte) */
-    List       *force_quote;    /* list of column names */
-    bool        force_quote_all;    /* FORCE_QUOTE *? */
-    bool       *force_quote_flags;    /* per-column CSV FQ flags */
-    List       *force_notnull;    /* list of column names */
-    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
-    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
-    List       *force_null;        /* list of column names */
-    bool        force_null_all; /* FORCE_NULL *? */
-    bool       *force_null_flags;    /* per-column CSV FN flags */
-    bool        convert_selectively;    /* do selective binary conversion? */
-    CopyOnErrorChoice on_error; /* what to do when error happened */
-    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
-    int64        reject_limit;    /* maximum tolerable number of errors */
-    List       *convert_select; /* list of column names (can be NIL) */
-    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
-                                 * NULL) */
-} CopyFormatOptions;
-
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
-typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-typedef void (*copy_data_dest_cb) (void *data, int len);
-
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index e049a45a4b1..206d4c9fac9 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,12 +14,84 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
+#include "commands/trigger.h"
+#include "executor/execdesc.h"
 #include "executor/tuptable.h"
 #include "nodes/execnodes.h"
 
-/* These are private in commands/copy[from|to].c */
+/*
+ * Represents whether a header line should be present, and whether it must
+ * match the actual names (which implies "true").
+ */
+typedef enum CopyHeaderChoice
+{
+    COPY_HEADER_FALSE = 0,
+    COPY_HEADER_TRUE,
+    COPY_HEADER_MATCH,
+} CopyHeaderChoice;
+
+/*
+ * Represents where to save input processing errors.  More values to be added
+ * in the future.
+ */
+typedef enum CopyOnErrorChoice
+{
+    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
+    COPY_ON_ERROR_IGNORE,        /* ignore errors */
+} CopyOnErrorChoice;
+
+/*
+ * Represents verbosity of logged messages by COPY command.
+ */
+typedef enum CopyLogVerbosityChoice
+{
+    COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */
+    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is
+                                     * the default, assign 0 */
+    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
+} CopyLogVerbosityChoice;
+
+/*
+ * A struct to hold COPY options, in a parsed form. All of these are related
+ * to formatting, except for 'freeze', which doesn't really belong here, but
+ * it's expedient to parse it along with all the other options.
+ */
+typedef struct CopyFormatOptions
+{
+    /* parameters from the COPY command */
+    int            file_encoding;    /* file or remote side's character encoding,
+                                 * -1 if not specified */
+    bool        binary;            /* binary format? */
+    bool        freeze;            /* freeze rows on loading? */
+    bool        csv_mode;        /* Comma Separated Value format? */
+    CopyHeaderChoice header_line;    /* header line? */
+    char       *null_print;        /* NULL marker string (server encoding!) */
+    int            null_print_len; /* length of same */
+    char       *null_print_client;    /* same converted to file encoding */
+    char       *default_print;    /* DEFAULT marker string */
+    int            default_print_len;    /* length of same */
+    char       *delim;            /* column delimiter (must be 1 byte) */
+    char       *quote;            /* CSV quote char (must be 1 byte) */
+    char       *escape;            /* CSV escape char (must be 1 byte) */
+    List       *force_quote;    /* list of column names */
+    bool        force_quote_all;    /* FORCE_QUOTE *? */
+    bool       *force_quote_flags;    /* per-column CSV FQ flags */
+    List       *force_notnull;    /* list of column names */
+    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
+    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
+    List       *force_null;        /* list of column names */
+    bool        force_null_all; /* FORCE_NULL *? */
+    bool       *force_null_flags;    /* per-column CSV FN flags */
+    bool        convert_selectively;    /* do selective binary conversion? */
+    CopyOnErrorChoice on_error; /* what to do when error happened */
+    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
+    int64        reject_limit;    /* maximum tolerable number of errors */
+    List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
+                                 * NULL) */
+} CopyFormatOptions;
+
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 /*
  * API structure for a COPY FROM format implementation.  Note this must be
@@ -65,6 +137,176 @@ typedef struct CopyFromRoutine
     void        (*CopyFromEnd) (CopyFromState cstate);
 } CopyFromRoutine;
 
+/*
+ * Represents the different source cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopySource
+{
+    COPY_SOURCE_FILE,            /* from file (or a piped program) */
+    COPY_SOURCE_FRONTEND,        /* from frontend */
+    COPY_SOURCE_CALLBACK,        /* from callback function */
+} CopySource;
+
+/*
+ * Represents the end-of-line terminator type of the input
+ */
+typedef enum EolType
+{
+    EOL_UNKNOWN,
+    EOL_NL,
+    EOL_CR,
+    EOL_CRNL,
+} EolType;
+
+/*
+ * Represents the insert method to be used during COPY FROM.
+ */
+typedef enum CopyInsertMethod
+{
+    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
+    CIM_MULTI,                    /* always use table_multi_insert or
+                                 * ExecForeignBatchInsert */
+    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
+                                 * ExecForeignBatchInsert only if valid */
+} CopyInsertMethod;
+
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+
+/*
+ * This struct contains all the state variables used throughout a COPY FROM
+ * operation.
+ */
+typedef struct CopyFromStateData
+{
+    /* format routine */
+    const CopyFromRoutine *routine;
+
+    /* low-level state data */
+    CopySource    copy_src;        /* type of copy source */
+    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
+
+    EolType        eol_type;        /* EOL type of input */
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    Oid            conversion_proc;    /* encoding conversion function */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDIN */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_source_cb data_source_cb; /* function for reading data */
+
+    CopyFormatOptions opts;
+    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /* these are just for error messages, see CopyFromErrorCallback */
+    const char *cur_relname;    /* table name for error messages */
+    uint64        cur_lineno;        /* line number for error messages */
+    const char *cur_attname;    /* current att for error messages */
+    const char *cur_attval;        /* current att value for error messages */
+    bool        relname_only;    /* don't output line number, att, etc. */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    AttrNumber    num_defaults;    /* count of att that are missing and have
+                                 * default value */
+    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
+    Oid           *typioparams;    /* array of element types for in_functions */
+    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
+                                     * execution */
+    uint64        num_errors;        /* total number of rows which contained soft
+                                 * errors */
+    int           *defmap;            /* array of default att numbers related to
+                                 * missing att */
+    ExprState **defexprs;        /* array of default att expressions for all
+                                 * att */
+    bool       *defaults;        /* if DEFAULT marker was found for
+                                 * corresponding att */
+    bool        volatile_defexprs;    /* is any of defexprs volatile? */
+    List       *range_table;    /* single element list of RangeTblEntry */
+    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
+    ExprState  *qualexpr;
+
+    TransitionCaptureState *transition_capture;
+
+    /*
+     * These variables are used to reduce overhead in COPY FROM.
+     *
+     * attribute_buf holds the separated, de-escaped text for each field of
+     * the current line.  The CopyReadAttributes functions return arrays of
+     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
+     * the buffer on each cycle.
+     *
+     * In binary COPY FROM, attribute_buf holds the binary data for the
+     * current field, but the usage is otherwise similar.
+     */
+    StringInfoData attribute_buf;
+
+    /* field raw data pointers found by COPY FROM */
+
+    int            max_fields;
+    char      **raw_fields;
+
+    /*
+     * Similarly, line_buf holds the whole input line being processed. The
+     * input cycle is first to read the whole line into line_buf, and then
+     * extract the individual attribute fields into attribute_buf.  line_buf
+     * is preserved unmodified so that we can display it in error messages if
+     * appropriate.  (In binary mode, line_buf is not used.)
+     */
+    StringInfoData line_buf;
+    bool        line_buf_valid; /* contains the row being processed? */
+
+    /*
+     * input_buf holds input data, already converted to database encoding.
+     *
+     * In text mode, CopyReadLine parses this data sufficiently to locate line
+     * boundaries, then transfers the data to line_buf. We guarantee that
+     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
+     * mode, input_buf is not used.)
+     *
+     * If encoding conversion is not required, input_buf is not a separate
+     * buffer but points directly to raw_buf.  In that case, input_buf_len
+     * tracks the number of bytes that have been verified as valid in the
+     * database encoding, and raw_buf_len is the total number of bytes stored
+     * in the buffer.
+     */
+#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
+    char       *input_buf;
+    int            input_buf_index;    /* next byte to process */
+    int            input_buf_len;    /* total # of bytes stored */
+    bool        input_reached_eof;    /* true if we reached EOF */
+    bool        input_reached_error;    /* true if a conversion error happened */
+    /* Shorthand for number of unconsumed bytes available in input_buf */
+#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
+
+    /*
+     * raw_buf holds raw input data read from the data source (file or client
+     * connection), not yet converted to the database encoding.  Like with
+     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
+     */
+#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
+    char       *raw_buf;
+    int            raw_buf_index;    /* next byte to process */
+    int            raw_buf_len;    /* total # of bytes stored */
+    bool        raw_reached_eof;    /* true if we reached EOF */
+
+    /* Shorthand for number of unconsumed bytes available in raw_buf */
+#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
+
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyFromStateData;
+
+
+typedef struct CopyToStateData *CopyToState;
+
 /*
  * API structure for a COPY TO format implementation.   Note this must be
  * allocated in a server-lifetime manner, typically as a static const struct.
@@ -102,4 +344,67 @@ typedef struct CopyToRoutine
     void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+    COPY_DEST_FILE,                /* to file (or a piped program) */
+    COPY_DEST_FRONTEND,            /* to frontend */
+    COPY_DEST_CALLBACK,            /* to callback function */
+} CopyDest;
+
+typedef void (*copy_data_dest_cb) (void *data, int len);
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+    /* format routine */
+    const CopyToRoutine *routine;
+
+    /* low-level state data */
+    CopyDest    copy_dest;        /* type of copy source/destination */
+    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
+
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy to */
+    QueryDesc  *queryDesc;        /* executable query to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDOUT */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+    CopyFormatOptions opts;
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    FmgrInfo   *out_functions;    /* lookup info for output functions */
+    MemoryContext rowcontext;    /* per-row evaluation context */
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyToStateData;
+
 #endif                            /* COPYAPI_H */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c11b5ff3cc0..3863d26d5b7 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -19,171 +19,6 @@
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
-/*
- * Represents the different source cases we need to worry about at
- * the bottom level
- */
-typedef enum CopySource
-{
-    COPY_FILE,                    /* from file (or a piped program) */
-    COPY_FRONTEND,                /* from frontend */
-    COPY_CALLBACK,                /* from callback function */
-} CopySource;
-
-/*
- *    Represents the end-of-line terminator type of the input
- */
-typedef enum EolType
-{
-    EOL_UNKNOWN,
-    EOL_NL,
-    EOL_CR,
-    EOL_CRNL,
-} EolType;
-
-/*
- * Represents the insert method to be used during COPY FROM.
- */
-typedef enum CopyInsertMethod
-{
-    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
-    CIM_MULTI,                    /* always use table_multi_insert or
-                                 * ExecForeignBatchInsert */
-    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
-                                 * ExecForeignBatchInsert only if valid */
-} CopyInsertMethod;
-
-/*
- * This struct contains all the state variables used throughout a COPY FROM
- * operation.
- */
-typedef struct CopyFromStateData
-{
-    /* format routine */
-    const CopyFromRoutine *routine;
-
-    /* low-level state data */
-    CopySource    copy_src;        /* type of copy source */
-    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
-
-    EolType        eol_type;        /* EOL type of input */
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    Oid            conversion_proc;    /* encoding conversion function */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDIN */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_source_cb data_source_cb; /* function for reading data */
-
-    CopyFormatOptions opts;
-    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /* these are just for error messages, see CopyFromErrorCallback */
-    const char *cur_relname;    /* table name for error messages */
-    uint64        cur_lineno;        /* line number for error messages */
-    const char *cur_attname;    /* current att for error messages */
-    const char *cur_attval;        /* current att value for error messages */
-    bool        relname_only;    /* don't output line number, att, etc. */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    AttrNumber    num_defaults;    /* count of att that are missing and have
-                                 * default value */
-    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
-    Oid           *typioparams;    /* array of element types for in_functions */
-    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
-                                     * execution */
-    uint64        num_errors;        /* total number of rows which contained soft
-                                 * errors */
-    int           *defmap;            /* array of default att numbers related to
-                                 * missing att */
-    ExprState **defexprs;        /* array of default att expressions for all
-                                 * att */
-    bool       *defaults;        /* if DEFAULT marker was found for
-                                 * corresponding att */
-    bool        volatile_defexprs;    /* is any of defexprs volatile? */
-    List       *range_table;    /* single element list of RangeTblEntry */
-    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
-    ExprState  *qualexpr;
-
-    TransitionCaptureState *transition_capture;
-
-    /*
-     * These variables are used to reduce overhead in COPY FROM.
-     *
-     * attribute_buf holds the separated, de-escaped text for each field of
-     * the current line.  The CopyReadAttributes functions return arrays of
-     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
-     * the buffer on each cycle.
-     *
-     * In binary COPY FROM, attribute_buf holds the binary data for the
-     * current field, but the usage is otherwise similar.
-     */
-    StringInfoData attribute_buf;
-
-    /* field raw data pointers found by COPY FROM */
-
-    int            max_fields;
-    char      **raw_fields;
-
-    /*
-     * Similarly, line_buf holds the whole input line being processed. The
-     * input cycle is first to read the whole line into line_buf, and then
-     * extract the individual attribute fields into attribute_buf.  line_buf
-     * is preserved unmodified so that we can display it in error messages if
-     * appropriate.  (In binary mode, line_buf is not used.)
-     */
-    StringInfoData line_buf;
-    bool        line_buf_valid; /* contains the row being processed? */
-
-    /*
-     * input_buf holds input data, already converted to database encoding.
-     *
-     * In text mode, CopyReadLine parses this data sufficiently to locate line
-     * boundaries, then transfers the data to line_buf. We guarantee that
-     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
-     * mode, input_buf is not used.)
-     *
-     * If encoding conversion is not required, input_buf is not a separate
-     * buffer but points directly to raw_buf.  In that case, input_buf_len
-     * tracks the number of bytes that have been verified as valid in the
-     * database encoding, and raw_buf_len is the total number of bytes stored
-     * in the buffer.
-     */
-#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
-    char       *input_buf;
-    int            input_buf_index;    /* next byte to process */
-    int            input_buf_len;    /* total # of bytes stored */
-    bool        input_reached_eof;    /* true if we reached EOF */
-    bool        input_reached_error;    /* true if a conversion error happened */
-    /* Shorthand for number of unconsumed bytes available in input_buf */
-#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
-
-    /*
-     * raw_buf holds raw input data read from the data source (file or client
-     * connection), not yet converted to the database encoding.  Like with
-     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
-     */
-#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
-    char       *raw_buf;
-    int            raw_buf_index;    /* next byte to process */
-    int            raw_buf_len;    /* total # of bytes stored */
-    bool        raw_reached_eof;    /* true if we reached EOF */
-
-    /* Shorthand for number of unconsumed bytes available in raw_buf */
-#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
-
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyFromStateData;
-
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
-- 
2.45.2

From 3f9b4a8caa33960fe11512883177a96939186373 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Tue, 23 Jan 2024 15:12:43 +0900
Subject: [PATCH v22 5/5] Add support for implementing custom COPY TO/FROM
 format as extension

For custom COPY TO format implementation:

* Add CopyToStateData::opaque that can be used to keep data for custom
  COPY TO format implementation
* Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf
  as CopyToStateFlush()

For custom COPY FROM format implementation:

* Add CopyFromStateData::opaque that can be used to keep data for
  custom COPY From format implementation
* Export CopyReadBinaryData() to read the next data as
  CopyFromStateRead()
---
 src/backend/commands/copyfromparse.c | 14 ++++++++++++++
 src/backend/commands/copyto.c        | 14 ++++++++++++++
 src/include/commands/copyapi.h       | 10 ++++++++++
 3 files changed, 38 insertions(+)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index ccfbacb4a37..4fa23d992f5 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -730,6 +730,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
     return copied_bytes;
 }
 
+/*
+ * CopyFromStateRead
+ *
+ * Export CopyReadBinaryData() for extensions. We want to keep
+ * CopyReadBinaryData() as a static function for
+ * optimization. CopyReadBinaryData() calls in this file may be optimized by
+ * a compiler.
+ */
+int
+CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes)
+{
+    return CopyReadBinaryData(cstate, dest, nbytes);
+}
+
 /*
  * Read raw fields in the next line for COPY FROM in text or csv mode.
  * Return false if no more lines.
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fb68f42ce1e..93b041352c5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate)
     resetStringInfo(fe_msgbuf);
 }
 
+/*
+ * CopyToStateFlush
+ *
+ * Export CopySendEndOfRow() for extensions. We want to keep
+ * CopySendEndOfRow() as a static function for
+ * optimization. CopySendEndOfRow() calls in this file may be optimized by a
+ * compiler.
+ */
+void
+CopyToStateFlush(CopyToState cstate)
+{
+    CopySendEndOfRow(cstate);
+}
+
 /*
  * These functions do apply some data conversion
  */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 206d4c9fac9..2de610ef729 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -302,8 +302,13 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyFromStateData;
 
+extern int    CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes);
+
 
 typedef struct CopyToStateData *CopyToState;
 
@@ -405,6 +410,11 @@ typedef struct CopyToStateData
     FmgrInfo   *out_functions;    /* lookup info for output functions */
     MemoryContext rowcontext;    /* per-row evaluation context */
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyToStateData;
 
+extern void CopyToStateFlush(CopyToState cstate);
+
 #endif                            /* COPYAPI_H */
-- 
2.45.2

From 79470eab70ba8df417796cc9e66eca41b97e74b5 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:24:49 +0900
Subject: [PATCH v23 01/10] Add CopyToRountine

It's for implementing custom COPY TO format. But this is not enough to
implement custom COPY TO format yet. We'll export some APIs to send
data and add "format" option to COPY TO later.

Existing text/csv/binary format implementations don't use
CopyToRoutine for now. We have a patch for it but we defer it. Because
there are some mysterious profile results in spite of we get faster
runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyto.c    | 31 ++++++++++++++---
 src/include/commands/copyapi.h   | 58 ++++++++++++++++++++++++++++++++
 src/tools/pgindent/typedefs.list |  1 +
 3 files changed, 86 insertions(+), 4 deletions(-)
 create mode 100644 src/include/commands/copyapi.h

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index f55e6d96751..405e1782685 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -20,6 +20,7 @@
 
 #include "access/tableam.h"
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
@@ -64,6 +65,9 @@ typedef enum CopyDest
  */
 typedef struct CopyToStateData
 {
+    /* format routine */
+    const CopyToRoutine *routine;
+
     /* low-level state data */
     CopyDest    copy_dest;        /* type of copy source/destination */
     FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
@@ -776,14 +780,22 @@ DoCopyTo(CopyToState cstate)
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
         if (cstate->opts.binary)
+        {
             getTypeBinaryOutputInfo(attr->atttypid,
                                     &out_func_oid,
                                     &isvarlena);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                           &cstate->out_functions[attnum - 1]);
         else
+        {
             getTypeOutputInfo(attr->atttypid,
                               &out_func_oid,
                               &isvarlena);
-        fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+        }
     }
 
     /*
@@ -810,6 +822,8 @@ DoCopyTo(CopyToState cstate)
         tmp = 0;
         CopySendInt32(cstate, tmp);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToStart(cstate, tupDesc);
     else
     {
         /*
@@ -891,6 +905,8 @@ DoCopyTo(CopyToState cstate)
         /* Need to flush out the trailer */
         CopySendEndOfRow(cstate);
     }
+    else if (cstate->routine)
+        cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -912,15 +928,22 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     MemoryContextReset(cstate->rowcontext);
     oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
+    /* Make sure the tuple is fully deconstructed */
+    slot_getallattrs(slot);
+
+    if (cstate->routine)
+    {
+        cstate->routine->CopyToOneRow(cstate, slot);
+        MemoryContextSwitchTo(oldcontext);
+        return;
+    }
+
     if (cstate->opts.binary)
     {
         /* Binary per-tuple header */
         CopySendInt16(cstate, list_length(cstate->attnumlist));
     }
 
-    /* Make sure the tuple is fully deconstructed */
-    slot_getallattrs(slot);
-
     if (!cstate->opts.binary)
     {
         bool        need_delim = false;
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
new file mode 100644
index 00000000000..5ce24f195dc
--- /dev/null
+++ b/src/include/commands/copyapi.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * copyapi.h
+ *      API for COPY TO handlers
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/commands/copyapi.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef COPYAPI_H
+#define COPYAPI_H
+
+#include "executor/tuptable.h"
+#include "nodes/execnodes.h"
+
+/* This is private in commands/copyto.c */
+typedef struct CopyToStateData *CopyToState;
+
+/*
+ * API structure for a COPY TO format implementation.   Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyToRoutine
+{
+    /*
+     * Called when COPY TO is started to set up the output functions
+     * associated with the relation's attributes reading from.  `finfo` can be
+     * optionally filled to provide the catalog information of the output
+     * function.  `atttypid` is the OID of data type used by the relation's
+     * attribute.
+     */
+    void        (*CopyToOutFunc) (CopyToState cstate, Oid atttypid,
+                                  FmgrInfo *finfo);
+
+    /*
+     * Called when COPY TO is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation from where the data
+     * is read.
+     */
+    void        (*CopyToStart) (CopyToState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row for COPY TO.
+     *
+     * `slot` is the tuple slot where the data is emitted.
+     */
+    void        (*CopyToOneRow) (CopyToState cstate, TupleTableSlot *slot);
+
+    /* Called when COPY TO has ended */
+    void        (*CopyToEnd) (CopyToState cstate);
+} CopyToRoutine;
+
+#endif                            /* COPYAPI_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1847bbfa95c..098e7023486 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -503,6 +503,7 @@ CopyMultiInsertInfo
 CopyOnErrorChoice
 CopySource
 CopyStmt
+CopyToRoutine
 CopyToState
 CopyToStateData
 Cost
-- 
2.45.2

From 8743273a660c75b49862dbb18991c74131ed776e Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:26:29 +0900
Subject: [PATCH v23 02/10] Use CopyToRountine for the existing formats

The existing formats are text, csv and binary. If we find any
performance regression by this, we will not merge this to master.

This will increase indirect function call costs but this will reduce
runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)"
branch costs.

This uses an optimization based of static inline function and a
constant argument call for cstate->opts.csv_mode. For example,
CopyToTextLikeOneRow() uses this optimization. It accepts the "bool
is_csv" argument instead of using cstate->opts.csv_mode in
it. CopyToTextOneRow() calls CopyToTextLikeOneRow() with
false (constant) for "bool is_csv". Compiler will remove "if (is_csv)"
branch in it by this optimization.

This doesn't change existing logic. This just moves existing codes.
---
 src/backend/commands/copyto.c | 477 +++++++++++++++++++++++-----------
 1 file changed, 319 insertions(+), 158 deletions(-)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 405e1782685..46f3507a8b5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -128,6 +128,317 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyToRoutine implementations.
+ */
+
+/*
+ * CopyToTextLikeSendEndOfRow
+ *
+ * Apply line terminations for a line sent in text or CSV format depending
+ * on the destination, then send the end of a row.
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeSendEndOfRow(CopyToState cstate)
+{
+    switch (cstate->copy_dest)
+    {
+        case COPY_FILE:
+            /* Default line termination depends on platform */
+#ifndef WIN32
+            CopySendChar(cstate, '\n');
+#else
+            CopySendString(cstate, "\r\n");
+#endif
+            break;
+        case COPY_FRONTEND:
+            /* The FE/BE protocol uses \n as newline for all platforms */
+            CopySendChar(cstate, '\n');
+            break;
+        default:
+            break;
+    }
+
+    /* Now take the actions related to the end of a row */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextLikeStart
+ *
+ * Start of COPY TO for text and CSV format.
+ */
+static void
+CopyToTextLikeStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /*
+     * For non-binary copy, we need to convert null_print to file encoding,
+     * because it will be sent directly with CopySendString.
+     */
+    if (cstate->need_transcoding)
+        cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                          cstate->opts.null_print_len,
+                                                          cstate->file_encoding);
+
+    /* if a header has been requested send the line */
+    if (cstate->opts.header_line)
+    {
+        ListCell   *cur;
+        bool        hdr_delim = false;
+
+        foreach(cur, cstate->attnumlist)
+        {
+            int            attnum = lfirst_int(cur);
+            char       *colname;
+
+            if (hdr_delim)
+                CopySendChar(cstate, cstate->opts.delim[0]);
+            hdr_delim = true;
+
+            colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+            if (cstate->opts.csv_mode)
+                CopyAttributeOutCSV(cstate, colname, false);
+            else
+                CopyAttributeOutText(cstate, colname);
+        }
+
+        CopyToTextLikeSendEndOfRow(cstate);
+    }
+}
+
+/*
+ * CopyToTextLikeOutFunc
+ *
+ * Assign output function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyToTextLikeOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+
+/*
+ * CopyToTextLikeOneRow
+ *
+ * Process one row for text/CSV format.
+ *
+ * Workhorse for CopyToTextOneRow() and CopyToCSVOneRow().
+ */
+static pg_attribute_always_inline void
+CopyToTextLikeOneRow(CopyToState cstate,
+                     TupleTableSlot *slot,
+                     bool is_csv)
+{
+    bool        need_delim = false;
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (need_delim)
+            CopySendChar(cstate, cstate->opts.delim[0]);
+        need_delim = true;
+
+        if (isnull)
+        {
+            CopySendString(cstate, cstate->opts.null_print_client);
+        }
+        else
+        {
+            char       *string;
+
+            string = OutputFunctionCall(&out_functions[attnum - 1],
+                                        value);
+
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
+                CopyAttributeOutCSV(cstate, string,
+                                    cstate->opts.force_quote_flags[attnum - 1]);
+            else
+                CopyAttributeOutText(cstate, string);
+        }
+    }
+
+    CopyToTextLikeSendEndOfRow(cstate);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with text format.
+ */
+static void
+CopyToTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, false);
+}
+
+/*
+ * CopyToTextOneRow
+ *
+ * Per-row callback for COPY TO with CSV format.
+ */
+static void
+CopyToCSVOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    CopyToTextLikeOneRow(cstate, slot, true);
+}
+
+/*
+ * CopyToTextLikeEnd
+ *
+ * End of COPY TO for text/CSV format.
+ */
+static void
+CopyToTextLikeEnd(CopyToState cstate)
+{
+    /* Nothing to do here */
+}
+
+/*
+ * CopyToRoutine implementation for "binary".
+ */
+
+/*
+ * CopyToBinaryStart
+ *
+ * Start of COPY TO for binary format.
+ */
+static void
+CopyToBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    /* Generate header for a binary copy */
+    int32        tmp;
+
+    /* Signature */
+    CopySendData(cstate, BinarySignature, 11);
+    /* Flags field */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+    /* No header extension */
+    tmp = 0;
+    CopySendInt32(cstate, tmp);
+}
+
+/*
+ * CopyToBinaryOutFunc
+ *
+ * Assign output function data for a relation's attribute in binary format.
+ */
+static void
+CopyToBinaryOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    Oid            func_oid;
+    bool        is_varlena;
+
+    /* Set output function for an attribute */
+    getTypeBinaryOutputInfo(atttypid, &func_oid, &is_varlena);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyToBinaryOneRow
+ *
+ * Process one row for binary format.
+ */
+static void
+CopyToBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    FmgrInfo   *out_functions = cstate->out_functions;
+
+    /* Binary per-tuple header */
+    CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+    foreach_int(attnum, cstate->attnumlist)
+    {
+        Datum        value = slot->tts_values[attnum - 1];
+        bool        isnull = slot->tts_isnull[attnum - 1];
+
+        if (isnull)
+        {
+            CopySendInt32(cstate, -1);
+        }
+        else
+        {
+            bytea       *outputbytes;
+
+            outputbytes = SendFunctionCall(&out_functions[attnum - 1],
+                                           value);
+            CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+            CopySendData(cstate, VARDATA(outputbytes),
+                         VARSIZE(outputbytes) - VARHDRSZ);
+        }
+    }
+
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CopyToBinaryEnd
+ *
+ * End of COPY TO for binary format.
+ */
+static void
+CopyToBinaryEnd(CopyToState cstate)
+{
+    /* Generate trailer for a binary copy */
+    CopySendInt16(cstate, -1);
+    /* Need to flush out the trailer */
+    CopySendEndOfRow(cstate);
+}
+
+/*
+ * CSV and text share the same implementation, at the exception of the
+ * output representation and per-row callbacks.
+ */
+static const CopyToRoutine CopyToRoutineText = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToTextOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineCSV = {
+    .CopyToStart = CopyToTextLikeStart,
+    .CopyToOutFunc = CopyToTextLikeOutFunc,
+    .CopyToOneRow = CopyToCSVOneRow,
+    .CopyToEnd = CopyToTextLikeEnd,
+};
+
+static const CopyToRoutine CopyToRoutineBinary = {
+    .CopyToStart = CopyToBinaryStart,
+    .CopyToOutFunc = CopyToBinaryOutFunc,
+    .CopyToOneRow = CopyToBinaryOneRow,
+    .CopyToEnd = CopyToBinaryEnd,
+};
+
+/*
+ * Define the COPY TO routines to use for a format.  This should be called
+ * after options are parsed.
+ */
+static const CopyToRoutine *
+CopyToGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyToRoutineCSV;
+    else if (opts.binary)
+        return &CopyToRoutineBinary;
+
+    /* default is text */
+    return &CopyToRoutineText;
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -195,16 +506,6 @@ CopySendEndOfRow(CopyToState cstate)
     switch (cstate->copy_dest)
     {
         case COPY_FILE:
-            if (!cstate->opts.binary)
-            {
-                /* Default line termination depends on platform */
-#ifndef WIN32
-                CopySendChar(cstate, '\n');
-#else
-                CopySendString(cstate, "\r\n");
-#endif
-            }
-
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -239,10 +540,6 @@ CopySendEndOfRow(CopyToState cstate)
             }
             break;
         case COPY_FRONTEND:
-            /* The FE/BE protocol uses \n as newline for all platforms */
-            if (!cstate->opts.binary)
-                CopySendChar(cstate, '\n');
-
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
@@ -430,6 +727,9 @@ BeginCopyTo(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyToGetRoutine(cstate->opts);
+
     /* Process the source/target relation or query */
     if (rel)
     {
@@ -775,27 +1075,10 @@ DoCopyTo(CopyToState cstate)
     foreach(cur, cstate->attnumlist)
     {
         int            attnum = lfirst_int(cur);
-        Oid            out_func_oid;
-        bool        isvarlena;
         Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
 
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryOutputInfo(attr->atttypid,
-                                    &out_func_oid,
-                                    &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
-                                           &cstate->out_functions[attnum - 1]);
-        else
-        {
-            getTypeOutputInfo(attr->atttypid,
-                              &out_func_oid,
-                              &isvarlena);
-            fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-        }
+        cstate->routine->CopyToOutFunc(cstate, attr->atttypid,
+                                       &cstate->out_functions[attnum - 1]);
     }
 
     /*
@@ -808,58 +1091,7 @@ DoCopyTo(CopyToState cstate)
                                                "COPY TO",
                                                ALLOCSET_DEFAULT_SIZES);
 
-    if (cstate->opts.binary)
-    {
-        /* Generate header for a binary copy */
-        int32        tmp;
-
-        /* Signature */
-        CopySendData(cstate, BinarySignature, 11);
-        /* Flags field */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-        /* No header extension */
-        tmp = 0;
-        CopySendInt32(cstate, tmp);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToStart(cstate, tupDesc);
-    else
-    {
-        /*
-         * For non-binary copy, we need to convert null_print to file
-         * encoding, because it will be sent directly with CopySendString.
-         */
-        if (cstate->need_transcoding)
-            cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-                                                              cstate->opts.null_print_len,
-                                                              cstate->file_encoding);
-
-        /* if a header has been requested send the line */
-        if (cstate->opts.header_line)
-        {
-            bool        hdr_delim = false;
-
-            foreach(cur, cstate->attnumlist)
-            {
-                int            attnum = lfirst_int(cur);
-                char       *colname;
-
-                if (hdr_delim)
-                    CopySendChar(cstate, cstate->opts.delim[0]);
-                hdr_delim = true;
-
-                colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, colname, false);
-                else
-                    CopyAttributeOutText(cstate, colname);
-            }
-
-            CopySendEndOfRow(cstate);
-        }
-    }
+    cstate->routine->CopyToStart(cstate, tupDesc);
 
     if (cstate->rel)
     {
@@ -898,15 +1130,7 @@ DoCopyTo(CopyToState cstate)
         processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
     }
 
-    if (cstate->opts.binary)
-    {
-        /* Generate trailer for a binary copy */
-        CopySendInt16(cstate, -1);
-        /* Need to flush out the trailer */
-        CopySendEndOfRow(cstate);
-    }
-    else if (cstate->routine)
-        cstate->routine->CopyToEnd(cstate);
+    cstate->routine->CopyToEnd(cstate);
 
     MemoryContextDelete(cstate->rowcontext);
 
@@ -922,7 +1146,6 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-    FmgrInfo   *out_functions = cstate->out_functions;
     MemoryContext oldcontext;
 
     MemoryContextReset(cstate->rowcontext);
@@ -931,69 +1154,7 @@ CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
     /* Make sure the tuple is fully deconstructed */
     slot_getallattrs(slot);
 
-    if (cstate->routine)
-    {
-        cstate->routine->CopyToOneRow(cstate, slot);
-        MemoryContextSwitchTo(oldcontext);
-        return;
-    }
-
-    if (cstate->opts.binary)
-    {
-        /* Binary per-tuple header */
-        CopySendInt16(cstate, list_length(cstate->attnumlist));
-    }
-
-    if (!cstate->opts.binary)
-    {
-        bool        need_delim = false;
-
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            char       *string;
-
-            if (need_delim)
-                CopySendChar(cstate, cstate->opts.delim[0]);
-            need_delim = true;
-
-            if (isnull)
-                CopySendString(cstate, cstate->opts.null_print_client);
-            else
-            {
-                string = OutputFunctionCall(&out_functions[attnum - 1],
-                                            value);
-                if (cstate->opts.csv_mode)
-                    CopyAttributeOutCSV(cstate, string,
-                                        cstate->opts.force_quote_flags[attnum - 1]);
-                else
-                    CopyAttributeOutText(cstate, string);
-            }
-        }
-    }
-    else
-    {
-        foreach_int(attnum, cstate->attnumlist)
-        {
-            Datum        value = slot->tts_values[attnum - 1];
-            bool        isnull = slot->tts_isnull[attnum - 1];
-            bytea       *outputbytes;
-
-            if (isnull)
-                CopySendInt32(cstate, -1);
-            else
-            {
-                outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-                                               value);
-                CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-                CopySendData(cstate, VARDATA(outputbytes),
-                             VARSIZE(outputbytes) - VARHDRSZ);
-            }
-        }
-    }
-
-    CopySendEndOfRow(cstate);
+    cstate->routine->CopyToOneRow(cstate, slot);
 
     MemoryContextSwitchTo(oldcontext);
 }
-- 
2.45.2

From aadcd4bb260d14fc40ab69178fcc85271307b4a9 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:40:54 +0900
Subject: [PATCH v23 03/10] Add support for adding custom COPY TO format

This uses the handler approach like tablesample. The approach creates
an internal function that returns an internal struct. In this case,
a COPY TO handler returns a CopyToRoutine.

This also add a test module for custom COPY TO handler.
---
 src/backend/commands/copy.c                   | 82 ++++++++++++++++---
 src/backend/commands/copyto.c                 |  4 +-
 src/backend/nodes/Makefile                    |  1 +
 src/backend/nodes/gen_node_support.pl         |  2 +
 src/backend/utils/adt/pseudotypes.c           |  1 +
 src/include/catalog/pg_proc.dat               |  6 ++
 src/include/catalog/pg_type.dat               |  6 ++
 src/include/commands/copy.h                   |  1 +
 src/include/commands/copyapi.h                |  2 +
 src/include/nodes/meson.build                 |  1 +
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_copy_format/.gitignore  |  4 +
 src/test/modules/test_copy_format/Makefile    | 23 ++++++
 .../expected/test_copy_format.out             | 17 ++++
 src/test/modules/test_copy_format/meson.build | 33 ++++++++
 .../test_copy_format/sql/test_copy_format.sql |  5 ++
 .../test_copy_format--1.0.sql                 |  8 ++
 .../test_copy_format/test_copy_format.c       | 63 ++++++++++++++
 .../test_copy_format/test_copy_format.control |  4 +
 20 files changed, 251 insertions(+), 14 deletions(-)
 create mode 100644 src/test/modules/test_copy_format/.gitignore
 create mode 100644 src/test/modules/test_copy_format/Makefile
 create mode 100644 src/test/modules/test_copy_format/expected/test_copy_format.out
 create mode 100644 src/test/modules/test_copy_format/meson.build
 create mode 100644 src/test/modules/test_copy_format/sql/test_copy_format.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format--1.0.sql
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.c
 create mode 100644 src/test/modules/test_copy_format/test_copy_format.control

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3485ba8663f..02528fbcc1f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -32,6 +32,7 @@
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -462,6 +463,73 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
     return COPY_LOG_VERBOSITY_DEFAULT;    /* keep compiler quiet */
 }
 
+/*
+ * Process the "format" option.
+ *
+ * This function checks whether the option value is a built-in format such as
+ * "text" and "csv" or not. If the option value isn't a built-in format, this
+ * function finds a COPY format handler that returns a CopyToRoutine (for
+ * is_from == false). If no COPY format handler is found, this function
+ * reports an error.
+ */
+static void
+ProcessCopyOptionFormat(ParseState *pstate,
+                        CopyFormatOptions *opts_out,
+                        bool is_from,
+                        DefElem *defel)
+{
+    char       *format;
+    Oid            funcargtypes[1];
+    Oid            handlerOid = InvalidOid;
+    Datum        datum;
+    Node       *routine;
+
+    format = defGetString(defel);
+
+    /* built-in formats */
+    if (strcmp(format, "text") == 0)
+         /* default format */ return;
+    else if (strcmp(format, "csv") == 0)
+    {
+        opts_out->csv_mode = true;
+        return;
+    }
+    else if (strcmp(format, "binary") == 0)
+    {
+        opts_out->binary = true;
+        return;
+    }
+
+    /* custom format */
+    if (!is_from)
+    {
+        funcargtypes[0] = INTERNALOID;
+        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                    funcargtypes, true);
+    }
+    if (!OidIsValid(handlerOid))
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY format \"%s\" not recognized", format),
+                 parser_errposition(pstate, defel->location)));
+
+    datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
+    routine = (Node *) DatumGetPointer(datum);
+    if (routine == NULL || !IsA(routine, CopyToRoutine))
+        ereport(
+                ERROR,
+                (errcode(
+                         ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("COPY handler function "
+                        "%s(%u) did not return a "
+                        "CopyToRoutine struct",
+                        format, handlerOid),
+                 parser_errposition(
+                                    pstate, defel->location)));
+
+    opts_out->routine = routine;
+}
+
 /*
  * Process the statement option list for COPY.
  *
@@ -505,22 +573,10 @@ ProcessCopyOptions(ParseState *pstate,
 
         if (strcmp(defel->defname, "format") == 0)
         {
-            char       *fmt = defGetString(defel);
-
             if (format_specified)
                 errorConflictingDefElem(defel, pstate);
             format_specified = true;
-            if (strcmp(fmt, "text") == 0)
-                 /* default format */ ;
-            else if (strcmp(fmt, "csv") == 0)
-                opts_out->csv_mode = true;
-            else if (strcmp(fmt, "binary") == 0)
-                opts_out->binary = true;
-            else
-                ereport(ERROR,
-                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                         errmsg("COPY format \"%s\" not recognized", fmt),
-                         parser_errposition(pstate, defel->location)));
+            ProcessCopyOptionFormat(pstate, opts_out, is_from, defel);
         }
         else if (strcmp(defel->defname, "freeze") == 0)
         {
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 46f3507a8b5..1f1d2baf9be 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -431,7 +431,9 @@ static const CopyToRoutine CopyToRoutineBinary = {
 static const CopyToRoutine *
 CopyToGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyToRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyToRoutineCSV;
     else if (opts.binary)
         return &CopyToRoutineBinary;
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index 66bbad8e6e0..173ee11811c 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -49,6 +49,7 @@ node_headers = \
     access/sdir.h \
     access/tableam.h \
     access/tsmapi.h \
+    commands/copyapi.h \
     commands/event_trigger.h \
     commands/trigger.h \
     executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 81df3bdf95f..428ab4f0d93 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copyapi.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index e189e9b79d2..25f24ab95d2 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -370,6 +370,7 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(fdw_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(table_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(index_am_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(tsm_handler);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
 PSEUDOTYPE_DUMMY_IO_FUNCS(internal);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index f23321a41f1..6af90a26374 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7761,6 +7761,12 @@
 { oid => '3312', descr => 'I/O',
   proname => 'tsm_handler_out', prorettype => 'cstring',
   proargtypes => 'tsm_handler', prosrc => 'tsm_handler_out' },
+{ oid => '8753', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f', prorettype => 'copy_handler',
+  proargtypes => 'cstring', prosrc => 'copy_handler_in' },
+{ oid => '8754', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '267', descr => 'I/O',
   proname => 'table_am_handler_in', proisstrict => 'f',
   prorettype => 'table_am_handler', proargtypes => 'cstring',
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index ceff66ccde1..793dd671935 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -633,6 +633,12 @@
   typcategory => 'P', typinput => 'tsm_handler_in',
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '8752',
+  descr => 'pseudo-type for the result of a copy to method function',
+  typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '269',
   descr => 'pseudo-type for the result of a table AM handler function',
   typname => 'table_am_handler', typlen => '4', typbyval => 't', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 4002a7f5382..7659d8ae32f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -87,6 +87,7 @@ typedef struct CopyFormatOptions
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     int64        reject_limit;    /* maximum tolerable number of errors */
     List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine (can be NULL) */
 } CopyFormatOptions;
 
 /* These are private in commands/copy[from|to].c */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 5ce24f195dc..05b7d92ddba 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -26,6 +26,8 @@ typedef struct CopyToStateData *CopyToState;
  */
 typedef struct CopyToRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY TO is started to set up the output functions
      * associated with the relation's attributes reading from.  `finfo` can be
diff --git a/src/include/nodes/meson.build b/src/include/nodes/meson.build
index b665e55b657..103df1a7873 100644
--- a/src/include/nodes/meson.build
+++ b/src/include/nodes/meson.build
@@ -11,6 +11,7 @@ node_support_input_i = [
   'access/sdir.h',
   'access/tableam.h',
   'access/tsmapi.h',
+  'commands/copyapi.h',
   'commands/event_trigger.h',
   'commands/trigger.h',
   'executor/tuptable.h',
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index c0d3cf0e14b..33e3a49a4fb 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -15,6 +15,7 @@ SUBDIRS = \
           spgist_name_ops \
           test_bloomfilter \
           test_copy_callbacks \
+          test_copy_format \
           test_custom_rmgrs \
           test_ddl_deparse \
           test_dsa \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index c829b619530..75b6ab1b6a9 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -14,6 +14,7 @@ subdir('spgist_name_ops')
 subdir('ssl_passphrase_callback')
 subdir('test_bloomfilter')
 subdir('test_copy_callbacks')
+subdir('test_copy_format')
 subdir('test_custom_rmgrs')
 subdir('test_ddl_deparse')
 subdir('test_dsa')
diff --git a/src/test/modules/test_copy_format/.gitignore b/src/test/modules/test_copy_format/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_copy_format/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_copy_format/Makefile b/src/test/modules/test_copy_format/Makefile
new file mode 100644
index 00000000000..8497f91624d
--- /dev/null
+++ b/src/test/modules/test_copy_format/Makefile
@@ -0,0 +1,23 @@
+# src/test/modules/test_copy_format/Makefile
+
+MODULE_big = test_copy_format
+OBJS = \
+    $(WIN32RES) \
+    test_copy_format.o
+PGFILEDESC = "test_copy_format - test custom COPY FORMAT"
+
+EXTENSION = test_copy_format
+DATA = test_copy_format--1.0.sql
+
+REGRESS = test_copy_format
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_copy_format
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out
b/src/test/modules/test_copy_format/expected/test_copy_format.out
new file mode 100644
index 00000000000..606c78f6878
--- /dev/null
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -0,0 +1,17 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+ERROR:  COPY format "test_copy_format" not recognized
+LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')...
+                                          ^
+COPY public.test TO stdout WITH (format 'test_copy_format');
+NOTICE:  test_copy_format: is_from=false
+NOTICE:  CopyToOutFunc: atttypid=21
+NOTICE:  CopyToOutFunc: atttypid=23
+NOTICE:  CopyToOutFunc: atttypid=20
+NOTICE:  CopyToStart: natts=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToOneRow: tts_nvalid=3
+NOTICE:  CopyToEnd
diff --git a/src/test/modules/test_copy_format/meson.build b/src/test/modules/test_copy_format/meson.build
new file mode 100644
index 00000000000..4cefe7b709a
--- /dev/null
+++ b/src/test/modules/test_copy_format/meson.build
@@ -0,0 +1,33 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+test_copy_format_sources = files(
+  'test_copy_format.c',
+)
+
+if host_system == 'windows'
+  test_copy_format_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_copy_format',
+    '--FILEDESC', 'test_copy_format - test custom COPY FORMAT',])
+endif
+
+test_copy_format = shared_module('test_copy_format',
+  test_copy_format_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_copy_format
+
+test_install_data += files(
+  'test_copy_format.control',
+  'test_copy_format--1.0.sql',
+)
+
+tests += {
+  'name': 'test_copy_format',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'regress': {
+    'sql': [
+      'test_copy_format',
+    ],
+  },
+}
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql
b/src/test/modules/test_copy_format/sql/test_copy_format.sql
new file mode 100644
index 00000000000..9406b3be3d4
--- /dev/null
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -0,0 +1,5 @@
+CREATE EXTENSION test_copy_format;
+CREATE TABLE public.test (a smallint, b integer, c bigint);
+INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
+COPY public.test FROM stdin WITH (format 'test_copy_format');
+COPY public.test TO stdout WITH (format 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format--1.0.sql
b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
new file mode 100644
index 00000000000..d24ea03ce99
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format--1.0.sql
@@ -0,0 +1,8 @@
+/* src/test/modules/test_copy_format/test_copy_format--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_copy_format" to load this file. \quit
+
+CREATE FUNCTION test_copy_format(internal)
+    RETURNS copy_handler
+    AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_copy_format/test_copy_format.c
b/src/test/modules/test_copy_format/test_copy_format.c
new file mode 100644
index 00000000000..e064f40473b
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -0,0 +1,63 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_copy_format.c
+ *        Code for testing custom COPY format.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *        src/test/modules/test_copy_format/test_copy_format.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "commands/copyapi.h"
+#include "commands/defrem.h"
+
+PG_MODULE_MAGIC;
+
+static void
+CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
+{
+    ereport(NOTICE, (errmsg("CopyToOutFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyToStart(CopyToState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyToStart: natts=%d", tupDesc->natts)));
+}
+
+static void
+CopyToOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+    ereport(NOTICE, (errmsg("CopyToOneRow: tts_nvalid=%u", slot->tts_nvalid)));
+}
+
+static void
+CopyToEnd(CopyToState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyToEnd")));
+}
+
+static const CopyToRoutine CopyToRoutineTestCopyFormat = {
+    .type = T_CopyToRoutine,
+    .CopyToOutFunc = CopyToOutFunc,
+    .CopyToStart = CopyToStart,
+    .CopyToOneRow = CopyToOneRow,
+    .CopyToEnd = CopyToEnd,
+};
+
+PG_FUNCTION_INFO_V1(test_copy_format);
+Datum
+test_copy_format(PG_FUNCTION_ARGS)
+{
+    bool        is_from = PG_GETARG_BOOL(0);
+
+    ereport(NOTICE,
+            (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
+
+    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+}
diff --git a/src/test/modules/test_copy_format/test_copy_format.control
b/src/test/modules/test_copy_format/test_copy_format.control
new file mode 100644
index 00000000000..f05a6362358
--- /dev/null
+++ b/src/test/modules/test_copy_format/test_copy_format.control
@@ -0,0 +1,4 @@
+comment = 'Test code for custom COPY format'
+default_version = '1.0'
+module_pathname = '$libdir/test_copy_format'
+relocatable = true
-- 
2.45.2

From eedabed5e0e1a9f8f9e5591b806c6f606229874a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:56:36 +0900
Subject: [PATCH v23 04/10] Export CopyToStateData

It's for custom COPY TO format handlers implemented as extension.

This just moves codes. This doesn't change codes except CopyDest enum
values. CopyDest/CopyFrom enum values such as COPY_FILE are conflicted
each other. So COPY_DEST_ prefix instead of COPY_ prefix is used for
CopyDest enum values. For example, COPY_FILE in CopyDest is renamed to
COPY_DEST_FILE.

Note that this isn't enough to implement custom COPY TO format
handlers as extension. We'll do the followings in a subsequent commit:

1. Add an opaque space for custom COPY TO format handler
2. Export CopySendEndOfRow() to flush buffer
---
 src/backend/commands/copyto.c  |  77 ++----------------
 src/include/commands/copy.h    |  77 +-----------------
 src/include/commands/copyapi.h | 137 ++++++++++++++++++++++++++++++++-
 3 files changed, 146 insertions(+), 145 deletions(-)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 1f1d2baf9be..fb68f42ce1e 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -37,67 +37,6 @@
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
-/*
- * Represents the different dest cases we need to worry about at
- * the bottom level
- */
-typedef enum CopyDest
-{
-    COPY_FILE,                    /* to file (or a piped program) */
-    COPY_FRONTEND,                /* to frontend */
-    COPY_CALLBACK,                /* to callback function */
-} CopyDest;
-
-/*
- * This struct contains all the state variables used throughout a COPY TO
- * operation.
- *
- * Multi-byte encodings: all supported client-side encodings encode multi-byte
- * characters by having the first byte's high bit set. Subsequent bytes of the
- * character can have the high bit not set. When scanning data in such an
- * encoding to look for a match to a single-byte (ie ASCII) character, we must
- * use the full pg_encoding_mblen() machinery to skip over multibyte
- * characters, else we might find a false match to a trailing byte. In
- * supported server encodings, there is no possibility of a false match, and
- * it's faster to make useless comparisons to trailing bytes than it is to
- * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
- * when we have to do it the hard way.
- */
-typedef struct CopyToStateData
-{
-    /* format routine */
-    const CopyToRoutine *routine;
-
-    /* low-level state data */
-    CopyDest    copy_dest;        /* type of copy source/destination */
-    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
-
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy to */
-    QueryDesc  *queryDesc;        /* executable query to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDOUT */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_dest_cb data_dest_cb; /* function for writing data */
-
-    CopyFormatOptions opts;
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    FmgrInfo   *out_functions;    /* lookup info for output functions */
-    MemoryContext rowcontext;    /* per-row evaluation context */
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyToStateData;
-
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -143,7 +82,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
 {
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             /* Default line termination depends on platform */
 #ifndef WIN32
             CopySendChar(cstate, '\n');
@@ -151,7 +90,7 @@ CopyToTextLikeSendEndOfRow(CopyToState cstate)
             CopySendString(cstate, "\r\n");
 #endif
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* The FE/BE protocol uses \n as newline for all platforms */
             CopySendChar(cstate, '\n');
             break;
@@ -460,7 +399,7 @@ SendCopyBegin(CopyToState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_dest = COPY_FRONTEND;
+    cstate->copy_dest = COPY_DEST_FRONTEND;
 }
 
 static void
@@ -507,7 +446,7 @@ CopySendEndOfRow(CopyToState cstate)
 
     switch (cstate->copy_dest)
     {
-        case COPY_FILE:
+        case COPY_DEST_FILE:
             if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
                        cstate->copy_file) != 1 ||
                 ferror(cstate->copy_file))
@@ -541,11 +480,11 @@ CopySendEndOfRow(CopyToState cstate)
                              errmsg("could not write to COPY file: %m")));
             }
             break;
-        case COPY_FRONTEND:
+        case COPY_DEST_FRONTEND:
             /* Dump the accumulated row as one CopyData message */
             (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
             break;
-        case COPY_CALLBACK:
+        case COPY_DEST_CALLBACK:
             cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
             break;
     }
@@ -929,12 +868,12 @@ BeginCopyTo(ParseState *pstate,
     /* See Multibyte encoding comment above */
     cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
 
-    cstate->copy_dest = COPY_FILE;    /* default */
+    cstate->copy_dest = COPY_DEST_FILE; /* default */
 
     if (data_dest_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_dest = COPY_CALLBACK;
+        cstate->copy_dest = COPY_DEST_CALLBACK;
         cstate->data_dest_cb = data_dest_cb;
     }
     else if (pipe)
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 7659d8ae32f..dd645eaa030 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,88 +14,15 @@
 #ifndef COPY_H
 #define COPY_H
 
-#include "nodes/execnodes.h"
+#include "commands/copyapi.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/*
- * Represents whether a header line should be present, and whether it must
- * match the actual names (which implies "true").
- */
-typedef enum CopyHeaderChoice
-{
-    COPY_HEADER_FALSE = 0,
-    COPY_HEADER_TRUE,
-    COPY_HEADER_MATCH,
-} CopyHeaderChoice;
-
-/*
- * Represents where to save input processing errors.  More values to be added
- * in the future.
- */
-typedef enum CopyOnErrorChoice
-{
-    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
-    COPY_ON_ERROR_IGNORE,        /* ignore errors */
-} CopyOnErrorChoice;
-
-/*
- * Represents verbosity of logged messages by COPY command.
- */
-typedef enum CopyLogVerbosityChoice
-{
-    COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */
-    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is
-                                     * the default, assign 0 */
-    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
-} CopyLogVerbosityChoice;
-
-/*
- * A struct to hold COPY options, in a parsed form. All of these are related
- * to formatting, except for 'freeze', which doesn't really belong here, but
- * it's expedient to parse it along with all the other options.
- */
-typedef struct CopyFormatOptions
-{
-    /* parameters from the COPY command */
-    int            file_encoding;    /* file or remote side's character encoding,
-                                 * -1 if not specified */
-    bool        binary;            /* binary format? */
-    bool        freeze;            /* freeze rows on loading? */
-    bool        csv_mode;        /* Comma Separated Value format? */
-    CopyHeaderChoice header_line;    /* header line? */
-    char       *null_print;        /* NULL marker string (server encoding!) */
-    int            null_print_len; /* length of same */
-    char       *null_print_client;    /* same converted to file encoding */
-    char       *default_print;    /* DEFAULT marker string */
-    int            default_print_len;    /* length of same */
-    char       *delim;            /* column delimiter (must be 1 byte) */
-    char       *quote;            /* CSV quote char (must be 1 byte) */
-    char       *escape;            /* CSV escape char (must be 1 byte) */
-    List       *force_quote;    /* list of column names */
-    bool        force_quote_all;    /* FORCE_QUOTE *? */
-    bool       *force_quote_flags;    /* per-column CSV FQ flags */
-    List       *force_notnull;    /* list of column names */
-    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
-    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
-    List       *force_null;        /* list of column names */
-    bool        force_null_all; /* FORCE_NULL *? */
-    bool       *force_null_flags;    /* per-column CSV FN flags */
-    bool        convert_selectively;    /* do selective binary conversion? */
-    CopyOnErrorChoice on_error; /* what to do when error happened */
-    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
-    int64        reject_limit;    /* maximum tolerable number of errors */
-    List       *convert_select; /* list of column names (can be NIL) */
-    Node       *routine;        /* CopyToRoutine (can be NULL) */
-} CopyFormatOptions;
-
-/* These are private in commands/copy[from|to].c */
+/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-typedef void (*copy_data_dest_cb) (void *data, int len);
 
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 05b7d92ddba..b6ddb5f6216 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -14,10 +14,82 @@
 #ifndef COPYAPI_H
 #define COPYAPI_H
 
+#include "commands/trigger.h"
+#include "executor/execdesc.h"
 #include "executor/tuptable.h"
 #include "nodes/execnodes.h"
 
-/* This is private in commands/copyto.c */
+/*
+ * Represents whether a header line should be present, and whether it must
+ * match the actual names (which implies "true").
+ */
+typedef enum CopyHeaderChoice
+{
+    COPY_HEADER_FALSE = 0,
+    COPY_HEADER_TRUE,
+    COPY_HEADER_MATCH,
+} CopyHeaderChoice;
+
+/*
+ * Represents where to save input processing errors.  More values to be added
+ * in the future.
+ */
+typedef enum CopyOnErrorChoice
+{
+    COPY_ON_ERROR_STOP = 0,        /* immediately throw errors, default */
+    COPY_ON_ERROR_IGNORE,        /* ignore errors */
+} CopyOnErrorChoice;
+
+/*
+ * Represents verbosity of logged messages by COPY command.
+ */
+typedef enum CopyLogVerbosityChoice
+{
+    COPY_LOG_VERBOSITY_SILENT = -1, /* logs none */
+    COPY_LOG_VERBOSITY_DEFAULT = 0, /* logs no additional messages. As this is
+                                     * the default, assign 0 */
+    COPY_LOG_VERBOSITY_VERBOSE, /* logs additional messages */
+} CopyLogVerbosityChoice;
+
+/*
+ * A struct to hold COPY options, in a parsed form. All of these are related
+ * to formatting, except for 'freeze', which doesn't really belong here, but
+ * it's expedient to parse it along with all the other options.
+ */
+typedef struct CopyFormatOptions
+{
+    /* parameters from the COPY command */
+    int            file_encoding;    /* file or remote side's character encoding,
+                                 * -1 if not specified */
+    bool        binary;            /* binary format? */
+    bool        freeze;            /* freeze rows on loading? */
+    bool        csv_mode;        /* Comma Separated Value format? */
+    CopyHeaderChoice header_line;    /* header line? */
+    char       *null_print;        /* NULL marker string (server encoding!) */
+    int            null_print_len; /* length of same */
+    char       *null_print_client;    /* same converted to file encoding */
+    char       *default_print;    /* DEFAULT marker string */
+    int            default_print_len;    /* length of same */
+    char       *delim;            /* column delimiter (must be 1 byte) */
+    char       *quote;            /* CSV quote char (must be 1 byte) */
+    char       *escape;            /* CSV escape char (must be 1 byte) */
+    List       *force_quote;    /* list of column names */
+    bool        force_quote_all;    /* FORCE_QUOTE *? */
+    bool       *force_quote_flags;    /* per-column CSV FQ flags */
+    List       *force_notnull;    /* list of column names */
+    bool        force_notnull_all;    /* FORCE_NOT_NULL *? */
+    bool       *force_notnull_flags;    /* per-column CSV FNN flags */
+    List       *force_null;        /* list of column names */
+    bool        force_null_all; /* FORCE_NULL *? */
+    bool       *force_null_flags;    /* per-column CSV FN flags */
+    bool        convert_selectively;    /* do selective binary conversion? */
+    CopyOnErrorChoice on_error; /* what to do when error happened */
+    CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
+    int64        reject_limit;    /* maximum tolerable number of errors */
+    List       *convert_select; /* list of column names (can be NIL) */
+    Node       *routine;        /* CopyToRoutine (can be NULL) */
+} CopyFormatOptions;
+
 typedef struct CopyToStateData *CopyToState;
 
 /*
@@ -57,4 +129,67 @@ typedef struct CopyToRoutine
     void        (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * Represents the different dest cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopyDest
+{
+    COPY_DEST_FILE,                /* to file (or a piped program) */
+    COPY_DEST_FRONTEND,            /* to frontend */
+    COPY_DEST_CALLBACK,            /* to callback function */
+} CopyDest;
+
+typedef void (*copy_data_dest_cb) (void *data, int len);
+
+/*
+ * This struct contains all the state variables used throughout a COPY TO
+ * operation.
+ *
+ * Multi-byte encodings: all supported client-side encodings encode multi-byte
+ * characters by having the first byte's high bit set. Subsequent bytes of the
+ * character can have the high bit not set. When scanning data in such an
+ * encoding to look for a match to a single-byte (ie ASCII) character, we must
+ * use the full pg_encoding_mblen() machinery to skip over multibyte
+ * characters, else we might find a false match to a trailing byte. In
+ * supported server encodings, there is no possibility of a false match, and
+ * it's faster to make useless comparisons to trailing bytes than it is to
+ * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
+ * when we have to do it the hard way.
+ */
+typedef struct CopyToStateData
+{
+    /* format routine */
+    const CopyToRoutine *routine;
+
+    /* low-level state data */
+    CopyDest    copy_dest;        /* type of copy source/destination */
+    FILE       *copy_file;        /* used if copy_dest == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used for all dests during COPY TO */
+
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    bool        encoding_embeds_ascii;    /* ASCII can be non-first byte? */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy to */
+    QueryDesc  *queryDesc;        /* executable query to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDOUT */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_dest_cb data_dest_cb; /* function for writing data */
+
+    CopyFormatOptions opts;
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    FmgrInfo   *out_functions;    /* lookup info for output functions */
+    MemoryContext rowcontext;    /* per-row evaluation context */
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyToStateData;
+
 #endif                            /* COPYAPI_H */
-- 
2.45.2

From 90174da9f00b736b7c4900f046fcceae8cce74d5 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 28 Sep 2024 23:59:34 +0900
Subject: [PATCH v23 05/10] Add support for implementing custom COPY TO format
 as extension

* Add CopyToStateData::opaque that can be used to keep data for custom
  COPY TO format implementation
* Export CopySendEndOfRow() to flush data in CopyToStateData::fe_msgbuf
  as CopyToStateFlush()
---
 src/backend/commands/copyto.c  | 14 ++++++++++++++
 src/include/commands/copyapi.h |  5 +++++
 2 files changed, 19 insertions(+)

diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index fb68f42ce1e..93b041352c5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -496,6 +496,20 @@ CopySendEndOfRow(CopyToState cstate)
     resetStringInfo(fe_msgbuf);
 }
 
+/*
+ * CopyToStateFlush
+ *
+ * Export CopySendEndOfRow() for extensions. We want to keep
+ * CopySendEndOfRow() as a static function for
+ * optimization. CopySendEndOfRow() calls in this file may be optimized by a
+ * compiler.
+ */
+void
+CopyToStateFlush(CopyToState cstate)
+{
+    CopySendEndOfRow(cstate);
+}
+
 /*
  * These functions do apply some data conversion
  */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index b6ddb5f6216..310a37ba728 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -190,6 +190,11 @@ typedef struct CopyToStateData
     FmgrInfo   *out_functions;    /* lookup info for output functions */
     MemoryContext rowcontext;    /* per-row evaluation context */
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyToStateData;
 
+extern void CopyToStateFlush(CopyToState cstate);
+
 #endif                            /* COPYAPI_H */
-- 
2.45.2

From e5a3351614bea13c8e5857c5f05bbc656ce5f34d Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sun, 29 Sep 2024 00:06:20 +0900
Subject: [PATCH v23 06/10] Add CopyFromRoutine

This is for implementing custom COPY FROM format. But this is not
enough to implement custom COPY FROM format yet. We'll export some
APIs to receive data and add "format" option to COPY FROM later.

Existing text/csv/binary format implementations don't use
CopyFromRoutine for now. We have a patch for it but we defer
it. Because there are some mysterious profile results in spite of we
get faster runtimes. See [1] for details.

[1] https://www.postgresql.org/message-id/ZdbtQJ-p5H1_EDwE%40paquier.xyz

Note that this doesn't change existing text/csv/binary format
implementations.
---
 src/backend/commands/copyfrom.c          | 24 ++++++++++--
 src/backend/commands/copyfromparse.c     |  5 +++
 src/include/commands/copyapi.h           | 47 +++++++++++++++++++++++-
 src/include/commands/copyfrom_internal.h |  4 ++
 src/tools/pgindent/typedefs.list         |  1 +
 5 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 07cbd5d22b8..909375e81b7 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1635,12 +1635,22 @@ BeginCopyFrom(ParseState *pstate,
 
         /* Fetch the input function and typioparam info */
         if (cstate->opts.binary)
+        {
             getTypeBinaryInputInfo(att->atttypid,
                                    &in_func_oid, &typioparams[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
+        else if (cstate->routine)
+            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                            &in_functions[attnum - 1],
+                                            &typioparams[attnum - 1]);
+
         else
+        {
             getTypeInputInfo(att->atttypid,
                              &in_func_oid, &typioparams[attnum - 1]);
-        fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+        }
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1780,10 +1790,13 @@ BeginCopyFrom(ParseState *pstate,
         /* Read and verify binary header */
         ReceiveCopyBinaryHeader(cstate);
     }
-
-    /* create workspace for CopyReadAttributes results */
-    if (!cstate->opts.binary)
+    else if (cstate->routine)
     {
+        cstate->routine->CopyFromStart(cstate, tupDesc);
+    }
+    else
+    {
+        /* create workspace for CopyReadAttributes results */
         AttrNumber    attr_count = list_length(cstate->attnumlist);
 
         cstate->max_fields = attr_count;
@@ -1801,6 +1814,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+    if (cstate->routine)
+        cstate->routine->CopyFromEnd(cstate);
+
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
     {
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index d1d43b53d83..b104e4a9114 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1003,6 +1003,11 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 
         Assert(fieldno == attr_count);
     }
+    else if (cstate->routine)
+    {
+        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+            return false;
+    }
     else
     {
         /* binary */
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 310a37ba728..81b2f4e5c1f 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copyapi.h
- *      API for COPY TO handlers
+ *      API for COPY TO/FROM handlers
  *
  *
  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
@@ -90,6 +90,51 @@ typedef struct CopyFormatOptions
     Node       *routine;        /* CopyToRoutine (can be NULL) */
 } CopyFormatOptions;
 
+/* This is private in commands/copyfrom.c */
+typedef struct CopyFromStateData *CopyFromState;
+
+/*
+ * API structure for a COPY FROM format implementation.  Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+    /*
+     * Called when COPY FROM is started to set up the input functions
+     * associated with the relation's attributes writing to.  `finfo` can be
+     * optionally filled to provide the catalog information of the input
+     * function.  `typioparam` can be optionally filled to define the OID of
+     * the type to pass to the input function.  `atttypid` is the OID of data
+     * type used by the relation's attribute.
+     */
+    void        (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                   FmgrInfo *finfo, Oid *typioparam);
+
+    /*
+     * Called when COPY FROM is started.
+     *
+     * `tupDesc` is the tuple descriptor of the relation where the data needs
+     * to be copied.  This can be used for any initialization steps required
+     * by a format.
+     */
+    void        (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+    /*
+     * Copy one row to a set of `values` and `nulls` of size tupDesc->natts.
+     *
+     * 'econtext' is used to evaluate default expression for each column that
+     * is either not read from the file or is using the DEFAULT option of COPY
+     * FROM.  It is NULL if no default values are used.
+     *
+     * Returns false if there are no more tuples to copy.
+     */
+    bool        (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                   Datum *values, bool *nulls);
+
+    /* Called when COPY FROM has ended. */
+    void        (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
 typedef struct CopyToStateData *CopyToState;
 
 /*
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc783..509b9e92a18 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -15,6 +15,7 @@
 #define COPYFROM_INTERNAL_H
 
 #include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
@@ -58,6 +59,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+    /* format routine */
+    const CopyFromRoutine *routine;
+
     /* low-level state data */
     CopySource    copy_src;        /* type of copy source */
     FILE       *copy_file;        /* used if copy_src == COPY_FILE */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 098e7023486..a8422fa4d35 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -492,6 +492,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice
-- 
2.45.2

From 89a406fde3ea0dc013d8747b2130353dd911c4c0 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sun, 29 Sep 2024 00:09:29 +0900
Subject: [PATCH v23 07/10] Use CopyFromRoutine for the existing formats

The existing formats are text, csv and binary. If we find any
performance regression by this, we will not merge this to master.

This will increase indirect function call costs but this will reduce
runtime "if (cstate->opts.binary)" and "if (cstate->opts.csv_mode)"
branch costs.

This uses an optimization based of static inline function and a
constant argument call for cstate->opts.csv_mode. For example,
CopyFromTextLikeOneRow() uses this optimization. It accepts the "bool
is_csv" argument instead of using cstate->opts.csv_mode in
it. CopyFromTextOneRow() calls CopyFromTextLikeOneRow() with
false (constant) for "bool is_csv". Compiler will remove "if (is_csv)"
branch in it by this optimization.

This doesn't change existing logic. This just moves existing codes.
---
 src/backend/commands/copyfrom.c          | 215 ++++++---
 src/backend/commands/copyfromparse.c     | 530 +++++++++++++----------
 src/include/commands/copy.h              |   2 -
 src/include/commands/copyfrom_internal.h |   8 +
 4 files changed, 471 insertions(+), 284 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 909375e81b7..e6ea9ce1602 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,157 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+
+/*
+ * CopyFromRoutine implementations for text and CSV.
+ */
+
+/*
+ * CopyFromTextLikeInFunc
+ *
+ * Assign input function data for a relation's attribute in text/CSV format.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid,
+                       FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromTextLikeStart
+ *
+ * Start of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    AttrNumber    attr_count;
+
+    /*
+     * If encoding conversion is needed, we need another buffer to hold the
+     * converted input data.  Otherwise, we can just point input_buf to the
+     * same buffer as raw_buf.
+     */
+    if (cstate->need_transcoding)
+    {
+        cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+        cstate->input_buf_index = cstate->input_buf_len = 0;
+    }
+    else
+        cstate->input_buf = cstate->raw_buf;
+    cstate->input_reached_eof = false;
+
+    initStringInfo(&cstate->line_buf);
+
+    /*
+     * Create workspace for CopyReadAttributes results; used by CSV and text
+     * format.
+     */
+    attr_count = list_length(cstate->attnumlist);
+    cstate->max_fields = attr_count;
+    cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * CopyFromTextLikeEnd
+ *
+ * End of COPY FROM for text/CSV format.
+ */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * CopyFromRoutine implementation for "binary".
+ */
+
+/*
+ * CopyFromBinaryInFunc
+ *
+ * Assign input function data for a relation's attribute in binary format.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                     FmgrInfo *finfo, Oid *typioparam)
+{
+    Oid            func_oid;
+
+    getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+    fmgr_info(func_oid, finfo);
+}
+
+/*
+ * CopyFromBinaryStart
+ *
+ * Start of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    /* Read and verify binary header */
+    ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * CopyFromBinaryEnd
+ *
+ * End of COPY FROM for binary format.
+ */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+    /* nothing to do */
+}
+
+/*
+ * Routines assigned to each format.
++
+ * CSV and text share the same implementation, at the exception of the
+ * per-row callback.
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromTextOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineCSV = {
+    .CopyFromInFunc = CopyFromTextLikeInFunc,
+    .CopyFromStart = CopyFromTextLikeStart,
+    .CopyFromOneRow = CopyFromCSVOneRow,
+    .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+static const CopyFromRoutine CopyFromRoutineBinary = {
+    .CopyFromInFunc = CopyFromBinaryInFunc,
+    .CopyFromStart = CopyFromBinaryStart,
+    .CopyFromOneRow = CopyFromBinaryOneRow,
+    .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Define the COPY FROM routines to use for a format.
+ */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+    if (opts.csv_mode)
+        return &CopyFromRoutineCSV;
+    else if (opts.binary)
+        return &CopyFromRoutineBinary;
+
+    /* default is text */
+    return &CopyFromRoutineText;
+}
+
+
 /*
  * error context callback for COPY FROM
  *
@@ -1396,7 +1547,6 @@ BeginCopyFrom(ParseState *pstate,
                 num_defaults;
     FmgrInfo   *in_functions;
     Oid           *typioparams;
-    Oid            in_func_oid;
     int           *defmap;
     ExprState **defexprs;
     MemoryContext oldcontext;
@@ -1428,6 +1578,9 @@ BeginCopyFrom(ParseState *pstate,
     /* Extract options from the statement node tree */
     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+    /* Set format routine */
+    cstate->routine = CopyFromGetRoutine(cstate->opts);
+
     /* Process the target relation */
     cstate->rel = rel;
 
@@ -1583,25 +1736,6 @@ BeginCopyFrom(ParseState *pstate,
     cstate->raw_buf_index = cstate->raw_buf_len = 0;
     cstate->raw_reached_eof = false;
 
-    if (!cstate->opts.binary)
-    {
-        /*
-         * If encoding conversion is needed, we need another buffer to hold
-         * the converted input data.  Otherwise, we can just point input_buf
-         * to the same buffer as raw_buf.
-         */
-        if (cstate->need_transcoding)
-        {
-            cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-            cstate->input_buf_index = cstate->input_buf_len = 0;
-        }
-        else
-            cstate->input_buf = cstate->raw_buf;
-        cstate->input_reached_eof = false;
-
-        initStringInfo(&cstate->line_buf);
-    }
-
     initStringInfo(&cstate->attribute_buf);
 
     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1634,23 +1768,9 @@ BeginCopyFrom(ParseState *pstate,
             continue;
 
         /* Fetch the input function and typioparam info */
-        if (cstate->opts.binary)
-        {
-            getTypeBinaryInputInfo(att->atttypid,
-                                   &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
-        else if (cstate->routine)
-            cstate->routine->CopyFromInFunc(cstate, att->atttypid,
-                                            &in_functions[attnum - 1],
-                                            &typioparams[attnum - 1]);
-
-        else
-        {
-            getTypeInputInfo(att->atttypid,
-                             &in_func_oid, &typioparams[attnum - 1]);
-            fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-        }
+        cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                        &in_functions[attnum - 1],
+                                        &typioparams[attnum - 1]);
 
         /* Get default info if available */
         defexprs[attnum - 1] = NULL;
@@ -1785,23 +1905,7 @@ BeginCopyFrom(ParseState *pstate,
 
     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-    if (cstate->opts.binary)
-    {
-        /* Read and verify binary header */
-        ReceiveCopyBinaryHeader(cstate);
-    }
-    else if (cstate->routine)
-    {
-        cstate->routine->CopyFromStart(cstate, tupDesc);
-    }
-    else
-    {
-        /* create workspace for CopyReadAttributes results */
-        AttrNumber    attr_count = list_length(cstate->attnumlist);
-
-        cstate->max_fields = attr_count;
-        cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-    }
+    cstate->routine->CopyFromStart(cstate, tupDesc);
 
     MemoryContextSwitchTo(oldcontext);
 
@@ -1814,8 +1918,7 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
-    if (cstate->routine)
-        cstate->routine->CopyFromEnd(cstate);
+    cstate->routine->CopyFromEnd(cstate);
 
     /* No COPY FROM related resources except memory. */
     if (cstate->is_program)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index b104e4a9114..0447c4df7e0 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -140,8 +140,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static pg_attribute_always_inline bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int    CopyReadAttributesText(CopyFromState cstate);
 static int    CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
@@ -741,8 +741,8 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  *
  * NOTE: force_not_null option are not applied to the returned fields.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
     int            fldct;
     bool        done;
@@ -759,13 +759,17 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
         tupDesc = RelationGetDescr(cstate->rel);
 
         cstate->cur_lineno++;
-        done = CopyReadLine(cstate);
+        done = CopyReadLine(cstate, is_csv);
 
         if (cstate->opts.header_line == COPY_HEADER_MATCH)
         {
             int            fldnum;
 
-            if (cstate->opts.csv_mode)
+            /*
+             * is_csv will be optimized away by compiler, as argument is
+             * constant at caller.
+             */
+            if (is_csv)
                 fldct = CopyReadAttributesCSV(cstate);
             else
                 fldct = CopyReadAttributesText(cstate);
@@ -809,7 +813,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     cstate->cur_lineno++;
 
     /* Actually read the line into memory here */
-    done = CopyReadLine(cstate);
+    done = CopyReadLine(cstate, is_csv);
 
     /*
      * EOF at start of line means we're done.  If we see EOF after some
@@ -819,8 +823,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     if (done && cstate->line_buf.len == 0)
         return false;
 
-    /* Parse the line into de-escaped field values */
-    if (cstate->opts.csv_mode)
+    /*
+     * Parse the line into de-escaped field values
+     *
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
         fldct = CopyReadAttributesCSV(cstate);
     else
         fldct = CopyReadAttributesText(cstate);
@@ -830,6 +839,267 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
     return true;
 }
 
+/*
+ * CopyFromTextLikeOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the text and CSV
+ * formats.
+ *
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate,
+                       ExprContext *econtext,
+                       Datum *values,
+                       bool *nulls,
+                       bool is_csv)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    ExprState **defexprs = cstate->defexprs;
+    char      **field_strings;
+    ListCell   *cur;
+    int            fldct;
+    int            fieldno;
+    char       *string;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    /* read raw fields in the next line */
+    if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+        return false;
+
+    /* check for overflowing fields */
+    if (attr_count > 0 && fldct > attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("extra data after last expected column")));
+
+    fieldno = 0;
+
+    /* Loop to read the user attributes on the line. */
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        if (fieldno >= fldct)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("missing data for column \"%s\"",
+                            NameStr(att->attname))));
+        string = field_strings[fieldno++];
+
+        if (cstate->convert_select_flags &&
+            !cstate->convert_select_flags[m])
+        {
+            /* ignore input field, leaving column as NULL */
+            continue;
+        }
+
+        if (is_csv)
+        {
+            if (string == NULL &&
+                cstate->opts.force_notnull_flags[m])
+            {
+                /*
+                 * FORCE_NOT_NULL option is set and column is NULL - convert
+                 * it to the NULL string.
+                 */
+                string = cstate->opts.null_print;
+            }
+            else if (string != NULL && cstate->opts.force_null_flags[m]
+                     && strcmp(string, cstate->opts.null_print) == 0)
+            {
+                /*
+                 * FORCE_NULL option is set and column matches the NULL
+                 * string. It must have been quoted, or otherwise the string
+                 * would already have been set to NULL. Convert it to NULL as
+                 * specified.
+                 */
+                string = NULL;
+            }
+        }
+
+        cstate->cur_attname = NameStr(att->attname);
+        cstate->cur_attval = string;
+
+        if (string != NULL)
+            nulls[m] = false;
+
+        if (cstate->defaults[m])
+        {
+            /*
+             * The caller must supply econtext and have switched into the
+             * per-tuple memory context in it.
+             */
+            Assert(econtext != NULL);
+            Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+            values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+        }
+
+        /*
+         * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+         */
+        else if (!InputFunctionCallSafe(&in_functions[m],
+                                        string,
+                                        typioparams[m],
+                                        att->atttypmod,
+                                        (Node *) cstate->escontext,
+                                        &values[m]))
+        {
+            Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+
+            cstate->num_errors++;
+
+            if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+            {
+                /*
+                 * Since we emit line number and column info in the below
+                 * notice message, we suppress error context information other
+                 * than the relation name.
+                 */
+                Assert(!cstate->relname_only);
+                cstate->relname_only = true;
+
+                if (cstate->cur_attval)
+                {
+                    char       *attval;
+
+                    attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
\"%s\"",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname,
+                                   attval));
+                    pfree(attval);
+                }
+                else
+                    ereport(NOTICE,
+                            errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null
input",
+                                   (unsigned long long) cstate->cur_lineno,
+                                   cstate->cur_attname));
+
+                /* reset relname_only */
+                cstate->relname_only = false;
+            }
+
+            return true;
+        }
+
+        cstate->cur_attname = NULL;
+        cstate->cur_attval = NULL;
+    }
+
+    Assert(fieldno == attr_count);
+
+    return true;
+}
+
+
+/*
+ * CopyFromTextOneRow
+ *
+ * Per-row callback for COPY FROM with text format.
+ */
+bool
+CopyFromTextOneRow(CopyFromState cstate,
+                   ExprContext *econtext,
+                   Datum *values,
+                   bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/*
+ * CopyFromCSVOneRow
+ *
+ * Per-row callback for COPY FROM with CSV format.
+ */
+bool
+CopyFromCSVOneRow(CopyFromState cstate,
+                  ExprContext *econtext,
+                  Datum *values,
+                  bool *nulls)
+{
+    return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
+
+/*
+ * CopyFromBinaryOneRow
+ *
+ * Copy one row to a set of `values` and `nulls` for the binary format.
+ */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                     Datum *values, bool *nulls)
+{
+    TupleDesc    tupDesc;
+    AttrNumber    attr_count;
+    FmgrInfo   *in_functions = cstate->in_functions;
+    Oid           *typioparams = cstate->typioparams;
+    int16        fld_count;
+    ListCell   *cur;
+
+    tupDesc = RelationGetDescr(cstate->rel);
+    attr_count = list_length(cstate->attnumlist);
+
+    cstate->cur_lineno++;
+
+    if (!CopyGetInt16(cstate, &fld_count))
+    {
+        /* EOF detected (end of file, or protocol-level EOF) */
+        return false;
+    }
+
+    if (fld_count == -1)
+    {
+        /*
+         * Received EOF marker.  Wait for the protocol-level EOF, and complain
+         * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+         * that we correctly handle CopyFail, if client chooses to send that
+         * now.  When copying from file, we could ignore the rest of the file
+         * like in text mode, but we choose to be consistent with the COPY
+         * FROM STDIN case.
+         */
+        char        dummy;
+
+        if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+            ereport(ERROR,
+                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                     errmsg("received copy data after EOF marker")));
+        return false;
+    }
+
+    if (fld_count != attr_count)
+        ereport(ERROR,
+                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                 errmsg("row field count is %d, expected %d",
+                        (int) fld_count, attr_count)));
+
+    foreach(cur, cstate->attnumlist)
+    {
+        int            attnum = lfirst_int(cur);
+        int            m = attnum - 1;
+        Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+        cstate->cur_attname = NameStr(att->attname);
+        values[m] = CopyReadBinaryAttribute(cstate,
+                                            &in_functions[m],
+                                            typioparams[m],
+                                            att->atttypmod,
+                                            &nulls[m]);
+        cstate->cur_attname = NULL;
+    }
+
+    return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
@@ -847,221 +1117,21 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
     TupleDesc    tupDesc;
     AttrNumber    num_phys_attrs,
-                attr_count,
                 num_defaults = cstate->num_defaults;
-    FmgrInfo   *in_functions = cstate->in_functions;
-    Oid           *typioparams = cstate->typioparams;
     int            i;
     int           *defmap = cstate->defmap;
     ExprState **defexprs = cstate->defexprs;
 
     tupDesc = RelationGetDescr(cstate->rel);
     num_phys_attrs = tupDesc->natts;
-    attr_count = list_length(cstate->attnumlist);
 
     /* Initialize all values for row to NULL */
     MemSet(values, 0, num_phys_attrs * sizeof(Datum));
     MemSet(nulls, true, num_phys_attrs * sizeof(bool));
     MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-    if (!cstate->opts.binary)
-    {
-        char      **field_strings;
-        ListCell   *cur;
-        int            fldct;
-        int            fieldno;
-        char       *string;
-
-        /* read raw fields in the next line */
-        if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-            return false;
-
-        /* check for overflowing fields */
-        if (attr_count > 0 && fldct > attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("extra data after last expected column")));
-
-        fieldno = 0;
-
-        /* Loop to read the user attributes on the line. */
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            if (fieldno >= fldct)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("missing data for column \"%s\"",
-                                NameStr(att->attname))));
-            string = field_strings[fieldno++];
-
-            if (cstate->convert_select_flags &&
-                !cstate->convert_select_flags[m])
-            {
-                /* ignore input field, leaving column as NULL */
-                continue;
-            }
-
-            if (cstate->opts.csv_mode)
-            {
-                if (string == NULL &&
-                    cstate->opts.force_notnull_flags[m])
-                {
-                    /*
-                     * FORCE_NOT_NULL option is set and column is NULL -
-                     * convert it to the NULL string.
-                     */
-                    string = cstate->opts.null_print;
-                }
-                else if (string != NULL && cstate->opts.force_null_flags[m]
-                         && strcmp(string, cstate->opts.null_print) == 0)
-                {
-                    /*
-                     * FORCE_NULL option is set and column matches the NULL
-                     * string. It must have been quoted, or otherwise the
-                     * string would already have been set to NULL. Convert it
-                     * to NULL as specified.
-                     */
-                    string = NULL;
-                }
-            }
-
-            cstate->cur_attname = NameStr(att->attname);
-            cstate->cur_attval = string;
-
-            if (string != NULL)
-                nulls[m] = false;
-
-            if (cstate->defaults[m])
-            {
-                /*
-                 * The caller must supply econtext and have switched into the
-                 * per-tuple memory context in it.
-                 */
-                Assert(econtext != NULL);
-                Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-                values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-            }
-
-            /*
-             * If ON_ERROR is specified with IGNORE, skip rows with soft
-             * errors
-             */
-            else if (!InputFunctionCallSafe(&in_functions[m],
-                                            string,
-                                            typioparams[m],
-                                            att->atttypmod,
-                                            (Node *) cstate->escontext,
-                                            &values[m]))
-            {
-                Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
-
-                cstate->num_errors++;
-
-                if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-                {
-                    /*
-                     * Since we emit line number and column info in the below
-                     * notice message, we suppress error context information
-                     * other than the relation name.
-                     */
-                    Assert(!cstate->relname_only);
-                    cstate->relname_only = true;
-
-                    if (cstate->cur_attval)
-                    {
-                        char       *attval;
-
-                        attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
\"%s\"",
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname,
-                                       attval));
-                        pfree(attval);
-                    }
-                    else
-                        ereport(NOTICE,
-                                errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\":
nullinput",
 
-                                       (unsigned long long) cstate->cur_lineno,
-                                       cstate->cur_attname));
-
-                    /* reset relname_only */
-                    cstate->relname_only = false;
-                }
-
-                return true;
-            }
-
-            cstate->cur_attname = NULL;
-            cstate->cur_attval = NULL;
-        }
-
-        Assert(fieldno == attr_count);
-    }
-    else if (cstate->routine)
-    {
-        if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
-            return false;
-    }
-    else
-    {
-        /* binary */
-        int16        fld_count;
-        ListCell   *cur;
-
-        cstate->cur_lineno++;
-
-        if (!CopyGetInt16(cstate, &fld_count))
-        {
-            /* EOF detected (end of file, or protocol-level EOF) */
-            return false;
-        }
-
-        if (fld_count == -1)
-        {
-            /*
-             * Received EOF marker.  Wait for the protocol-level EOF, and
-             * complain if it doesn't come immediately.  In COPY FROM STDIN,
-             * this ensures that we correctly handle CopyFail, if client
-             * chooses to send that now.  When copying from file, we could
-             * ignore the rest of the file like in text mode, but we choose to
-             * be consistent with the COPY FROM STDIN case.
-             */
-            char        dummy;
-
-            if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                ereport(ERROR,
-                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         errmsg("received copy data after EOF marker")));
-            return false;
-        }
-
-        if (fld_count != attr_count)
-            ereport(ERROR,
-                    (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                     errmsg("row field count is %d, expected %d",
-                            (int) fld_count, attr_count)));
-
-        foreach(cur, cstate->attnumlist)
-        {
-            int            attnum = lfirst_int(cur);
-            int            m = attnum - 1;
-            Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-            cstate->cur_attname = NameStr(att->attname);
-            values[m] = CopyReadBinaryAttribute(cstate,
-                                                &in_functions[m],
-                                                typioparams[m],
-                                                att->atttypmod,
-                                                &nulls[m]);
-            cstate->cur_attname = NULL;
-        }
-    }
+    if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+        return false;
 
     /*
      * Now compute and insert any defaults available for the columns not
@@ -1092,7 +1162,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
     bool        result;
 
@@ -1100,7 +1170,7 @@ CopyReadLine(CopyFromState cstate)
     cstate->line_buf_valid = false;
 
     /* Parse data and transfer into line_buf */
-    result = CopyReadLineText(cstate);
+    result = CopyReadLineText(cstate, is_csv);
 
     if (result)
     {
@@ -1167,8 +1237,8 @@ CopyReadLine(CopyFromState cstate)
 /*
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
-static bool
-CopyReadLineText(CopyFromState cstate)
+static pg_attribute_always_inline bool
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
     char       *copy_input_buf;
     int            input_buf_ptr;
@@ -1183,7 +1253,11 @@ CopyReadLineText(CopyFromState cstate)
     char        quotec = '\0';
     char        escapec = '\0';
 
-    if (cstate->opts.csv_mode)
+    /*
+     * is_csv will be optimized away by compiler, as argument is constant at
+     * caller.
+     */
+    if (is_csv)
     {
         quotec = cstate->opts.quote[0];
         escapec = cstate->opts.escape[0];
@@ -1260,7 +1334,11 @@ CopyReadLineText(CopyFromState cstate)
         prev_raw_ptr = input_buf_ptr;
         c = copy_input_buf[input_buf_ptr++];
 
-        if (cstate->opts.csv_mode)
+        /*
+         * is_csv will be optimized away by compiler, as argument is constant
+         * at caller.
+         */
+        if (is_csv)
         {
             /*
              * If character is '\r', we may need to look ahead below.  Force
@@ -1299,7 +1377,7 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \r */
-        if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\r' && (!is_csv || !in_quote))
         {
             /* Check for \r\n on first line, _and_ handle \r\n. */
             if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1327,10 +1405,10 @@ CopyReadLineText(CopyFromState cstate)
                     if (cstate->eol_type == EOL_CRNL)
                         ereport(ERROR,
                                 (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errmsg("literal carriage return found in data") :
                                  errmsg("unquoted carriage return found in data"),
-                                 !cstate->opts.csv_mode ?
+                                 !is_csv ?
                                  errhint("Use \"\\r\" to represent carriage return.") :
                                  errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1344,10 +1422,10 @@ CopyReadLineText(CopyFromState cstate)
             else if (cstate->eol_type == EOL_NL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal carriage return found in data") :
                          errmsg("unquoted carriage return found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\r\" to represent carriage return.") :
                          errhint("Use quoted CSV field to represent carriage return.")));
             /* If reach here, we have found the line terminator */
@@ -1355,15 +1433,15 @@ CopyReadLineText(CopyFromState cstate)
         }
 
         /* Process \n */
-        if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+        if (c == '\n' && (!is_csv || !in_quote))
         {
             if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                 ereport(ERROR,
                         (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errmsg("literal newline found in data") :
                          errmsg("unquoted newline found in data"),
-                         !cstate->opts.csv_mode ?
+                         !is_csv ?
                          errhint("Use \"\\n\" to represent newline.") :
                          errhint("Use quoted CSV field to represent newline.")));
             cstate->eol_type = EOL_NL;    /* in case not set yet */
@@ -1375,7 +1453,7 @@ CopyReadLineText(CopyFromState cstate)
          * Process backslash, except in CSV mode where backslash is a normal
          * character.
          */
-        if (c == '\\' && !cstate->opts.csv_mode)
+        if (c == '\\' && !is_csv)
         {
             char        c2;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index dd645eaa030..e5696839637 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -35,8 +35,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                          Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 509b9e92a18..c11b5ff3cc0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -187,4 +187,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* Callbacks for CopyFromRoutine->CopyFromOneRow */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                               Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                              Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                 Datum *values, bool *nulls);
+
 #endif                            /* COPYFROM_INTERNAL_H */
-- 
2.45.2

From de95d4bd606519df51c8d68b5ab2f3009dfd0cf7 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sun, 29 Sep 2024 00:17:53 +0900
Subject: [PATCH v23 08/10] Add support for adding custom COPY FROM format

This uses the same handler for COPY TO and COPY FROM but uses
different routine. This uses CopyToRoutine for COPY TO and
CopyFromRoutine for COPY FROM. PostgreSQL calls a COPY TO/FROM handler
with "is_from" argument. It's true for COPY FROM and false for COPY
TO:

    copy_handler(true) returns CopyToRoutine
    copy_handler(false) returns CopyFromRoutine

This also add a test module for custom COPY FROM handler.
---
 src/backend/commands/copy.c                   | 52 ++++++++++++-------
 src/backend/commands/copyfrom.c               |  4 +-
 src/include/catalog/pg_type.dat               |  2 +-
 src/include/commands/copyapi.h                |  5 +-
 .../expected/test_copy_format.out             | 10 ++--
 .../test_copy_format/sql/test_copy_format.sql |  1 +
 .../test_copy_format/test_copy_format.c       | 39 +++++++++++++-
 7 files changed, 87 insertions(+), 26 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 02528fbcc1f..c8643b2dee7 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -469,8 +469,8 @@ defGetCopyLogVerbosityChoice(DefElem *def, ParseState *pstate)
  * This function checks whether the option value is a built-in format such as
  * "text" and "csv" or not. If the option value isn't a built-in format, this
  * function finds a COPY format handler that returns a CopyToRoutine (for
- * is_from == false). If no COPY format handler is found, this function
- * reports an error.
+ * is_from == false) or CopyFromRountine (for is_from == true). If no COPY
+ * format handler is found, this function reports an error.
  */
 static void
 ProcessCopyOptionFormat(ParseState *pstate,
@@ -501,12 +501,9 @@ ProcessCopyOptionFormat(ParseState *pstate,
     }
 
     /* custom format */
-    if (!is_from)
-    {
-        funcargtypes[0] = INTERNALOID;
-        handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
-                                    funcargtypes, true);
-    }
+    funcargtypes[0] = INTERNALOID;
+    handlerOid = LookupFuncName(list_make1(makeString(format)), 1,
+                                funcargtypes, true);
     if (!OidIsValid(handlerOid))
         ereport(ERROR,
                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -515,17 +512,34 @@ ProcessCopyOptionFormat(ParseState *pstate,
 
     datum = OidFunctionCall1(handlerOid, BoolGetDatum(is_from));
     routine = (Node *) DatumGetPointer(datum);
-    if (routine == NULL || !IsA(routine, CopyToRoutine))
-        ereport(
-                ERROR,
-                (errcode(
-                         ERRCODE_INVALID_PARAMETER_VALUE),
-                 errmsg("COPY handler function "
-                        "%s(%u) did not return a "
-                        "CopyToRoutine struct",
-                        format, handlerOid),
-                 parser_errposition(
-                                    pstate, defel->location)));
+    if (is_from)
+    {
+        if (routine == NULL || !IsA(routine, CopyFromRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyFromRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
+    else
+    {
+        if (routine == NULL || !IsA(routine, CopyToRoutine))
+            ereport(
+                    ERROR,
+                    (errcode(
+                             ERRCODE_INVALID_PARAMETER_VALUE),
+                     errmsg("COPY handler function "
+                            "%s(%u) did not return a "
+                            "CopyToRoutine struct",
+                            format, handlerOid),
+                     parser_errposition(
+                                        pstate, defel->location)));
+    }
 
     opts_out->routine = routine;
 }
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e6ea9ce1602..932f1ff4f6e 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -247,7 +247,9 @@ static const CopyFromRoutine CopyFromRoutineBinary = {
 static const CopyFromRoutine *
 CopyFromGetRoutine(CopyFormatOptions opts)
 {
-    if (opts.csv_mode)
+    if (opts.routine)
+        return (const CopyFromRoutine *) opts.routine;
+    else if (opts.csv_mode)
         return &CopyFromRoutineCSV;
     else if (opts.binary)
         return &CopyFromRoutineBinary;
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index 793dd671935..37ebfa0908f 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -634,7 +634,7 @@
   typoutput => 'tsm_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
 { oid => '8752',
-  descr => 'pseudo-type for the result of a copy to method function',
+  descr => 'pseudo-type for the result of a copy to/from method function',
   typname => 'copy_handler', typlen => '4', typbyval => 't', typtype => 'p',
   typcategory => 'P', typinput => 'copy_handler_in',
   typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 81b2f4e5c1f..e9c01492797 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -87,7 +87,8 @@ typedef struct CopyFormatOptions
     CopyLogVerbosityChoice log_verbosity;    /* verbosity of logged messages */
     int64        reject_limit;    /* maximum tolerable number of errors */
     List       *convert_select; /* list of column names (can be NIL) */
-    Node       *routine;        /* CopyToRoutine (can be NULL) */
+    Node       *routine;        /* CopyToRoutine or CopyFromRoutine (can be
+                                 * NULL) */
 } CopyFormatOptions;
 
 /* This is private in commands/copyfrom.c */
@@ -99,6 +100,8 @@ typedef struct CopyFromStateData *CopyFromState;
  */
 typedef struct CopyFromRoutine
 {
+    NodeTag        type;
+
     /*
      * Called when COPY FROM is started to set up the input functions
      * associated with the relation's attributes writing to.  `finfo` can be
diff --git a/src/test/modules/test_copy_format/expected/test_copy_format.out
b/src/test/modules/test_copy_format/expected/test_copy_format.out
index 606c78f6878..4ed7c0b12db 100644
--- a/src/test/modules/test_copy_format/expected/test_copy_format.out
+++ b/src/test/modules/test_copy_format/expected/test_copy_format.out
@@ -2,9 +2,13 @@ CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
 COPY public.test FROM stdin WITH (format 'test_copy_format');
-ERROR:  COPY format "test_copy_format" not recognized
-LINE 1: COPY public.test FROM stdin WITH (format 'test_copy_format')...
-                                          ^
+NOTICE:  test_copy_format: is_from=true
+NOTICE:  CopyFromInFunc: atttypid=21
+NOTICE:  CopyFromInFunc: atttypid=23
+NOTICE:  CopyFromInFunc: atttypid=20
+NOTICE:  CopyFromStart: natts=3
+NOTICE:  CopyFromOneRow
+NOTICE:  CopyFromEnd
 COPY public.test TO stdout WITH (format 'test_copy_format');
 NOTICE:  test_copy_format: is_from=false
 NOTICE:  CopyToOutFunc: atttypid=21
diff --git a/src/test/modules/test_copy_format/sql/test_copy_format.sql
b/src/test/modules/test_copy_format/sql/test_copy_format.sql
index 9406b3be3d4..e805f7cb011 100644
--- a/src/test/modules/test_copy_format/sql/test_copy_format.sql
+++ b/src/test/modules/test_copy_format/sql/test_copy_format.sql
@@ -2,4 +2,5 @@ CREATE EXTENSION test_copy_format;
 CREATE TABLE public.test (a smallint, b integer, c bigint);
 INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
 COPY public.test FROM stdin WITH (format 'test_copy_format');
+\.
 COPY public.test TO stdout WITH (format 'test_copy_format');
diff --git a/src/test/modules/test_copy_format/test_copy_format.c
b/src/test/modules/test_copy_format/test_copy_format.c
index e064f40473b..f6b105659ab 100644
--- a/src/test/modules/test_copy_format/test_copy_format.c
+++ b/src/test/modules/test_copy_format/test_copy_format.c
@@ -18,6 +18,40 @@
 
 PG_MODULE_MAGIC;
 
+static void
+CopyFromInFunc(CopyFromState cstate, Oid atttypid,
+               FmgrInfo *finfo, Oid *typioparam)
+{
+    ereport(NOTICE, (errmsg("CopyFromInFunc: atttypid=%d", atttypid)));
+}
+
+static void
+CopyFromStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+    ereport(NOTICE, (errmsg("CopyFromStart: natts=%d", tupDesc->natts)));
+}
+
+static bool
+CopyFromOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+    ereport(NOTICE, (errmsg("CopyFromOneRow")));
+    return false;
+}
+
+static void
+CopyFromEnd(CopyFromState cstate)
+{
+    ereport(NOTICE, (errmsg("CopyFromEnd")));
+}
+
+static const CopyFromRoutine CopyFromRoutineTestCopyFormat = {
+    .type = T_CopyFromRoutine,
+    .CopyFromInFunc = CopyFromInFunc,
+    .CopyFromStart = CopyFromStart,
+    .CopyFromOneRow = CopyFromOneRow,
+    .CopyFromEnd = CopyFromEnd,
+};
+
 static void
 CopyToOutFunc(CopyToState cstate, Oid atttypid, FmgrInfo *finfo)
 {
@@ -59,5 +93,8 @@ test_copy_format(PG_FUNCTION_ARGS)
     ereport(NOTICE,
             (errmsg("test_copy_format: is_from=%s", is_from ? "true" : "false")));
 
-    PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
+    if (is_from)
+        PG_RETURN_POINTER(&CopyFromRoutineTestCopyFormat);
+    else
+        PG_RETURN_POINTER(&CopyToRoutineTestCopyFormat);
 }
-- 
2.45.2

From 0f4d74f1750467f4ac3b87b24e03271579793651 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sun, 29 Sep 2024 00:28:02 +0900
Subject: [PATCH v23 09/10] Export CopyFromStateData

It's for custom COPY FROM format handlers implemented as extension.

This just moves codes. This doesn't change codes except CopySource
enum values. This changes COPY_ prefix of CopySource enum values to
COPY_SOURCE_ prefix like the CopyDest enum values prefix change. For
example, COPY_FILE in CopySource is renamed to COPY_SOURCE_FILE.

Note that this isn't enough to implement custom COPY FROM format
handlers as extension. We'll do the followings in a subsequent commit:

1. Add an opaque space for custom COPY FROM format handler
2. Export CopyReadBinaryData() to read the next data
---
 src/backend/commands/copyfrom.c          |   4 +-
 src/backend/commands/copyfromparse.c     |  10 +-
 src/include/commands/copy.h              |   5 -
 src/include/commands/copyapi.h           | 168 ++++++++++++++++++++++-
 src/include/commands/copyfrom_internal.h | 165 ----------------------
 5 files changed, 174 insertions(+), 178 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 932f1ff4f6e..d758e66c6a1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -1716,7 +1716,7 @@ BeginCopyFrom(ParseState *pstate,
                             pg_encoding_to_char(GetDatabaseEncoding()))));
     }
 
-    cstate->copy_src = COPY_FILE;    /* default */
+    cstate->copy_src = COPY_SOURCE_FILE;    /* default */
 
     cstate->whereClause = whereClause;
 
@@ -1844,7 +1844,7 @@ BeginCopyFrom(ParseState *pstate,
     if (data_source_cb)
     {
         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
-        cstate->copy_src = COPY_CALLBACK;
+        cstate->copy_src = COPY_SOURCE_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 0447c4df7e0..ccfbacb4a37 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -171,7 +171,7 @@ ReceiveCopyBegin(CopyFromState cstate)
     for (i = 0; i < natts; i++)
         pq_sendint16(&buf, format); /* per-column formats */
     pq_endmessage(&buf);
-    cstate->copy_src = COPY_FRONTEND;
+    cstate->copy_src = COPY_SOURCE_FRONTEND;
     cstate->fe_msgbuf = makeStringInfo();
     /* We *must* flush here to ensure FE knows it can send. */
     pq_flush();
@@ -239,7 +239,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
 
     switch (cstate->copy_src)
     {
-        case COPY_FILE:
+        case COPY_SOURCE_FILE:
             bytesread = fread(databuf, 1, maxread, cstate->copy_file);
             if (ferror(cstate->copy_file))
                 ereport(ERROR,
@@ -248,7 +248,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
             if (bytesread == 0)
                 cstate->raw_reached_eof = true;
             break;
-        case COPY_FRONTEND:
+        case COPY_SOURCE_FRONTEND:
             while (maxread > 0 && bytesread < minread && !cstate->raw_reached_eof)
             {
                 int            avail;
@@ -331,7 +331,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread)
                 bytesread += avail;
             }
             break;
-        case COPY_CALLBACK:
+        case COPY_SOURCE_CALLBACK:
             bytesread = cstate->data_source_cb(databuf, minread, maxread);
             break;
     }
@@ -1179,7 +1179,7 @@ CopyReadLine(CopyFromState cstate, bool is_csv)
          * after \. up to the protocol end of copy data.  (XXX maybe better
          * not to treat \. as special?)
          */
-        if (cstate->copy_src == COPY_FRONTEND)
+        if (cstate->copy_src == COPY_SOURCE_FRONTEND)
         {
             int            inbytes;
 
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index e5696839637..e2411848e9f 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -19,11 +19,6 @@
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
-/* This is private in commands/copyfrom.c */
-typedef struct CopyFromStateData *CopyFromState;
-
-typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
-
 extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
                    int stmt_location, int stmt_len,
                    uint64 *processed);
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index e9c01492797..0274e3487c3 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -91,7 +91,6 @@ typedef struct CopyFormatOptions
                                  * NULL) */
 } CopyFormatOptions;
 
-/* This is private in commands/copyfrom.c */
 typedef struct CopyFromStateData *CopyFromState;
 
 /*
@@ -138,6 +137,173 @@ typedef struct CopyFromRoutine
     void        (*CopyFromEnd) (CopyFromState cstate);
 } CopyFromRoutine;
 
+/*
+ * Represents the different source cases we need to worry about at
+ * the bottom level
+ */
+typedef enum CopySource
+{
+    COPY_SOURCE_FILE,            /* from file (or a piped program) */
+    COPY_SOURCE_FRONTEND,        /* from frontend */
+    COPY_SOURCE_CALLBACK,        /* from callback function */
+} CopySource;
+
+/*
+ * Represents the end-of-line terminator type of the input
+ */
+typedef enum EolType
+{
+    EOL_UNKNOWN,
+    EOL_NL,
+    EOL_CR,
+    EOL_CRNL,
+} EolType;
+
+/*
+ * Represents the insert method to be used during COPY FROM.
+ */
+typedef enum CopyInsertMethod
+{
+    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
+    CIM_MULTI,                    /* always use table_multi_insert or
+                                 * ExecForeignBatchInsert */
+    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
+                                 * ExecForeignBatchInsert only if valid */
+} CopyInsertMethod;
+
+typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+
+/*
+ * This struct contains all the state variables used throughout a COPY FROM
+ * operation.
+ */
+typedef struct CopyFromStateData
+{
+    /* format routine */
+    const CopyFromRoutine *routine;
+
+    /* low-level state data */
+    CopySource    copy_src;        /* type of copy source */
+    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
+    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
+
+    EolType        eol_type;        /* EOL type of input */
+    int            file_encoding;    /* file or remote side's character encoding */
+    bool        need_transcoding;    /* file encoding diff from server? */
+    Oid            conversion_proc;    /* encoding conversion function */
+
+    /* parameters from the COPY command */
+    Relation    rel;            /* relation to copy from */
+    List       *attnumlist;        /* integer list of attnums to copy */
+    char       *filename;        /* filename, or NULL for STDIN */
+    bool        is_program;        /* is 'filename' a program to popen? */
+    copy_data_source_cb data_source_cb; /* function for reading data */
+
+    CopyFormatOptions opts;
+    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
+    Node       *whereClause;    /* WHERE condition (or NULL) */
+
+    /* these are just for error messages, see CopyFromErrorCallback */
+    const char *cur_relname;    /* table name for error messages */
+    uint64        cur_lineno;        /* line number for error messages */
+    const char *cur_attname;    /* current att for error messages */
+    const char *cur_attval;        /* current att value for error messages */
+    bool        relname_only;    /* don't output line number, att, etc. */
+
+    /*
+     * Working state
+     */
+    MemoryContext copycontext;    /* per-copy execution context */
+
+    AttrNumber    num_defaults;    /* count of att that are missing and have
+                                 * default value */
+    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
+    Oid           *typioparams;    /* array of element types for in_functions */
+    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
+                                     * execution */
+    uint64        num_errors;        /* total number of rows which contained soft
+                                 * errors */
+    int           *defmap;            /* array of default att numbers related to
+                                 * missing att */
+    ExprState **defexprs;        /* array of default att expressions for all
+                                 * att */
+    bool       *defaults;        /* if DEFAULT marker was found for
+                                 * corresponding att */
+    bool        volatile_defexprs;    /* is any of defexprs volatile? */
+    List       *range_table;    /* single element list of RangeTblEntry */
+    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
+    ExprState  *qualexpr;
+
+    TransitionCaptureState *transition_capture;
+
+    /*
+     * These variables are used to reduce overhead in COPY FROM.
+     *
+     * attribute_buf holds the separated, de-escaped text for each field of
+     * the current line.  The CopyReadAttributes functions return arrays of
+     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
+     * the buffer on each cycle.
+     *
+     * In binary COPY FROM, attribute_buf holds the binary data for the
+     * current field, but the usage is otherwise similar.
+     */
+    StringInfoData attribute_buf;
+
+    /* field raw data pointers found by COPY FROM */
+
+    int            max_fields;
+    char      **raw_fields;
+
+    /*
+     * Similarly, line_buf holds the whole input line being processed. The
+     * input cycle is first to read the whole line into line_buf, and then
+     * extract the individual attribute fields into attribute_buf.  line_buf
+     * is preserved unmodified so that we can display it in error messages if
+     * appropriate.  (In binary mode, line_buf is not used.)
+     */
+    StringInfoData line_buf;
+    bool        line_buf_valid; /* contains the row being processed? */
+
+    /*
+     * input_buf holds input data, already converted to database encoding.
+     *
+     * In text mode, CopyReadLine parses this data sufficiently to locate line
+     * boundaries, then transfers the data to line_buf. We guarantee that
+     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
+     * mode, input_buf is not used.)
+     *
+     * If encoding conversion is not required, input_buf is not a separate
+     * buffer but points directly to raw_buf.  In that case, input_buf_len
+     * tracks the number of bytes that have been verified as valid in the
+     * database encoding, and raw_buf_len is the total number of bytes stored
+     * in the buffer.
+     */
+#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
+    char       *input_buf;
+    int            input_buf_index;    /* next byte to process */
+    int            input_buf_len;    /* total # of bytes stored */
+    bool        input_reached_eof;    /* true if we reached EOF */
+    bool        input_reached_error;    /* true if a conversion error happened */
+    /* Shorthand for number of unconsumed bytes available in input_buf */
+#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
+
+    /*
+     * raw_buf holds raw input data read from the data source (file or client
+     * connection), not yet converted to the database encoding.  Like with
+     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
+     */
+#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
+    char       *raw_buf;
+    int            raw_buf_index;    /* next byte to process */
+    int            raw_buf_len;    /* total # of bytes stored */
+    bool        raw_reached_eof;    /* true if we reached EOF */
+
+    /* Shorthand for number of unconsumed bytes available in raw_buf */
+#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
+
+    uint64        bytes_processed;    /* number of bytes processed so far */
+} CopyFromStateData;
+
 typedef struct CopyToStateData *CopyToState;
 
 /*
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c11b5ff3cc0..3863d26d5b7 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -19,171 +19,6 @@
 #include "commands/trigger.h"
 #include "nodes/miscnodes.h"
 
-/*
- * Represents the different source cases we need to worry about at
- * the bottom level
- */
-typedef enum CopySource
-{
-    COPY_FILE,                    /* from file (or a piped program) */
-    COPY_FRONTEND,                /* from frontend */
-    COPY_CALLBACK,                /* from callback function */
-} CopySource;
-
-/*
- *    Represents the end-of-line terminator type of the input
- */
-typedef enum EolType
-{
-    EOL_UNKNOWN,
-    EOL_NL,
-    EOL_CR,
-    EOL_CRNL,
-} EolType;
-
-/*
- * Represents the insert method to be used during COPY FROM.
- */
-typedef enum CopyInsertMethod
-{
-    CIM_SINGLE,                    /* use table_tuple_insert or ExecForeignInsert */
-    CIM_MULTI,                    /* always use table_multi_insert or
-                                 * ExecForeignBatchInsert */
-    CIM_MULTI_CONDITIONAL,        /* use table_multi_insert or
-                                 * ExecForeignBatchInsert only if valid */
-} CopyInsertMethod;
-
-/*
- * This struct contains all the state variables used throughout a COPY FROM
- * operation.
- */
-typedef struct CopyFromStateData
-{
-    /* format routine */
-    const CopyFromRoutine *routine;
-
-    /* low-level state data */
-    CopySource    copy_src;        /* type of copy source */
-    FILE       *copy_file;        /* used if copy_src == COPY_FILE */
-    StringInfo    fe_msgbuf;        /* used if copy_src == COPY_FRONTEND */
-
-    EolType        eol_type;        /* EOL type of input */
-    int            file_encoding;    /* file or remote side's character encoding */
-    bool        need_transcoding;    /* file encoding diff from server? */
-    Oid            conversion_proc;    /* encoding conversion function */
-
-    /* parameters from the COPY command */
-    Relation    rel;            /* relation to copy from */
-    List       *attnumlist;        /* integer list of attnums to copy */
-    char       *filename;        /* filename, or NULL for STDIN */
-    bool        is_program;        /* is 'filename' a program to popen? */
-    copy_data_source_cb data_source_cb; /* function for reading data */
-
-    CopyFormatOptions opts;
-    bool       *convert_select_flags;    /* per-column CSV/TEXT CS flags */
-    Node       *whereClause;    /* WHERE condition (or NULL) */
-
-    /* these are just for error messages, see CopyFromErrorCallback */
-    const char *cur_relname;    /* table name for error messages */
-    uint64        cur_lineno;        /* line number for error messages */
-    const char *cur_attname;    /* current att for error messages */
-    const char *cur_attval;        /* current att value for error messages */
-    bool        relname_only;    /* don't output line number, att, etc. */
-
-    /*
-     * Working state
-     */
-    MemoryContext copycontext;    /* per-copy execution context */
-
-    AttrNumber    num_defaults;    /* count of att that are missing and have
-                                 * default value */
-    FmgrInfo   *in_functions;    /* array of input functions for each attrs */
-    Oid           *typioparams;    /* array of element types for in_functions */
-    ErrorSaveContext *escontext;    /* soft error trapper during in_functions
-                                     * execution */
-    uint64        num_errors;        /* total number of rows which contained soft
-                                 * errors */
-    int           *defmap;            /* array of default att numbers related to
-                                 * missing att */
-    ExprState **defexprs;        /* array of default att expressions for all
-                                 * att */
-    bool       *defaults;        /* if DEFAULT marker was found for
-                                 * corresponding att */
-    bool        volatile_defexprs;    /* is any of defexprs volatile? */
-    List       *range_table;    /* single element list of RangeTblEntry */
-    List       *rteperminfos;    /* single element list of RTEPermissionInfo */
-    ExprState  *qualexpr;
-
-    TransitionCaptureState *transition_capture;
-
-    /*
-     * These variables are used to reduce overhead in COPY FROM.
-     *
-     * attribute_buf holds the separated, de-escaped text for each field of
-     * the current line.  The CopyReadAttributes functions return arrays of
-     * pointers into this buffer.  We avoid palloc/pfree overhead by re-using
-     * the buffer on each cycle.
-     *
-     * In binary COPY FROM, attribute_buf holds the binary data for the
-     * current field, but the usage is otherwise similar.
-     */
-    StringInfoData attribute_buf;
-
-    /* field raw data pointers found by COPY FROM */
-
-    int            max_fields;
-    char      **raw_fields;
-
-    /*
-     * Similarly, line_buf holds the whole input line being processed. The
-     * input cycle is first to read the whole line into line_buf, and then
-     * extract the individual attribute fields into attribute_buf.  line_buf
-     * is preserved unmodified so that we can display it in error messages if
-     * appropriate.  (In binary mode, line_buf is not used.)
-     */
-    StringInfoData line_buf;
-    bool        line_buf_valid; /* contains the row being processed? */
-
-    /*
-     * input_buf holds input data, already converted to database encoding.
-     *
-     * In text mode, CopyReadLine parses this data sufficiently to locate line
-     * boundaries, then transfers the data to line_buf. We guarantee that
-     * there is a \0 at input_buf[input_buf_len] at all times.  (In binary
-     * mode, input_buf is not used.)
-     *
-     * If encoding conversion is not required, input_buf is not a separate
-     * buffer but points directly to raw_buf.  In that case, input_buf_len
-     * tracks the number of bytes that have been verified as valid in the
-     * database encoding, and raw_buf_len is the total number of bytes stored
-     * in the buffer.
-     */
-#define INPUT_BUF_SIZE 65536    /* we palloc INPUT_BUF_SIZE+1 bytes */
-    char       *input_buf;
-    int            input_buf_index;    /* next byte to process */
-    int            input_buf_len;    /* total # of bytes stored */
-    bool        input_reached_eof;    /* true if we reached EOF */
-    bool        input_reached_error;    /* true if a conversion error happened */
-    /* Shorthand for number of unconsumed bytes available in input_buf */
-#define INPUT_BUF_BYTES(cstate) ((cstate)->input_buf_len - (cstate)->input_buf_index)
-
-    /*
-     * raw_buf holds raw input data read from the data source (file or client
-     * connection), not yet converted to the database encoding.  Like with
-     * 'input_buf', we guarantee that there is a \0 at raw_buf[raw_buf_len].
-     */
-#define RAW_BUF_SIZE 65536        /* we palloc RAW_BUF_SIZE+1 bytes */
-    char       *raw_buf;
-    int            raw_buf_index;    /* next byte to process */
-    int            raw_buf_len;    /* total # of bytes stored */
-    bool        raw_reached_eof;    /* true if we reached EOF */
-
-    /* Shorthand for number of unconsumed bytes available in raw_buf */
-#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
-
-    uint64        bytes_processed;    /* number of bytes processed so far */
-} CopyFromStateData;
-
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
-- 
2.45.2

From f9908c010646fdf182615a2e3632395ae9d9c4f3 Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sun, 29 Sep 2024 00:32:31 +0900
Subject: [PATCH v23 10/10] Add support for implementing custom COPY FROM
 format as extension

* Add CopyFromStateData::opaque that can be used to keep data for
  custom COPY From format implementation
* Export CopyReadBinaryData() to read the next data as
  CopyFromStateRead()
---
 src/backend/commands/copyfromparse.c | 14 ++++++++++++++
 src/include/commands/copyapi.h       |  6 ++++++
 2 files changed, 20 insertions(+)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index ccfbacb4a37..4fa23d992f5 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -730,6 +730,20 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
     return copied_bytes;
 }
 
+/*
+ * CopyFromStateRead
+ *
+ * Export CopyReadBinaryData() for extensions. We want to keep
+ * CopyReadBinaryData() as a static function for
+ * optimization. CopyReadBinaryData() calls in this file may be optimized by
+ * a compiler.
+ */
+int
+CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes)
+{
+    return CopyReadBinaryData(cstate, dest, nbytes);
+}
+
 /*
  * Read raw fields in the next line for COPY FROM in text or csv mode.
  * Return false if no more lines.
diff --git a/src/include/commands/copyapi.h b/src/include/commands/copyapi.h
index 0274e3487c3..2de610ef729 100644
--- a/src/include/commands/copyapi.h
+++ b/src/include/commands/copyapi.h
@@ -302,8 +302,14 @@ typedef struct CopyFromStateData
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 
     uint64        bytes_processed;    /* number of bytes processed so far */
+
+    /* For custom format implementation */
+    void       *opaque;            /* private space */
 } CopyFromStateData;
 
+extern int    CopyFromStateRead(CopyFromState cstate, char *dest, int nbytes);
+
+
 typedef struct CopyToStateData *CopyToState;
 
 /*
-- 
2.45.2


pgsql-hackers by date:

Previous
From: Peter Smith
Date:
Subject: Re: Pgoutput not capturing the generated columns
Next
From: Peter Eisentraut
Date:
Subject: Re: Doc: typo in config.sgml