From f2f82a6018937a594eea9b3f3d9b9c23be9b4b66 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Wed, 14 Oct 2020 12:49:48 +0530 Subject: [PATCH v7 6/6] Parallel Copy For Binary Format Files Leader reads data from the file into the DSM data blocks each of 64K size. It also identifies each tuple data block id, start offset, end offset, tuple size and updates this information in the ring data structure. Workers parallelly read the tuple information from the ring data structure, the actual tuple data from the data blocks and parallelly insert the tuples into the table. --- src/backend/commands/copy.c | 126 ++++++------- src/backend/commands/copyparallel.c | 367 ++++++++++++++++++++++++++++++++++-- src/include/commands/copy.h | 126 +++++++++++++ 3 files changed, 531 insertions(+), 88 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 9a026be..44e0aa4 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -222,19 +222,14 @@ static void CopySendData(CopyState cstate, const void *databuf, int datasize); static void CopySendString(CopyState cstate, const char *str); static void CopySendChar(CopyState cstate, char c); static void CopySendEndOfRow(CopyState cstate); -static int CopyGetData(CopyState cstate, void *databuf, - int minread, int maxread); static void CopySendInt32(CopyState cstate, int32 val); static bool CopyGetInt32(CopyState cstate, int32 *val); static void CopySendInt16(CopyState cstate, int16 val); static bool CopyGetInt16(CopyState cstate, int16 *val); static bool CopyLoadRawBuf(CopyState cstate); -static int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); - static void ClearEOLFromCopiedData(CopyState cstate, char *copy_line_data, int copy_line_pos, int *copy_line_size); - /* * Send copy start/stop messages for frontend copies. These have changed * in past protocol redesigns. @@ -448,7 +443,7 @@ CopySendEndOfRow(CopyState cstate) * * NB: no data conversion is applied here. */ -static int +int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) { int bytesread = 0; @@ -581,10 +576,25 @@ CopyGetInt32(CopyState cstate, int32 *val) { uint32 buf; - if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + /* + * For parallel copy, avoid reading data to raw buf, read directly from + * file, later the data will be read to parallel copy data buffers. + */ + if (cstate->nworkers > 0) { - *val = 0; /* suppress compiler warning */ - return false; + if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } + } + else + { + if (CopyReadBinaryData(cstate, (char *) &buf, sizeof(buf)) != sizeof(buf)) + { + *val = 0; /* suppress compiler warning */ + return false; + } } *val = (int32) pg_ntoh32(buf); return true; @@ -658,7 +668,7 @@ CopyLoadRawBuf(CopyState cstate) * and writes them to 'dest'. Returns the number of bytes read (which * would be less than 'nbytes' only if we reach EOF). */ -static int +int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes) { int copied_bytes = 0; @@ -3563,7 +3573,7 @@ BeginCopyFrom(ParseState *pstate, int32 tmp; /* Signature */ - if (CopyReadBinaryData(cstate, readSig, 11) != 11 || + if (CopyGetData(cstate, readSig, 11, 11) != 11 || memcmp(readSig, BinarySignature, 11) != 0) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), @@ -3591,7 +3601,7 @@ BeginCopyFrom(ParseState *pstate, /* Skip extension header, if present */ while (tmp-- > 0) { - if (CopyReadBinaryData(cstate, readSig, 1) != 1) + if (CopyGetData(cstate, readSig, 1, 1) != 1) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), errmsg("invalid COPY file header (wrong length)"))); @@ -3788,60 +3798,45 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, else { /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + cstate->max_fields = list_length(cstate->attnumlist); - if (!CopyGetInt16(cstate, &fld_count)) + if (!IsParallelCopy()) { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } - - if (fld_count == -1) - { - /* - * Received EOF marker. In a V3-protocol copy, wait for the - * protocol-level EOF, and complain if it doesn't come - * immediately. This ensures that we correctly handle CopyFail, - * if client chooses to send that now. - * - * Note that we MUST NOT try to read more data in an old-protocol - * copy, since there is no protocol-level EOF marker then. We - * could go either way for copy from file, but choose to throw - * error if there's data after the EOF marker, for consistency - * with the new-protocol case. - */ - char dummy; + int16 fld_count; + ListCell *cur; - if (cstate->copy_dest != COPY_OLD_FE && - CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; + } - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + CHECK_FIELD_COUNT; - foreach(cur, cstate->attnumlist) + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + } + else { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); + bool eof = false; - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; + eof = CopyReadBinaryTupleWorker(cstate, values, nulls); + + if (eof) + return false; } } @@ -4852,18 +4847,15 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, Datum result; if (!CopyGetInt32(cstate, &fld_size)) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; + if (fld_size == -1) { *isnull = true; return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); } - if (fld_size < 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("invalid field size"))); + + CHECK_FIELD_SIZE(fld_size); /* reset attribute_buf to empty, and load raw data in it */ resetStringInfo(&cstate->attribute_buf); @@ -4871,9 +4863,7 @@ CopyReadBinaryAttribute(CopyState cstate, FmgrInfo *flinfo, enlargeStringInfo(&cstate->attribute_buf, fld_size); if (CopyReadBinaryData(cstate, cstate->attribute_buf.data, fld_size) != fld_size) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("unexpected EOF in COPY data"))); + EOF_ERROR; cstate->attribute_buf.len = fld_size; cstate->attribute_buf.data[fld_size] = '\0'; diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c index 9cae112..3e17e82 100644 --- a/src/backend/commands/copyparallel.c +++ b/src/backend/commands/copyparallel.c @@ -100,6 +100,7 @@ SerializeParallelCopyState(ParallelContext *pcxt, CopyState cstate, shared_cstate.convert_selectively = cstate->convert_selectively; shared_cstate.num_defaults = cstate->num_defaults; shared_cstate.relid = cstate->pcdata->relid; + shared_cstate.binary = cstate->binary; memcpy(shmptr, (char *) &shared_cstate, sizeof(SerializedParallelCopyState)); copiedsize = sizeof(SerializedParallelCopyState); @@ -204,6 +205,7 @@ RestoreParallelCopyState(shm_toc *toc, CopyState cstate, List **attlist) cstate->convert_selectively = shared_cstate.convert_selectively; cstate->num_defaults = shared_cstate.num_defaults; cstate->pcdata->relid = shared_cstate.relid; + cstate->binary = shared_cstate.binary; cstate->null_print = CopyStringFromSharedMemory(shared_str_val + copiedsize, &copiedsize); @@ -403,7 +405,7 @@ bool IsParallelCopyAllowed(CopyState cstate) { /* Parallel copy not allowed for frontend (2.0 protocol) & binary option. */ - if ((cstate->copy_dest == COPY_OLD_FE) || cstate->binary) + if (cstate->copy_dest == COPY_OLD_FE) return false; /* @@ -620,6 +622,7 @@ InitializeParallelCopyInfo(CopyState cstate, List *attnamelist) cstate->cur_lineno = 0; cstate->cur_attname = NULL; cstate->cur_attval = NULL; + cstate->pcdata->curr_data_block = NULL; /* Set up variables to avoid per-attribute overhead. */ initStringInfo(&cstate->attribute_buf); @@ -842,7 +845,11 @@ return_line: /* Mark that encoding conversion hasn't occurred yet. */ cstate->line_buf_converted = false; - ConvertToServerEncoding(cstate); + + /* For binary format data, we don't need conversion. */ + if (!cstate->binary) + ConvertToServerEncoding(cstate); + pcdata->worker_line_buf_pos++; return false; } @@ -998,33 +1005,70 @@ ParallelCopyFrom(CopyState cstate) /* Execute the before statement triggers from the leader */ ExecBeforeStmtTrigger(cstate); - /* On input just throw the header line away. */ - if (cstate->cur_lineno == 0 && cstate->header_line) + if (!cstate->binary) { - cstate->cur_lineno++; - if (CopyReadLine(cstate)) + /* On input just throw the header line away. */ + if (cstate->cur_lineno == 0 && cstate->header_line) { - pcshared_info->is_read_in_progress = false; - return; /* done */ + cstate->cur_lineno++; + if (CopyReadLine(cstate)) + { + pcshared_info->is_read_in_progress = false; + return; /* done */ + } } - } - for (;;) - { - bool done; + for (;;) + { + bool done; - cstate->cur_lineno++; + cstate->cur_lineno++; - /* Actually read the line into memory here. */ - done = CopyReadLine(cstate); + /* Actually read the line into memory here. */ + done = CopyReadLine(cstate); + /* + * EOF at start of line means we're done. If we see EOF after + * some characters, we act as though it was newline followed by + * EOF, ie, process the line and then exit loop on next iteration. + */ + if (done && cstate->line_buf.len == 0) + break; + } + } + else + { /* - * EOF at start of line means we're done. If we see EOF after some - * characters, we act as though it was newline followed by EOF, ie, - * process the line and then exit loop on next iteration. + * Binary Format Files. In parallel copy leader, fill in the error + * context information here, in case any failures while determining + * tuple offsets, leader would throw the errors with proper context. */ - if (done && cstate->line_buf.len == 0) - break; + ErrorContextCallback errcallback; + + errcallback.callback = CopyFromErrorCallback; + errcallback.arg = (void *) cstate; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + cstate->pcdata->curr_data_block = NULL; + cstate->raw_buf_index = 0; + pcshared_info->populated = 0; + cstate->cur_lineno = 0; + cstate->max_fields = list_length(cstate->attnumlist); + + for (;;) + { + bool eof = false; + + cstate->cur_lineno++; + + eof = CopyReadBinaryTupleLeader(cstate); + + if (eof) + break; + } + + /* Reset the error context. */ + error_context_stack = errcallback.previous; } pcshared_info->is_read_in_progress = false; @@ -1032,6 +1076,289 @@ ParallelCopyFrom(CopyState cstate) } /* + * CopyReadBinaryGetDataBlock + * + * Gets a new block, updates the current offset, calculates the skip bytes. + */ +void +CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info) +{ + ParallelCopyDataBlock *data_block = NULL; + ParallelCopyDataBlock *curr_data_block = cstate->pcdata->curr_data_block; + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + uint8 move_bytes = 0; + uint32 block_pos; + uint32 prev_block_pos; + int read_bytes = 0; + + prev_block_pos = pcshared_info->cur_block_pos; + + block_pos = WaitGetFreeCopyBlock(pcshared_info); + + if (field_info == FIELD_SIZE || field_info == FIELD_COUNT) + move_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + + if (curr_data_block != NULL) + curr_data_block->skip_bytes = move_bytes; + + data_block = &pcshared_info->data_blocks[block_pos]; + + if (move_bytes > 0 && curr_data_block != NULL) + memmove(&data_block->data[0], &curr_data_block->data[cstate->raw_buf_index], move_bytes); + + elog(DEBUG1, "LEADER - field info %d is spread across data blocks - moved %d bytes from current block %u to %u block", + field_info, move_bytes, prev_block_pos, block_pos); + + read_bytes = CopyGetData(cstate, &data_block->data[move_bytes], 1, (DATA_BLOCK_SIZE - move_bytes)); + + if (field_info == FIELD_NONE && cstate->reached_eof) + return; + + if (cstate->reached_eof) + EOF_ERROR; + + elog(DEBUG1, "LEADER - bytes read from file %d", read_bytes); + + if (field_info == FIELD_SIZE || field_info == FIELD_DATA) + { + ParallelCopyDataBlock *prev_data_block = NULL; + + prev_data_block = curr_data_block; + prev_data_block->following_block = block_pos; + + if (prev_data_block->curr_blk_completed == false) + prev_data_block->curr_blk_completed = true; + + pg_atomic_add_fetch_u32(&prev_data_block->unprocessed_line_parts, 1); + } + + cstate->pcdata->curr_data_block = data_block; + cstate->raw_buf_index = 0; +} + +/* + * CopyReadBinaryTupleLeader + * + * Leader reads data from binary formatted file to data blocks and identifies + * tuple boundaries/offsets so that workers can work on the data blocks data. + */ +bool +CopyReadBinaryTupleLeader(CopyState cstate) +{ + ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info; + int16 fld_count; + uint32 line_size = 0; + uint32 start_block_pos; + uint32 start_offset; + + if (cstate->pcdata->curr_data_block == NULL) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_NONE); + + /* + * no data is read from file here. one possibility to be here could be + * that the binary file just has a valid signature but nothing else. + */ + if (cstate->reached_eof) + return true; + } + + if ((cstate->raw_buf_index + sizeof(fld_count)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_COUNT); + + memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + CHECK_FIELD_COUNT; + + start_offset = cstate->raw_buf_index; + cstate->raw_buf_index += sizeof(fld_count); + line_size += sizeof(fld_count); + start_block_pos = pcshared_info->cur_block_pos; + + CopyReadBinaryFindTupleSize(cstate, &line_size); + + pg_atomic_add_fetch_u32(&cstate->pcdata->curr_data_block->unprocessed_line_parts, 1); + + if (line_size > 0) + (void) UpdateSharedLineInfo(cstate, start_block_pos, start_offset, + line_size, LINE_LEADER_POPULATED, -1); + + return false; +} + +/* + * CopyReadBinaryFindTupleSize + * + * Leader identifies boundaries/offsets for each attribute/column and finally + * results in the tuple/row size. It moves on to next data block if the + * attribute/column is spread across data blocks. + */ +void +CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size) +{ + int32 fld_size; + ListCell *cur; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + Form_pg_attribute att = TupleDescAttr(tup_desc, (att_num - 1)); + + cstate->cur_attname = NameStr(att->attname); + fld_size = 0; + + if ((cstate->raw_buf_index + sizeof(fld_size)) >= DATA_BLOCK_SIZE) + CopyReadBinaryGetDataBlock(cstate, FIELD_SIZE); + + memcpy(&fld_size, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_size)); + cstate->raw_buf_index += sizeof(fld_size); + *line_size += sizeof(fld_size); + fld_size = (int32) pg_ntoh32(fld_size); + + /* fld_size -1 represents the null value for the field. */ + if (fld_size == -1) + continue; + + CHECK_FIELD_SIZE(fld_size); + + *line_size += fld_size; + + if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size) + { + cstate->raw_buf_index += fld_size; + elog(DEBUG1, "LEADER - tuple lies in he same data block"); + } + else + { + int32 required_blks = 0; + int32 curr_blk_bytes = (DATA_BLOCK_SIZE - cstate->raw_buf_index); + int i = 0; + + GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes); + + i = required_blks; + + while (i > 0) + { + CopyReadBinaryGetDataBlock(cstate, FIELD_DATA); + i--; + } + + GET_RAW_BUF_INDEX(cstate->raw_buf_index, fld_size, required_blks, curr_blk_bytes); + + /* + * raw_buf_index should never cross data block size, as the + * required number of data blocks would have been obtained in the + * above while loop. + */ + Assert(cstate->raw_buf_index <= DATA_BLOCK_SIZE); + } + cstate->cur_attname = NULL; + } +} + +/* + * CopyReadBinaryTupleWorker + * + * Each worker reads data from data blocks caches the tuple data into local + * memory. + */ +bool +CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls) +{ + int16 fld_count; + ListCell *cur; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + TupleDesc tup_desc = RelationGetDescr(cstate->rel); + bool done = false; + + done = GetWorkerLine(cstate); + cstate->raw_buf_index = 0; + + if (done && cstate->line_buf.len == 0) + return true; + + memcpy(&fld_count, &cstate->line_buf.data[cstate->raw_buf_index], sizeof(fld_count)); + fld_count = (int16) pg_ntoh16(fld_count); + + CHECK_FIELD_COUNT; + + cstate->raw_buf_index += sizeof(fld_count); + + foreach(cur, cstate->attnumlist) + { + int att_num = lfirst_int(cur); + int m = att_num - 1; + Form_pg_attribute att = TupleDescAttr(tup_desc, m); + + cstate->cur_attname = NameStr(att->attname); + + values[m] = CopyReadBinaryAttributeWorker(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; + } + + return false; +} + +/* + * CopyReadBinaryAttributeWorker + * + * Worker identifies and converts each attribute/column data from binary to + * the data type of attribute/column. + */ +Datum +CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull) +{ + int32 fld_size; + Datum result; + + memcpy(&fld_size, &cstate->line_buf.data[cstate->raw_buf_index], sizeof(fld_size)); + cstate->raw_buf_index += sizeof(fld_size); + fld_size = (int32) pg_ntoh32(fld_size); + + /* fld_size -1 represents the null value for the field. */ + if (fld_size == -1) + { + *isnull = true; + return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod); + } + + CHECK_FIELD_SIZE(fld_size); + + /* Reset attribute_buf to empty, and load raw data in it */ + resetStringInfo(&cstate->attribute_buf); + + enlargeStringInfo(&cstate->attribute_buf, fld_size); + + memcpy(&cstate->attribute_buf.data[0], &cstate->line_buf.data[cstate->raw_buf_index], fld_size); + cstate->raw_buf_index += fld_size; + + cstate->attribute_buf.len = fld_size; + cstate->attribute_buf.data[fld_size] = '\0'; + + /* Call the column type's binary input converter */ + result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf, + typioparam, typmod); + + /* Trouble if it didn't eat the whole buffer */ + if (cstate->attribute_buf.cursor != cstate->attribute_buf.len) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("incorrect binary data format"))); + + *isnull = false; + return result; +} + +/* * GetLinePosition * * Return the line position once the leader has populated the data. diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 9b19dcb..49f438f 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -59,6 +59,109 @@ /* + * CHECK_FIELD_COUNT - Handles the error cases for field count + * for binary format files. + */ +#define CHECK_FIELD_COUNT \ +{\ + if (fld_count == -1) \ + { \ + if (IsParallelCopy() && \ + !IsLeader()) \ + return true; \ + else if (IsParallelCopy() && \ + IsLeader()) \ + { \ + if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return true; \ + } \ + else \ + { \ + /* \ + * Received EOF marker. In a V3-protocol copy, wait for the \ + * protocol-level EOF, and complain if it doesn't come \ + * immediately. This ensures that we correctly handle CopyFail, \ + * if client chooses to send that now. \ + * \ + * Note that we MUST NOT try to read more data in an old-protocol \ + * copy, since there is no protocol-level EOF marker then. We \ + * could go either way for copy from file, but choose to throw \ + * error if there's data after the EOF marker, for consistency \ + * with the new-protocol case. \ + */ \ + char dummy; \ + if (cstate->copy_dest != COPY_OLD_FE && \ + CopyReadBinaryData(cstate, &dummy, 1) > 0) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("received copy data after EOF marker"))); \ + return false; \ + } \ + } \ + if (fld_count != cstate->max_fields) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("row field count is %d, expected %d", \ + (int) fld_count, cstate->max_fields))); \ +} + +/* + * CHECK_FIELD_SIZE - Handles the error case for field size + * for binary format files. + */ +#define CHECK_FIELD_SIZE(fld_size) \ +{ \ + if (fld_size < -1) \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("invalid field size")));\ +} + +/* + * EOF_ERROR - Error statement for EOF for binary format + * files. + */ +#define EOF_ERROR \ +{ \ + ereport(ERROR, \ + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \ + errmsg("unexpected EOF in COPY data")));\ +} + +/* + * GET_RAW_BUF_INDEX - Calculates the raw buf index for the cases + * where the data spread is across multiple data blocks. + */ +#define GET_RAW_BUF_INDEX(raw_buf_index, fld_size, required_blks, curr_blk_bytes) \ +{ \ + raw_buf_index = fld_size - (((required_blks - 1) * DATA_BLOCK_SIZE) + curr_blk_bytes); \ +} + +/* + * GET_REQUIRED_BLOCKS - Calculates the number of data + * blocks required for the cases where the data spread + * is across multiple data blocks. + */ +#define GET_REQUIRED_BLOCKS(required_blks, fld_size, curr_blk_bytes) \ +{ \ + /* \ + * field size can spread across multiple data blocks, \ + * calculate the number of required data blocks and try to get \ + * those many data blocks. \ + */ \ + required_blks = (int32)(fld_size - curr_blk_bytes)/(int32)DATA_BLOCK_SIZE; \ + /* \ + * check if we need the data block for the field data \ + * bytes that are not modulus of data block size. \ + */ \ + if ((fld_size - curr_blk_bytes)%DATA_BLOCK_SIZE != 0) \ + required_blks++; \ +} + +/* * Represents the different source/dest cases we need to worry about at * the bottom level */ @@ -236,6 +339,17 @@ typedef struct ParallelCopyLineBuf } ParallelCopyLineBuf; /* + * Represents the usage mode for CopyReadBinaryGetDataBlock. + */ +typedef enum FieldInfoType +{ + FIELD_NONE = 0, + FIELD_COUNT, + FIELD_SIZE, + FIELD_DATA +} FieldInfoType; + +/* * This structure helps in storing the common data from CopyStateData that are * required by the workers. This information will then be allocated and stored * into the DSM for the worker to retrieve and copy it to CopyStateData. @@ -258,6 +372,7 @@ typedef struct SerializedParallelCopyState /* Working state for COPY FROM */ AttrNumber num_defaults; Oid relid; + bool binary; } SerializedParallelCopyState; /* @@ -284,6 +399,9 @@ typedef struct ParallelCopyData /* Current position in worker_line_buf */ uint32 worker_line_buf_pos; + + /* For binary formatted files */ + ParallelCopyDataBlock *curr_data_block; } ParallelCopyData; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); @@ -470,4 +588,12 @@ extern uint32 UpdateSharedLineInfo(CopyState cstate, uint32 blk_pos, uint32 offs uint32 line_size, uint32 line_state, uint32 blk_line_pos); extern void EndLineParallelCopy(CopyState cstate, uint32 line_pos, uint32 line_size, uint32 raw_buf_ptr); +extern int CopyGetData(CopyState cstate, void *databuf, int minread, int maxread); +extern int CopyReadBinaryData(CopyState cstate, char *dest, int nbytes); +extern bool CopyReadBinaryTupleLeader(CopyState cstate); +extern bool CopyReadBinaryTupleWorker(CopyState cstate, Datum *values, bool *nulls); +extern void CopyReadBinaryFindTupleSize(CopyState cstate, uint32 *line_size); +extern Datum CopyReadBinaryAttributeWorker(CopyState cstate, FmgrInfo *flinfo, + Oid typioparam, int32 typmod, bool *isnull); +extern void CopyReadBinaryGetDataBlock(CopyState cstate, FieldInfoType field_info); #endif /* COPY_H */ -- 1.8.3.1