From 176af35834f1c2549b64010fb066586090d085d9 Mon Sep 17 00:00:00 2001 From: Arseniy Mukhin Date: Tue, 22 Jul 2025 18:06:36 +0300 Subject: [PATCH v8 4/4] amcheck: brin_index_check() - heap all indexed This commit extends functionality of brin_index_check() with heap_all_indexed check: we validate every index range tuple against every heap tuple within the range using consistentFn. Also, we check here that fields 'has_nulls', 'all_nulls' and 'empty_range' are consistent with the range heap data. It's the most expensive part of the brin_index_check(), so it's optional. --- contrib/amcheck/amcheck--1.5--1.6.sql | 6 +- contrib/amcheck/expected/check_brin.out | 22 +- contrib/amcheck/sql/check_brin.sql | 22 +- contrib/amcheck/t/007_verify_brin.pl | 51 ++- contrib/amcheck/verify_brin.c | 490 +++++++++++++++++++++++- 5 files changed, 565 insertions(+), 26 deletions(-) diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql index 0354451c472..6337e065bb1 100644 --- a/contrib/amcheck/amcheck--1.5--1.6.sql +++ b/contrib/amcheck/amcheck--1.5--1.6.sql @@ -8,11 +8,13 @@ -- brin_index_check() -- CREATE FUNCTION brin_index_check(index regclass, - regularpagescheck boolean default false + regularpagescheck boolean default false, + heapallindexed boolean default false, + consistent_operator_names text[] default '{}' ) RETURNS VOID AS 'MODULE_PATHNAME', 'brin_index_check' LANGUAGE C STRICT PARALLEL RESTRICTED; -- We don't want this to be available to public -REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC; \ No newline at end of file +REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC; \ No newline at end of file diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out index e5fc52ed747..be85c32bc58 100644 --- a/contrib/amcheck/expected/check_brin.out +++ b/contrib/amcheck/expected/check_brin.out @@ -5,7 +5,7 @@ $$ LANGUAGE sql; -- empty table index 
should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -19,7 +19,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -28,7 +28,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -42,7 +42,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -51,7 +51,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -65,7 +65,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM 
generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -74,7 +74,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -88,7 +88,7 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -97,7 +97,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); brin_index_check ------------------ @@ -113,7 +113,7 @@ SELECT BOX(point(random() * 1000, random() * 1000), point(random() * 1000, rando FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); brin_index_check ------------------ @@ -122,7 +122,7 @@ SELECT brin_index_check('brintest_idx'::REGCLASS, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH 
(PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); brin_index_check ------------------ diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql index b36af37fe03..4f16f31c7f8 100644 --- a/contrib/amcheck/sql/check_brin.sql +++ b/contrib/amcheck/sql/check_brin.sql @@ -7,7 +7,7 @@ $$ LANGUAGE sql; -- empty table index should be valid CREATE TABLE brintest (a BIGINT) WITH (FILLFACTOR = 10); CREATE INDEX brintest_idx ON brintest USING BRIN (a); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -17,12 +17,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops, id int8_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a TEXT_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -34,12 +34,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT 
brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -51,12 +51,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_minmax_multi_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -68,12 +68,12 @@ CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a int8_bloom_ops) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true); -- cleanup DROP TABLE brintest; @@ -87,12 +87,12 @@ FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING BRIN (a BOX_INCLUSION_OPS) WITH (PAGES_PER_RANGE = 2); -SELECT brin_index_check('brintest_idx'::REGCLASS, true); +SELECT brin_index_check('brintest_idx'::REGCLASS, true, true, '{"@>"}'); -- cleanup DROP TABLE brintest; diff --git 
a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl index 2c62b76cc70..51bfed7e273 100644 --- a/contrib/amcheck/t/007_verify_brin.pl +++ b/contrib/amcheck/t/007_verify_brin.pl @@ -200,6 +200,55 @@ my @tests = ( return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); }, expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)") + }, + { + # range is marked as empty_range, but heap has some data for the range + + find => pack('LCC', 0, 0x88, 0x03), + replace => pack('LCC', 0, 0xA8, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null);); + }, + expected => wrap('range is marked as empty but contains qualified live tuples. Range blkno: 0, heap tid (0,1)') + }, + { + # range hasnulls & allnulls are false, but heap contains null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x00), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)') + }, + { + # range allnulls is true, but heap contains non-null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range allnulls is true, but contains nonnull value. 
Range blkno: 0, heap tid (0,2)') + }, + { + # consistent function return FALSE for the valid heap value + # replace "ccccc" with "bbbbb" so that min_max index was too narrow + + find => 'ccccc', + replace => 'bbbbb', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc');); + }, + expected => wrap('heap tuple inconsistent with index. Range blkno: 0, heap tid (0,2)') } ); @@ -241,7 +290,7 @@ foreach my $test_struct (@tests) { $node->start; foreach my $test_struct (@tests) { - my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true))); + my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true))); like($stderr, $test_struct->{expected}); } diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c index d4024e76b56..b7bf1513734 100644 --- a/contrib/amcheck/verify_brin.c +++ b/contrib/amcheck/verify_brin.c @@ -39,6 +39,8 @@ typedef struct BrinCheckState /* Check arguments */ bool regularpagescheck; + bool heapallindexed; + ArrayType *consistent_oper_names; /* BRIN check common fields */ @@ -67,6 +69,30 @@ typedef struct BrinCheckState Page regpage; OffsetNumber regpageoffset; + /* Heap all indexed check fields */ + + String **operatorNames; + BrinRevmap *revmap; + Buffer buf; + FmgrInfo *consistentFn; + /* Scan keys for regular values */ + ScanKey *nonnull_sk; + /* Scan keys for null values */ + ScanKey *isnull_sk; + double range_cnt; + /* first block of the next range */ + BlockNumber nextrangeBlk; + + /* + * checkable_range shows if current range could be checked and dtup + * contains valid index tuple for the range. 
It could be false if the + * current range is not summarized, or it's placeholder, or it's just a + * beginning of the check + */ + bool checkable_range; + BrinMemTuple *dtup; + MemoryContext rangeCtx; + MemoryContext heaptupleCtx; } BrinCheckState; static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly); @@ -87,6 +113,23 @@ static bool revmap_points_to_index_tuple(BrinCheckState * state); static ItemId PageGetItemIdCareful(BrinCheckState * state); +static void check_heap_all_indexed(BrinCheckState * state); + +static void check_and_prepare_operator_names(BrinCheckState * state); + +static void brin_check_callback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate); + +static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid); + +static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno); + +static ScanKey prepare_isnull_scan_key(AttrNumber attno); + static void brin_check_ereport(BrinCheckState * state, const char *fmt); static void revmap_item_ereport(BrinCheckState * state, const char *fmt); @@ -95,6 +138,7 @@ static void index_tuple_ereport(BrinCheckState * state, const char *fmt); static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt); +static void heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message); Datum brin_index_check(PG_FUNCTION_ARGS) @@ -103,6 +147,8 @@ brin_index_check(PG_FUNCTION_ARGS) BrinCheckState *state = palloc0(sizeof(BrinCheckState)); state->regularpagescheck = PG_GETARG_BOOL(1); + state->heapallindexed = PG_GETARG_BOOL(2); + state->consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3); amcheck_lock_relation_and_check(indrelid, BRIN_AM_OID, @@ -127,9 +173,27 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl state->bdesc = brin_build_desc(idxrel); state->natts = 
state->bdesc->bd_tupdesc->natts; + /* + * Now that we know how many attributes the index has, process the operator + * names array + */ + if (state->heapallindexed) + { + check_and_prepare_operator_names(state); + + /* + * Check if we are OK with indcheckxmin, and unregister the snapshot as we + * don't need it any further + */ + UnregisterSnapshot(RegisterSnapshotAndCheckIndexCheckXMin(state->idxrel)); + } check_brin_index_structure(state); + if (state->heapallindexed) + { + check_heap_all_indexed(state); + } brin_free_desc(state->bdesc); } @@ -628,7 +692,6 @@ check_regular_pages(BrinCheckState * state) state->regpageoffset = InvalidOffsetNumber; state->idxnblocks = RelationGetNumberOfBlocks(state->idxrel); - /* * Prepare stream data for regular pages walk. It is safe to use batchmode * as block_range_read_stream_cb takes no locks. @@ -788,6 +851,415 @@ PageGetItemIdCareful(BrinCheckState * state) return itemid; } +/* + * Check that every heap tuple is consistent with the index. + * + * Here we generate a ScanKey for every heap tuple and test it against the + * appropriate range using consistentFn (for ScanKey generation logic see 'prepare_nonnull_scan_key') + * + * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls' + * are not too "narrow" for each range, which means: + * 1) has_nulls = false, but we see a null value (only when oi_regular_nulls is true) + * 2) all_nulls = true, but we see a nonnull value. + * 3) empty_range = true, but we see a tuple within the range. + * + * We use allowSync = false, because this way + * we process full ranges one by one starting from the first range. + * It's not necessary, but it makes the code simpler and this way + * we need to fetch every index tuple only once. 
+ */ +static void +check_heap_all_indexed(BrinCheckState * state) +{ + Relation idxrel = state->idxrel; + Relation heaprel = state->heaprel; + double reltuples; + IndexInfo *indexInfo; + + /* Initialize the fields used by the heap all indexed check */ + + state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange); + state->dtup = brin_new_memtuple(state->bdesc); + state->checkable_range = false; + state->consistentFn = palloc0_array(FmgrInfo, state->natts); + state->range_cnt = 0; + /* at the beginning, the next range to fetch is the first range */ + state->nextrangeBlk = 0; + state->nonnull_sk = palloc0_array(ScanKey, state->natts); + state->isnull_sk = palloc0_array(ScanKey, state->natts); + state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check range context", + ALLOCSET_DEFAULT_SIZES); + state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check tuple context", + ALLOCSET_DEFAULT_SIZES); + + /* + * Prepare the "non-null" and "is_null" scan keys and the consistent fn for + * each attribute + */ + for (AttrNumber attno = 1; attno <= state->natts; attno++) + { + FmgrInfo *tmp; + + tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT); + fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext); + + state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno); + state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno); + } + + indexInfo = BuildIndexInfo(idxrel); + + /* + * Use a snapshot to check only those tuples that are guaranteed to be + * indexed already. Using SnapshotAny would make it more difficult to say + * whether there is a corruption or the checked tuple just hasn't been indexed + * yet. Also, we want to support CIC indexes. 
+ */ + indexInfo->ii_Concurrent = true; + reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, false, /* no syncscan; no CREATE INDEX progress reporting, we are not building */ + brin_check_callback, (void *) state, NULL); + + elog(DEBUG3, "ranges were checked: %f", state->range_cnt); + elog(DEBUG3, "scan total tuples: %f", reltuples); + + if (state->buf != InvalidBuffer) + ReleaseBuffer(state->buf); + + brinRevmapTerminate(state->revmap); + MemoryContextDelete(state->rangeCtx); + MemoryContextDelete(state->heaptupleCtx); +} + +/* + * Check the operator names array input parameter and convert it to an array of String nodes. + * An empty input array means we use the "=" operator for every attribute. + */ +static void +check_and_prepare_operator_names(BrinCheckState * state) +{ + Oid element_type = ARR_ELEMTYPE(state->consistent_oper_names); + int16 typlen; + bool typbyval; + char typalign; + Datum *values; + bool *elem_nulls; + int num_elems; + + state->operatorNames = palloc(sizeof(String *) * state->natts); + + get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign); + deconstruct_array(state->consistent_oper_names, element_type, typlen, typbyval, typalign, + &values, &elem_nulls, &num_elems); + + /* If we have some input, check it and convert it to String** */ + if (num_elems != 0) + { + if (num_elems != state->natts) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Operator names array length %d, but index has %d attributes", + num_elems, state->natts))); + } + + for (int i = 0; i < num_elems; i++) + { + if (elem_nulls[i]) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Operator names array contains NULL"))); + } + state->operatorNames[i] = makeString(TextDatumGetCString(values[i])); + } + } + else + { + /* If there is no input, just use the "=" operator for all attributes */ + for (int i = 0; i < state->natts; i++) + { + state->operatorNames[i] = makeString("="); + } + } +} + +/* + * Prepare ScanKey for index attribute. 
+ * + * ConsistentFn requires a ScanKey, so we need to generate a ScanKey for every + * attribute. We want a ScanKey that results in TRUE for every heap + * tuple within the range when we use its indexed value as sk_argument. + * To generate such a ScanKey we need to define the right operand type and the strategy number. + * The right operand type is the type of data that the index is built on, so it's 'opcintype'. + * There is no strategy number that we can always use, + * because every opclass defines its own set of supported operators, and the strategy number + * for the same operator can differ from opclass to opclass. + * So to get the strategy number we look up an operator that gives us the desired behavior + * and whose operand types are both 'opcintype', and then retrieve the strategy number for it. + * Most of the time we can use '='. We let the user specify the operator name in case the opclass doesn't + * support the '=' operator. If no such operator exists, we can't proceed with the check. + * + * The ScanKey is generated once and is reused for all heap tuples. + * The argument field is filled in for every heap tuple before + * the consistent function is invoked, so leave it NULL for now. 
+ * + */ +static ScanKey +prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno) +{ + ScanKey scanKey; + Oid opOid; + Oid opFamilyOid; + bool defined; + StrategyNumber strategy; + RegProcedure opRegProc; + List *operNameList; + int attindex = attno - 1; + Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex); + Oid type = state->idxrel->rd_opcintype[attindex]; + String *opname = state->operatorNames[attindex]; + + opFamilyOid = state->idxrel->rd_opfamily[attindex]; + operNameList = list_make1(opname); + opOid = OperatorLookup(operNameList, type, type, &defined); + + if (opOid == InvalidOid) + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("There is no operator %s for type %u", + opname->sval, type))); + } + + strategy = get_op_opfamily_strategy(opOid, opFamilyOid); + + if (strategy == 0) + { + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("operator %s is not a member of operator family \"%s\"", + opname->sval, + get_opfamily_name(opFamilyOid, false)))); + } + + opRegProc = get_opcode(opOid); + scanKey = palloc0(sizeof(ScanKeyData)); + ScanKeyEntryInitialize( + scanKey, + 0, + attno, + strategy, + type, + attr->attcollation, + opRegProc, + (Datum) 0 + ); + list_free(operNameList); /* frees only the List; opname stays owned by state->operatorNames */ + + return scanKey; +} + +static ScanKey +prepare_isnull_scan_key(AttrNumber attno) +{ + ScanKey scanKey; + + scanKey = palloc0(sizeof(ScanKeyData)); + ScanKeyEntryInitialize(scanKey, + SK_ISNULL | SK_SEARCHNULL, + attno, + InvalidStrategy, + InvalidOid, + InvalidOid, + InvalidOid, + (Datum) 0); + return scanKey; +} + +/* + * We walk from the first range (blkno = 0) to the last as the scan proceeds. + * For every heap tuple we check if we are done with the current range, and we need to move further + * to the current heap tuple's range. While moving to the next range we check that it's not empty (because + * we have at least one tuple for this range). 
+ * Every heap tuple is checked to be consistent with the range it belongs to. + * In case of unsummarized ranges and placeholders we skip all checks. + * + * While moving, we may jump over some ranges, + * but it's okay because we would not be able to check them anyway. + * We also can't say whether skipped ranges should be marked as empty or not, + * since it's possible that there were some tuples before that are now deleted. + * + */ +static void +brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate) +{ + BrinCheckState *state; + BlockNumber heapblk; + + state = (BrinCheckState *) brstate; + heapblk = ItemPointerGetBlockNumber(tid); + + /* If we went beyond the current range, fetch the new range */ + if (heapblk >= state->nextrangeBlk) + { + BrinTuple *tup; + BrinTuple *tupcopy = NULL; + MemoryContext oldCtx; + OffsetNumber off; + Size size; + Size btupsz = 0; + + MemoryContextReset(state->rangeCtx); + oldCtx = MemoryContextSwitchTo(state->rangeCtx); + + state->range_cnt++; + + /* Move to the range that contains the current heap tuple */ + tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf, + &off, &size, BUFFER_LOCK_SHARE); + + if (tup) + { + tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz); + LockBuffer(state->buf, BUFFER_LOCK_UNLOCK); + state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup); + + /* We can't check placeholder ranges */ + state->checkable_range = !state->dtup->bt_placeholder; + } + else + { + /* We can't check unsummarized ranges. 
*/ + state->checkable_range = false; + } + + /* + * Update nextrangeBlk so we know when we are done with the current + * range + */ + state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange; + + MemoryContextSwitchTo(oldCtx); + + /* A range that contains this heap tuple must not be marked as empty */ + if (state->checkable_range && state->dtup->bt_empty_range) + { + heap_all_indexed_ereport(state, tid, "range is marked as empty but contains qualified live tuples"); + } + + } + + /* Check that the tuple is consistent with the index */ + if (state->checkable_range) + { + check_heap_tuple(state, values, isnull, tid); + } + +} + +/* + * For null values with oi_regular_nulls = true we only check the hasnulls/allnulls flags. + * For nonnull values we check that allnulls is false, no matter whether oi_regular_nulls is set or not. + * In all cases except the first one we then call consistentFn with the appropriate scanKey: + * - for oi_regular_nulls = false and null values we use 'isNull' scanKey, + * - for nonnull values we use 'nonnull' scanKey + */ +static void +check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid) +{ + int attindex; + BrinMemTuple *dtup = state->dtup; + BrinDesc *bdesc = state->bdesc; + MemoryContext oldCtx; + + Assert(state->checkable_range); + + MemoryContextReset(state->heaptupleCtx); + oldCtx = MemoryContextSwitchTo(state->heaptupleCtx); + + /* check every index attribute */ + for (attindex = 0; attindex < state->natts; attindex++) + { + BrinValues *bval; + Datum consistentFnResult; + bool consistent; + ScanKey scanKey; + bool oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls; + + bval = &dtup->bt_columns[attindex]; + + if (nulls[attindex]) + { + /* + * Use hasnulls flag when oi_regular_nulls is true. 
Otherwise, + * delegate the check to consistentFn + */ + if (oi_regular_nulls) + { + /* We have a null value, so hasnulls or allnulls must be true */ + if (!(bval->bv_hasnulls || bval->bv_allnulls)) + { + heap_all_indexed_ereport(state, tid, + "range hasnulls and allnulls are false, but contains a null value"); + } + continue; + } + + /* + * In case of a null value and oi_regular_nulls = false we use the isNull + * scanKey for the invocation of consistentFn + */ + scanKey = state->isnull_sk[attindex]; + } + else + { + /* We have a nonnull value, so allnulls should be false */ + if (bval->bv_allnulls) + { + heap_all_indexed_ereport(state, tid, "range allnulls is true, but contains nonnull value"); + } + + /* use the nonnull scan key, filled with the current heap value */ + scanKey = state->nonnull_sk[attindex]; + scanKey->sk_argument = values[attindex]; + } + + /* If oi_regular_nulls = true we should never get here with a null */ + Assert(!oi_regular_nulls || !nulls[attindex]); + + if (state->consistentFn[attindex].fn_nargs >= 4) /* 4-arg form: pass a pointer to the scan key and a key count of 1 */ + { + consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex], + state->idxrel->rd_indcollation[attindex], + PointerGetDatum(state->bdesc), + PointerGetDatum(bval), + PointerGetDatum(&scanKey), + Int32GetDatum(1) + ); + } + else /* 3-arg form: pass the single scan key directly */ + { + consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex], + state->idxrel->rd_indcollation[attindex], + PointerGetDatum(state->bdesc), + PointerGetDatum(bval), + PointerGetDatum(scanKey) + ); + } + + consistent = DatumGetBool(consistentFnResult); + + if (!consistent) + { + heap_all_indexed_ereport(state, tid, "heap tuple inconsistent with index"); + } + + } + + MemoryContextSwitchTo(oldCtx); +} /* Report without any additional info */ static void @@ -853,3 +1325,19 @@ revmap_item_ereport(BrinCheckState * state, const char *fmt) state->revmapBlk, state->revmapidx))); } + +/* Report with range blkno and heap tuple info */ +static void +heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message) +{ + 
Assert(state->dtup != NULL); /* dtup supplies the range blkno reported below */ + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)", + RelationGetRelationName(state->idxrel), + message, + state->dtup->bt_blkno, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)))); +} -- 2.43.0