diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 99d1457..f87aedd 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -195,7 +195,8 @@ typedef struct CopyStateData * the buffer on each cycle. * * (In binary COPY FROM, attribute_buf holds the binary data for the - * current field, while the other variables are not used.) + * current field copied from raw_buf, which in turn holds raw data read + * from the file.) */ StringInfoData attribute_buf; @@ -219,13 +220,18 @@ typedef struct CopyStateData * Finally, raw_buf holds raw data read from the data source (file or * client connection). CopyReadLine parses this data sufficiently to * locate line boundaries, then transfers the data to line_buf and - * converts it. Note: we guarantee that there is a \0 at - * raw_buf[raw_buf_len]. + * converts it. Also, CopyReadBinaryData() slices the data into + * attributes based on the caller-specified field length in bytes. + * Note: we guarantee that there is a \0 at raw_buf[raw_buf_len]. 
*/ #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */ char *raw_buf; int raw_buf_index; /* next byte to process */ int raw_buf_len; /* total # of bytes stored */ + + /* Count of bytes in raw_buf that are loaded but not yet processed */ +#define RAW_BUF_BYTES(cstate)\ + ((cstate)->raw_buf_len - (cstate)->raw_buf_index) } CopyStateData; /* DestReceiver for COPY (query) TO */ @@ -373,6 +379,7 @@ static int CopyReadAttributesCSV(CopyState cstate); static Datum CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); +static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); static void CopyAttributeOutText(CopyState cstate, char *string); static void CopyAttributeOutCSV(CopyState cstate, char *string, bool use_quote, bool single_attr); @@ -391,9 +398,7 @@ static void CopySendEndOfRow(CopyState cstate); static int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); -static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); -static bool CopyGetInt16(CopyState cstate, int16 *val); /* @@ -733,25 +738,6 @@ CopySendInt32(CopyState cstate, int32 val) } /* - * CopyGetInt32 reads an int32 that appears in network byte order - * - * Returns true if OK, false if EOF - */ -static bool -CopyGetInt32(CopyState cstate, int32 *val) -{ - uint32 buf; - - if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int32) pg_ntoh32(buf); - return true; -} - -/* * CopySendInt16 sends an int16 in network byte order */ static void @@ -763,33 +749,16 @@ CopySendInt16(CopyState cstate, int16 val) CopySendData(cstate, &buf, sizeof(buf)); } -/* - * CopyGetInt16 reads an int16 that appears in network byte order - */ -static bool -CopyGetInt16(CopyState cstate, int16 *val) -{ - uint16 buf; - - if (CopyGetData(cstate, 
&buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) - { - *val = 0; /* suppress compiler warning */ - return false; - } - *val = (int16) pg_ntoh16(buf); - return true; -} - /* * CopyLoadRawBuf loads some more data into raw_buf * * Returns true if able to obtain at least one more byte, else false. * - * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred - * down to the start of the buffer and then we load more data after that. - * This case is used only when a frontend multibyte character crosses a - * bufferload boundary. + * If RAW_BUF_BYTES(cstate) > 0, the unprocessed bytes are transferred down to + * the start of the buffer and then we load more data after that. This case + * is only used when a frontend multibyte character crosses a bufferload + * boundary. */ static bool CopyLoadRawBuf(CopyState cstate) @@ -797,10 +766,10 @@ CopyLoadRawBuf(CopyState cstate) int nbytes; int inbytes; - if (cstate->raw_buf_index < cstate->raw_buf_len) + if (RAW_BUF_BYTES(cstate) > 0) { /* Copy down the unprocessed data */ - nbytes = cstate->raw_buf_len - cstate->raw_buf_index; + nbytes = RAW_BUF_BYTES(cstate); memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes); } @@ -816,6 +785,57 @@ CopyLoadRawBuf(CopyState cstate) return (inbytes > 0); } +/* + * CopyReadBinaryData + * Copies up to 'nbytes' bytes into 'dest', drawing on raw_buf and + * reloading it as needed; returns the count, short only if input ends + */ +static int +CopyReadBinaryData(CopyState cstate, char *dest, int nbytes) +{ + int copied_bytes = 0; + +#define DRAIN_COPY_RAW_BUF(cstate, dest, nbytes)\ + do {\ + memcpy((dest), (cstate)->raw_buf + (cstate)->raw_buf_index, (nbytes));\ + (cstate)->raw_buf_index += (nbytes);\ + } while(0) + + if (RAW_BUF_BYTES(cstate) >= nbytes) + { + /* Enough bytes are present in the buffer. */ + DRAIN_COPY_RAW_BUF(cstate, dest, nbytes); + copied_bytes = nbytes; + } + else + { + /* + * Too few bytes are buffered, so more input must be loaded. 
Loop, + * because 'nbytes' may exceed what raw_buf can hold at one time and + * so several refills may be needed. + */ + do + { + int copy_bytes; + + /* + * Refill raw_buf only once it is fully drained. If no more + * input can be loaded, return the short count copied so far. + */ + if (RAW_BUF_BYTES(cstate) == 0 && !CopyLoadRawBuf(cstate)) + return copied_bytes; + + copy_bytes = Min(nbytes - copied_bytes, RAW_BUF_BYTES(cstate)); + DRAIN_COPY_RAW_BUF(cstate, dest, copy_bytes); + dest += copy_bytes; + copied_bytes += copy_bytes; + } while (copied_bytes < nbytes); + } + +#undef DRAIN_COPY_RAW_BUF + + return copied_bytes; +} /* * DoCopy executes the SQL COPY statement @@ -3363,17 +3383,17 @@ BeginCopyFrom(ParseState *pstate, cstate->cur_attval = NULL; /* - * Set up variables to avoid per-attribute overhead. attribute_buf is - * used in both text and binary modes, but we use line_buf and raw_buf + * Set up variables to avoid per-attribute overhead. attribute_buf and + * raw_buf are used in both text and binary modes, but we use line_buf * only in text mode. */ initStringInfo(&cstate->attribute_buf); + cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); + cstate->raw_buf_index = cstate->raw_buf_len = 0; if (!cstate->binary) { initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; - cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); - cstate->raw_buf_index = cstate->raw_buf_len = 0; } /* Assign range table, we'll need it in CopyFrom. 
*/ @@ -3524,16 +3544,18 @@ BeginCopyFrom(ParseState *pstate, int32 tmp; /* Signature */ - if (CopyGetData(cstate, readSig, 11, 11) != 11 || + if (CopyReadBinaryData(cstate, readSig, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("COPY file signature not recognized"))); /* Flags field */ - if (!CopyGetInt32(cstate, &tmp)) + if (CopyReadBinaryData(cstate, (char *) &tmp, sizeof(tmp)) != + sizeof(tmp)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing flags)"))); + tmp = (int32) pg_ntoh32(tmp); if ((tmp & (1 << 16)) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -3544,15 +3566,15 @@ BeginCopyFrom(ParseState *pstate, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unrecognized critical flags in COPY file header"))); /* Header extension length */ - if (!CopyGetInt32(cstate, &tmp) || - tmp < 0) + if (CopyReadBinaryData(cstate, (char *) &tmp, sizeof(tmp)) != + sizeof(tmp) || (tmp = (int32) pg_ntoh32(tmp)) < 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (missing length)"))); /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyGetData(cstate, readSig, 1, 1) != 1) + if (CopyReadBinaryData(cstate, readSig, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -3745,12 +3767,14 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, cstate->cur_lineno++; - if (!CopyGetInt16(cstate, &fld_count)) + if (CopyReadBinaryData(cstate, (char *) &fld_count, + sizeof(fld_count)) != sizeof(fld_count)) { /* EOF detected (end of file, or protocol-level EOF) */ return false; } + fld_count = (int16) pg_ntoh16(fld_count); if (fld_count == -1) { /* @@ -3768,7 +3792,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, char dummy; if (cstate->copy_dest != COPY_OLD_FE && - CopyGetData(cstate, &dummy, 1, 1) > 0) + 
CopyReadBinaryData(cstate, &dummy, 1) > 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("received copy data after EOF marker"))); @@ -4723,10 +4747,12 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, int32 fld_size; Datum result; - if (!CopyGetInt32(cstate, &fld_size)) + if (CopyReadBinaryData(cstate, (char *) &fld_size, sizeof(fld_size)) != + sizeof(fld_size)) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data"))); + fld_size = (int32) pg_ntoh32(fld_size); if (fld_size == -1) { *isnull = true; @@ -4741,8 +4767,8 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, resetStringInfo(&cstate->attribute_buf); enlargeStringInfo(&cstate->attribute_buf, fld_size); - if (CopyGetData(cstate, cstate->attribute_buf.data, - fld_size, fld_size) != fld_size) + if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, + fld_size) != fld_size) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("unexpected EOF in COPY data")));