From 51d23d2a38a58f154958602e471458bbae5c38f7 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 10 Jun 2019 10:54:42 -0700 Subject: [PATCH v1] hashloop fallback First part is to "chunk" the inner file into arbitrary partitions of work_mem size This chunks inner file and makes it so that the offset is along tuple bounds. Note that this makes it impossible to increase nbatches during the loading of batches after initial hashtable creation In preparation for doing this chunking, separate advance batch and load batch. advance batch only if page offset is reset to 0, then load that part of the batch Second part was to: implement outer tuple batch rewinding per chunk of inner batch Would be a simple rewind and replay of outer side for each chunk of inner if it weren't for LOJ. Because we need to wait to emit NULL-extended tuples for LOJ until after all chunks of inner have been processed. To do this, make a list with an entry for each outer tuple and keep track of its match status. Also, keep track of its offset so that we can access the file at that offset in case the tuples are not processed in order (like in parallel case--not handled here but in anticipation of such cases) --- src/backend/executor/nodeHashjoin.c | 212 ++++++++++++++++++++-- src/include/executor/hashjoin.h | 10 + src/include/nodes/execnodes.h | 10 + src/test/regress/expected/adaptive_hj.out | 209 +++++++++++++++++++++ src/test/regress/sql/adaptive_hj.sql | 31 ++++ 5 files changed, 456 insertions(+), 16 deletions(-) create mode 100644 src/test/regress/expected/adaptive_hj.out create mode 100644 src/test/regress/sql/adaptive_hj.sql diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 8484a287e7..7207ab1e57 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -127,6 +127,7 @@ #define HJ_FILL_OUTER_TUPLE 4 #define HJ_FILL_INNER_TUPLES 5 #define HJ_NEED_NEW_BATCH 6 +#define HJ_ADAPTIVE_EMIT_UNMATCHED 7 /* Returns true if doing null-fill on outer relation */ #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL) @@ -143,10 +144,13 @@ static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, TupleTableSlot *tupleSlot); -static bool ExecHashJoinNewBatch(HashJoinState *hjstate); +static bool ExecHashJoinAdvanceBatch(HashJoinState *hjstate); static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); static void ExecParallelHashJoinPartitionOuter(HashJoinState *node); +static bool LoadInnerBatch(HashJoinState *hjstate); +static TupleTableSlot *ExecHashJoinGetOuterTupleAtOffset(HashJoinState *hjstate, off_t offset); +static OuterOffsetMatchStatus *cursor = NULL; /* ---------------------------------------------------------------- * ExecHashJoinImpl @@ -176,6 +180,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) int batchno; ParallelHashJoinState *parallel_state; + /* * get information from HashJoin node */ @@ -198,6 +203,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) */ for (;;) { + /* * It's possible to iterate this loop many times before returning a * tuple, in some pathological cases such as needing to move much of @@ -368,9 +374,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) node->hj_JoinState = HJ_NEED_NEW_BATCH; continue; } - + /* + * only initialize this to false during the first chunk -- + * otherwise, we will be resetting a tuple that had a match to false + */ + if (node->first_chunk || hashtable->curbatch == 0) + node->hj_MatchedOuter = false; econtext->ecxt_outertuple = outerTupleSlot; - node->hj_MatchedOuter = false; /* * Find the corresponding bucket for this tuple in the main @@ -410,6 +420,47 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) continue; } + /* + * We need to construct the linked list of match statuses on the first chunk. + * Note that node->first_chunk isn't true until HJ_NEED_NEW_BATCH + * so this means that we don't construct this list on batch 0. + */ + if (node->first_chunk) + { + BufFile *outerFile = hashtable->outerBatchFile[batchno]; + + if (outerFile != NULL) + { + OuterOffsetMatchStatus *outerOffsetMatchStatus = NULL; + + outerOffsetMatchStatus = palloc(sizeof(struct OuterOffsetMatchStatus)); + outerOffsetMatchStatus->match_status = false; + outerOffsetMatchStatus->outer_tuple_start_offset = 0L; + outerOffsetMatchStatus->next = NULL; + + if (node->first_outer_offset_match_status != NULL) + { + node->current_outer_offset_match_status->next = outerOffsetMatchStatus; + node->current_outer_offset_match_status = outerOffsetMatchStatus; + } + else + { + node->first_outer_offset_match_status = outerOffsetMatchStatus; + node->current_outer_offset_match_status = node->first_outer_offset_match_status; + } + + outerOffsetMatchStatus->outer_tuple_val = DatumGetInt32(outerTupleSlot->tts_values[0]); + outerOffsetMatchStatus->outer_tuple_start_offset = node->HJ_NEED_NEW_OUTER_tup_start; + } + } + else if (node->hj_HashTable->curbatch > 0) + { + if (node->current_outer_offset_match_status == NULL) + node->current_outer_offset_match_status = node->first_outer_offset_match_status; + else + node->current_outer_offset_match_status = node->current_outer_offset_match_status->next; + } + /* OK, let's scan the bucket for matches */ node->hj_JoinState = HJ_SCAN_BUCKET; @@ -455,6 +506,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) { node->hj_MatchedOuter = true; HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); + if (node->current_outer_offset_match_status) + node->current_outer_offset_match_status->match_status = true; /* In an antijoin, we never return a matched tuple */ if (node->js.jointype == JOIN_ANTI) @@ -492,6 +545,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (!node->hj_MatchedOuter && HJ_FILL_OUTER(node)) { + if (node->current_outer_offset_match_status) + break; /* * Generate a fake join tuple with nulls for the inner * tuple, and return it if it passes the non-join quals. @@ -543,12 +598,56 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) } else { - if (!ExecHashJoinNewBatch(node)) - return NULL; /* end of parallel-oblivious join */ + if (node->inner_page_offset == 0L) + { + /* + * This case is entered on two separate conditions: + * when we need to load the first batch ever in this hash join; + * or when we've exhausted the outer side of the current batch. + */ + if (node->first_outer_offset_match_status && HJ_FILL_OUTER(node)) + { + node->hj_JoinState = HJ_ADAPTIVE_EMIT_UNMATCHED; + cursor = node->first_outer_offset_match_status; + break; + } + + if (!ExecHashJoinAdvanceBatch(node)) + return NULL; /* end of parallel-oblivious join */ + } + LoadInnerBatch(node); + + if (node->first_chunk) + node->first_outer_offset_match_status = NULL; + node->current_outer_offset_match_status = NULL; } node->hj_JoinState = HJ_NEED_NEW_OUTER; break; + case HJ_ADAPTIVE_EMIT_UNMATCHED: + while (cursor) + { + TupleTableSlot *outer_unmatched_tup; + if (cursor->match_status == true) + { + cursor = cursor->next; + continue; + } + /* + * if it is not a match, go to the offset in the page that it specifies + * and emit it NULL-extended + */ + outer_unmatched_tup = ExecHashJoinGetOuterTupleAtOffset(node, cursor->outer_tuple_start_offset); + econtext->ecxt_outertuple = outer_unmatched_tup; + econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; + cursor = cursor->next; + return ExecProject(node->js.ps.ps_ProjInfo); + } + + node->hj_JoinState = HJ_NEED_NEW_BATCH; + node->first_outer_offset_match_status = NULL; + break; + default: elog(ERROR, "unrecognized hashjoin state: %d", (int) node->hj_JoinState); @@ -628,6 +727,11 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate->js.ps.ExecProcNode = ExecHashJoin; hjstate->js.jointype = node->join.jointype; + hjstate->inner_page_offset = 0L; + hjstate->HJ_NEED_NEW_OUTER_tup_start = 0L; + hjstate->HJ_NEED_NEW_OUTER_tup_end = 0L; + hjstate->current_outer_offset_match_status = NULL; + hjstate->first_outer_offset_match_status = NULL; /* * Miscellaneous initialization * @@ -805,6 +909,28 @@ ExecEndHashJoin(HashJoinState *node) ExecEndNode(innerPlanState(node)); } +static TupleTableSlot * +ExecHashJoinGetOuterTupleAtOffset(HashJoinState *hjstate, off_t offset) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + TupleTableSlot *slot; + uint32 hashvalue; + + BufFile *file = hashtable->outerBatchFile[curbatch]; + /* ? should fileno always be 0? */ + if (BufFileSeek(file, 0, offset, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + + slot = ExecHashJoinGetSavedTuple(hjstate, + file, + &hashvalue, + hjstate->hj_OuterTupleSlot); + return slot; +} + /* * ExecHashJoinOuterGetTuple * @@ -951,20 +1077,17 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, } /* - * ExecHashJoinNewBatch + * ExecHashJoinAdvanceBatch * switch to a new hashjoin batch * * Returns true if successful, false if there are no more batches. */ static bool -ExecHashJoinNewBatch(HashJoinState *hjstate) +ExecHashJoinAdvanceBatch(HashJoinState *hjstate) { HashJoinTable hashtable = hjstate->hj_HashTable; int nbatch; int curbatch; - BufFile *innerFile; - TupleTableSlot *slot; - uint32 hashvalue; nbatch = hashtable->nbatch; curbatch = hashtable->curbatch; @@ -1039,10 +1162,31 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) curbatch++; } + hjstate->inner_page_offset = 0L; + hjstate->first_chunk = true; if (curbatch >= nbatch) return false; /* no more batches */ hashtable->curbatch = curbatch; + return true; +} + +/* + * Returns true if there are more chunks left, false otherwise + */ +static bool LoadInnerBatch(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + BufFile *innerFile; + TupleTableSlot *slot; + uint32 hashvalue; + + off_t tup_start_offset; + off_t chunk_start_offset; + off_t tup_end_offset; + int64 current_saved_size; + int current_fileno; /* * Reload the hash table with the new inner batch (which could be empty) @@ -1051,27 +1195,60 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) innerFile = hashtable->innerBatchFile[curbatch]; + /* + * Reset this even if the innerfile is not null + */ + hjstate->first_chunk = hjstate->inner_page_offset == 0L; + if (innerFile != NULL) { - if (BufFileSeek(innerFile, 0, 0L, SEEK_SET)) + /* should fileno always be 0? */ + if (BufFileSeek(innerFile, 0, hjstate->inner_page_offset, SEEK_SET)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file: %m"))); + chunk_start_offset = hjstate->inner_page_offset; + tup_end_offset = hjstate->inner_page_offset; while ((slot = ExecHashJoinGetSavedTuple(hjstate, innerFile, &hashvalue, hjstate->hj_HashTupleSlot))) { + /* next tuple's start is last tuple's end */ + tup_start_offset = tup_end_offset; + /* after we got the tuple, figure out what the offset is */ + BufFileTell(innerFile, ¤t_fileno, &tup_end_offset); + current_saved_size = tup_end_offset - chunk_start_offset; + if (current_saved_size > work_mem) + { + hjstate->inner_page_offset = tup_start_offset; + /* + * Rewind outer batch file (if present), so that we can start reading it. + */ + if (hashtable->outerBatchFile[curbatch] != NULL) + { + if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + } + return true; + } + hjstate->inner_page_offset = tup_end_offset; /* - * NOTE: some tuples may be sent to future batches. Also, it is - * possible for hashtable->nbatch to be increased here! + * NOTE: some tuples may be sent to future batches. + * With current hashloop patch, however, it is not possible + * for hashtable->nbatch to be increased here */ ExecHashTableInsert(hashtable, slot, hashvalue); } + // this is the end of the file + hjstate->inner_page_offset = 0L; + /* - * after we build the hash table, the inner batch file is no longer + * after we processed all chunks, the inner batch file is no longer * needed */ BufFileClose(innerFile); @@ -1088,8 +1265,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file: %m"))); } - - return true; + return false; } /* @@ -1270,6 +1446,8 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate, uint32 header[2]; size_t nread; MinimalTuple tuple; + int dummy_fileno; + /* * We check for interrupts here because this is typically taken as an @@ -1278,6 +1456,7 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate, */ CHECK_FOR_INTERRUPTS(); + BufFileTell(file, &dummy_fileno, &hjstate->HJ_NEED_NEW_OUTER_tup_start); /* * Since both the hash value and the MinimalTuple length word are uint32, * we can read them both in one BufFileRead() call without any type @@ -1304,6 +1483,7 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate, (errcode_for_file_access(), errmsg("could not read from hash-join temporary file: %m"))); ExecForceStoreMinimalTuple(tuple, tupleSlot, true); + BufFileTell(file, &dummy_fileno, &hjstate->HJ_NEED_NEW_OUTER_tup_end); return tupleSlot; } diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 2c94b926d3..bd5aeba74c 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -59,6 +59,16 @@ * if so, we just dump them out to the correct batch file. * ---------------------------------------------------------------- */ +struct OuterOffsetMatchStatus; +typedef struct OuterOffsetMatchStatus OuterOffsetMatchStatus; + +struct OuterOffsetMatchStatus +{ + bool match_status; + off_t outer_tuple_start_offset; + int32 outer_tuple_val; + struct OuterOffsetMatchStatus *next; +}; /* these are in nodes/execnodes.h: */ /* typedef struct HashJoinTupleData *HashJoinTuple; */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 99b9fa414f..874fc47ffe 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -42,6 +42,8 @@ struct RangeTblEntry; /* avoid including parsenodes.h here */ struct ExprEvalStep; /* avoid including execExpr.h everywhere */ struct CopyMultiInsertBuffer; +struct OuterOffsetMatchStatus; + /* ---------------- * ExprState node @@ -1899,6 +1901,14 @@ typedef struct HashJoinState int hj_JoinState; bool hj_MatchedOuter; bool hj_OuterNotEmpty; + + off_t inner_page_offset; + bool first_chunk; + struct OuterOffsetMatchStatus *first_outer_offset_match_status; + struct OuterOffsetMatchStatus *current_outer_offset_match_status; + + off_t HJ_NEED_NEW_OUTER_tup_start; + off_t HJ_NEED_NEW_OUTER_tup_end; } HashJoinState; diff --git a/src/test/regress/expected/adaptive_hj.out b/src/test/regress/expected/adaptive_hj.out new file mode 100644 index 0000000000..4bdc681994 --- /dev/null +++ b/src/test/regress/expected/adaptive_hj.out @@ -0,0 +1,209 @@ +drop table if exists t1; +NOTICE: table "t1" does not exist, skipping +drop table if exists t2; +NOTICE: table "t2" does not exist, skipping +create table t1(a int); +create table t2(b int); +insert into t1 values(1),(2); +insert into t2 values(2),(3); +insert into t1 select i from generate_series(1,10)i; +insert into t2 select i from generate_series(2,10)i; +insert into t1 select 2 from generate_series(1,5)i; +insert into t2 select 2 from generate_series(2,7)i; +set work_mem=64; +set enable_mergejoin to off; +select * from t1 left outer join t2 on a = b order by b; + a | b +----+---- + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 3 | 3 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 + 1 | + 1 | +(67 rows) + +select count(*) from t1 left outer join t2 on a = b; + count +------- + 67 +(1 row) + +select * from t1, t2 where a = b; + a | b +----+---- + 5 | 5 + 3 | 3 + 3 | 3 + 4 | 4 + 7 | 7 + 6 | 6 + 9 | 9 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 8 | 8 + 10 | 10 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 +(65 rows) + +select count(*) from t1, t2 where a = b; + count +------- + 65 +(1 row) + +truncate table t1; +insert into t1 values (1),(2),(2),(3); +truncate table t2; +insert into t2 values(2),(2),(3),(3),(4); +set work_mem=64; +set enable_mergejoin to off; +select * from t1 left outer join t2 on a = b order by b; + a | b +---+--- + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 + 3 | 3 + 3 | 3 + 1 | +(7 rows) + +select count(*) from t1 left outer join t2 on a = b; + count +------- + 7 +(1 row) + +select * from t1, t2 where a = b; + a | b +---+--- + 3 | 3 + 3 | 3 + 2 | 2 + 2 | 2 + 2 | 2 + 2 | 2 +(6 rows) + +select count(*) from t1, t2 where a = b; + count +------- + 6 +(1 row) + diff --git a/src/test/regress/sql/adaptive_hj.sql b/src/test/regress/sql/adaptive_hj.sql new file mode 100644 index 0000000000..6b7c4d9eff --- /dev/null +++ b/src/test/regress/sql/adaptive_hj.sql @@ -0,0 +1,31 @@ +drop table if exists t1; +drop table if exists t2; +create table t1(a int); +create table t2(b int); + +insert into t1 values(1),(2); +insert into t2 values(2),(3); +insert into t1 select i from generate_series(1,10)i; +insert into t2 select i from generate_series(2,10)i; +insert into t1 select 2 from generate_series(1,5)i; +insert into t2 select 2 from generate_series(2,7)i; +set work_mem=64; +set enable_mergejoin to off; + +select * from t1 left outer join t2 on a = b order by b; +select count(*) from t1 left outer join t2 on a = b; +select * from t1, t2 where a = b; +select count(*) from t1, t2 where a = b; + +truncate table t1; +insert into t1 values (1),(2),(2),(3); +truncate table t2; +insert into t2 values(2),(2),(3),(3),(4); + +set work_mem=64; +set enable_mergejoin to off; + +select * from t1 left outer join t2 on a = b order by b; +select count(*) from t1 left outer join t2 on a = b; +select * from t1, t2 where a = b; +select count(*) from t1, t2 where a = b; -- 2.21.0