From 531aeac7310864ac4f59573bd71254e763adab70 Mon Sep 17 00:00:00 2001 From: Arseniy Mukhin Date: Sun, 10 Aug 2025 16:25:08 +0300 Subject: [PATCH v10 4/4] amcheck: brin_index_check() - heap all indexed This commit extends functionality of brin_index_check() with heap_all_indexed check: we validate every index range tuple against every heap tuple within the range using consistentFn. Also, we check here that fields 'has_nulls', 'all_nulls' and 'empty_range' are consistent with the range heap data. It's the most expensive part of the brin_index_check(), so it's optional. --- contrib/amcheck/amcheck--1.5--1.6.sql | 6 +- contrib/amcheck/expected/check_brin.out | 60 ++- contrib/amcheck/sql/check_brin.sql | 54 ++- contrib/amcheck/t/007_verify_brin.pl | 51 ++- contrib/amcheck/verify_brin.c | 501 ++++++++++++++++++++++++ doc/src/sgml/amcheck.sgml | 39 +- 6 files changed, 685 insertions(+), 26 deletions(-) diff --git a/contrib/amcheck/amcheck--1.5--1.6.sql b/contrib/amcheck/amcheck--1.5--1.6.sql index 0354451c472..55276527e68 100644 --- a/contrib/amcheck/amcheck--1.5--1.6.sql +++ b/contrib/amcheck/amcheck--1.5--1.6.sql @@ -8,11 +8,13 @@ -- brin_index_check() -- CREATE FUNCTION brin_index_check(index regclass, - regularpagescheck boolean default false + regularpagescheck boolean default false, + heapallindexed boolean default false, + variadic text[] default '{}' ) RETURNS VOID AS 'MODULE_PATHNAME', 'brin_index_check' LANGUAGE C STRICT PARALLEL RESTRICTED; -- We don't want this to be available to public -REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean) FROM PUBLIC; \ No newline at end of file +REVOKE ALL ON FUNCTION brin_index_check(regclass, boolean, boolean, text[]) FROM PUBLIC; \ No newline at end of file diff --git a/contrib/amcheck/expected/check_brin.out b/contrib/amcheck/expected/check_brin.out index 6890fff46bd..909b41cb7a9 100644 --- a/contrib/amcheck/expected/check_brin.out +++ b/contrib/amcheck/expected/check_brin.out @@ -5,7 +5,7 @@ $$ LANGUAGE sql; -- 
empty table index should be valid CREATE TABLE brintest (a bigint) WITH (fillfactor = 10); CREATE INDEX brintest_idx ON brintest USING brin (a); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -19,7 +19,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -28,7 +28,7 @@ SELECT brin_index_check('brintest_idx', true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -42,7 +42,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -51,7 +51,7 @@ SELECT brin_index_check('brintest_idx', true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -65,7 +65,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT 
brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -74,7 +74,7 @@ SELECT brin_index_check('brintest_idx', true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -90,7 +90,7 @@ SELECT box(point(random() * 1000, random() * 1000), point(random() * 1000, rando FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true, '@>'); brin_index_check ------------------ @@ -99,7 +99,7 @@ SELECT brin_index_check('brintest_idx', true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true, '@>'); brin_index_check ------------------ @@ -113,7 +113,7 @@ CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ @@ -122,12 +122,50 @@ SELECT brin_index_check('brintest_idx', true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); brin_index_check ------------------ (1 row) +-- cleanup +DROP TABLE brintest; +-- 
multiple attributes test with custom operators +CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10); +CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2); +INSERT INTO brintest (a, b) SELECT + random_string((x % 100)), + box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000)) +FROM generate_series(1, 3000) x; +-- create some empty ranges +DELETE FROM brintest WHERE id > 1500 AND id < 2500; +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>'); + brin_index_check +------------------ + +(1 row) + +-- rebuild index +DROP INDEX brintest_idx; +CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2); +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>'); + brin_index_check +------------------ + +(1 row) + +-- error if it's impossible to use default operator for all index attributes +SELECT brin_index_check('brintest_idx', true, true); +ERROR: Operator = is not a member of operator family "box_inclusion_ops" +-- error if number of operators in input doesn't match index attributes number +SELECT brin_index_check('brintest_idx', true, true, '='); +ERROR: Number of operator names in input (1) doesn't match index attributes number (3) +-- error if operator name is NULL +SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL); +ERROR: Operator name must not be NULL +-- error if there is no operator for attribute type +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@'); +ERROR: There is no operator @@ for type "box" -- cleanup DROP TABLE brintest; -- cleanup diff --git a/contrib/amcheck/sql/check_brin.sql b/contrib/amcheck/sql/check_brin.sql index 1c97b370cac..66dd1647d3b 100644 --- a/contrib/amcheck/sql/check_brin.sql +++ b/contrib/amcheck/sql/check_brin.sql @@ -7,7 +7,7 @@ $$ LANGUAGE sql; -- empty 
table index should be valid CREATE TABLE brintest (a bigint) WITH (fillfactor = 10); CREATE INDEX brintest_idx ON brintest USING brin (a); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- cleanup DROP TABLE brintest; @@ -17,12 +17,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- cleanup DROP TABLE brintest; @@ -34,12 +34,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_minmax_multi_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- cleanup DROP TABLE brintest; @@ -51,12 +51,12 @@ CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH (pages_ INSERT INTO brintest (a) SELECT x FROM generate_series(1,100000) x; -- create some empty ranges DELETE FROM brintest WHERE a > 20000 AND a < 40000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a int8_bloom_ops) WITH 
(pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- cleanup DROP TABLE brintest; @@ -70,12 +70,12 @@ FROM generate_series(1, 10000); -- create some empty ranges DELETE FROM brintest WHERE id > 2000 AND id < 4000; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true, '@>'); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (a box_inclusion_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true, '@>'); -- cleanup DROP TABLE brintest; @@ -86,12 +86,44 @@ CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_min INSERT INTO brintest (a) SELECT random_string((x % 100)) FROM generate_series(1,3000) x; -- create some empty ranges DELETE FROM brintest WHERE id > 1500 AND id < 2500; -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); -- rebuild index DROP INDEX brintest_idx; CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops) WITH (pages_per_range = 2); -SELECT brin_index_check('brintest_idx', true); +SELECT brin_index_check('brintest_idx', true, true); +-- cleanup +DROP TABLE brintest; + + +-- multiple attributes test with custom operators +CREATE TABLE brintest (id bigserial, a text, b box) WITH (fillfactor = 10); +CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2); +INSERT INTO brintest (a, b) SELECT + random_string((x % 100)), + box(point(random() * 1000, random() * 1000), point(random() * 1000, random() * 1000)) +FROM generate_series(1, 3000) x; +-- create some empty ranges +DELETE FROM brintest WHERE id > 1500 AND id < 2500; +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>'); + +-- rebuild index +DROP INDEX brintest_idx; 
+CREATE INDEX brintest_idx ON brintest USING brin (id int8_minmax_ops, a text_minmax_ops, b box_inclusion_ops) WITH (pages_per_range = 2); +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@>'); + +-- error if it's impossible to use default operator for all index attributes +SELECT brin_index_check('brintest_idx', true, true); + +-- error if number of operators in input doesn't match index attributes number +SELECT brin_index_check('brintest_idx', true, true, '='); + +-- error if operator name is NULL +SELECT brin_index_check('brintest_idx', true, true, '=', '=', NULL); + +-- error if there is no operator for attribute type +SELECT brin_index_check('brintest_idx', true, true, '=', '=', '@@'); + -- cleanup DROP TABLE brintest; diff --git a/contrib/amcheck/t/007_verify_brin.pl b/contrib/amcheck/t/007_verify_brin.pl index c4073f9bdcc..6fe0c78f12e 100644 --- a/contrib/amcheck/t/007_verify_brin.pl +++ b/contrib/amcheck/t/007_verify_brin.pl @@ -210,6 +210,55 @@ my @tests = ( return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa');); }, expected => wrap("revmap doesn't point to index tuple. Range blkno: 0, revmap item: (1,0), index tuple: (2,1)") + }, + { + # range is marked as empty_range, but heap has some data for the range + + find => pack('LCC', 0, 0x88, 0x03), + replace => pack('LCC', 0, 0xA8, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null);); + }, + expected => wrap('range is marked as empty but contains qualified live tuples. 
Range blkno: 0, heap tid (0,1)') + }, + { + # range hasnulls & allnulls are false, but heap contains null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x00), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range hasnulls and allnulls are false, but contains a null value. Range blkno: 0, heap tid (0,1)') + }, + { + # range allnulls is true, but heap contains non-null values for the range + + find => pack('LCC', 0, 0x88, 0x02), + replace => pack('LCC', 0, 0x88, 0x01), + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES (null), ('aaaaa');); + }, + expected => wrap('range allnulls is true, but contains nonnull value. Range blkno: 0, heap tid (0,2)') + }, + { + # consistent function return FALSE for the valid heap value + # replace "ccccc" with "bbbbb" so that min_max index was too narrow + + find => 'ccccc', + replace => 'bbbbb', + blkno => 2, # regular page + table_data => sub { + my ($test_struct) = @_; + return qq(INSERT INTO $test_struct->{table_name} (a) VALUES ('aaaaa'), ('ccccc');); + }, + expected => wrap('heap tuple inconsistent with index. 
Range blkno: 0, heap tid (0,2)') } ); @@ -251,7 +300,7 @@ foreach my $test_struct (@tests) { $node->start; foreach my $test_struct (@tests) { - my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true))); + my ($result, $stdout, $stderr) = $node->psql('postgres', qq(SELECT brin_index_check('$test_struct->{index_name}', true, true))); like($stderr, $test_struct->{expected}); } diff --git a/contrib/amcheck/verify_brin.c b/contrib/amcheck/verify_brin.c index 183d189685c..aae7ec3c0d7 100644 --- a/contrib/amcheck/verify_brin.c +++ b/contrib/amcheck/verify_brin.c @@ -39,6 +39,8 @@ typedef struct BrinCheckState /* Check arguments */ bool regularpagescheck; + bool heapallindexed; + ArrayType *consistent_oper_names; /* BRIN check common fields */ @@ -67,6 +69,29 @@ typedef struct BrinCheckState Page regpage; OffsetNumber regpageoffset; + /* Heap all indexed check fields */ + + BrinRevmap *revmap; + Buffer buf; + FmgrInfo *consistentFn; + /* Scan keys for regular values */ + ScanKey *nonnull_sk; + /* Scan keys for null values */ + ScanKey *isnull_sk; + double range_cnt; + /* first block of the next range */ + BlockNumber nextrangeBlk; + + /* + * checkable_range shows if current range could be checked and dtup + * contains valid index tuple for the range. 
It could be false if the + * current range is not summarized, or it's placeholder, or it's just a + * beginning of the check + */ + bool checkable_range; + BrinMemTuple *dtup; + MemoryContext rangeCtx; + MemoryContext heaptupleCtx; } BrinCheckState; static void brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonly); @@ -87,6 +112,23 @@ static bool revmap_points_to_index_tuple(BrinCheckState * state); static ItemId PageGetItemIdCareful(BrinCheckState * state); +static void check_heap_all_indexed(BrinCheckState * state); + +static void prepare_nonnull_scan_keys(BrinCheckState * state); + +static void brin_check_callback(Relation index, + ItemPointer tid, + Datum *values, + bool *isnull, + bool tupleIsAlive, + void *brstate); + +static void check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid); + +static ScanKey prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno, String *opname); + +static ScanKey prepare_isnull_scan_key(AttrNumber attno); + static void brin_check_ereport(BrinCheckState * state, const char *fmt); static void revmap_item_ereport(BrinCheckState * state, const char *fmt); @@ -95,6 +137,7 @@ static void index_tuple_ereport(BrinCheckState * state, const char *fmt); static void index_tuple_only_ereport(BrinCheckState * state, const char *fmt); +static void heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message); Datum brin_index_check(PG_FUNCTION_ARGS) @@ -103,6 +146,8 @@ brin_index_check(PG_FUNCTION_ARGS) BrinCheckState *state = palloc0(sizeof(BrinCheckState)); state->regularpagescheck = PG_GETARG_BOOL(1); + state->heapallindexed = PG_GETARG_BOOL(2); + state->consistent_oper_names = PG_GETARG_ARRAYTYPE_P(3); amcheck_lock_relation_and_check(indrelid, BRIN_AM_OID, @@ -127,9 +172,31 @@ brin_check(Relation idxrel, Relation heaprel, void *callback_state, bool readonl state->bdesc = brin_build_desc(idxrel); state->natts = 
state->bdesc->bd_tupdesc->natts; + /* Do some preparations and checks for heapallindexed */ + if (state->heapallindexed) + { + /* + * Check if we are OK with indcheckxmin, and unregister snapshot as we + * don't need it further + */ + Snapshot snapshot = RegisterSnapshot(GetTransactionSnapshot()); + + check_indcheckxmin(state->idxrel, snapshot); + UnregisterSnapshot(snapshot); + + /* + * If there are some problems with scan keys generation or operator + * name array is invalid we want to fail fast. So do it here. + */ + prepare_nonnull_scan_keys(state); + } check_brin_index_structure(state); + if (state->heapallindexed) + { + check_heap_all_indexed(state); + } brin_free_desc(state->bdesc); } @@ -797,6 +864,424 @@ PageGetItemIdCareful(BrinCheckState * state) return itemid; } +/* + * Check that every heap tuple is consistent with the index. + * + * Here we generate ScanKey for every heap tuple and test it against + * appropriate range using consistentFn (for ScanKey generation logic see 'prepare_nonnull_scan_keys') + * + * Also, we check that fields 'empty_range', 'all_nulls' and 'has_nulls' + * are not too "narrow" for each range, which means: + * 1) has_nulls = false, but we see null value (only when oi_regular_nulls is true) + * 2) all_nulls = true, but we see nonnull value. + * 3) empty_range = true, but we see tuple within the range. + * + * We use allowSync = false, because this way + * we process full ranges one by one from the first range. + * It's not necessary, but makes the code simpler and this way + * we need to fetch every index tuple only once. 
+ */ +static void +check_heap_all_indexed(BrinCheckState * state) +{ + Relation idxrel = state->idxrel; + Relation heaprel = state->heaprel; + double reltuples; + IndexInfo *indexInfo; + + /* heap all indexed check fields initialization */ + + state->revmap = brinRevmapInitialize(idxrel, &state->pagesPerRange); + state->dtup = brin_new_memtuple(state->bdesc); + state->checkable_range = false; + state->consistentFn = palloc0_array(FmgrInfo, state->natts); + state->range_cnt = 0; + /* next range is the first range in the beginning */ + state->nextrangeBlk = 0; + state->isnull_sk = palloc0_array(ScanKey, state->natts); + state->rangeCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check range context", + ALLOCSET_DEFAULT_SIZES); + state->heaptupleCtx = AllocSetContextCreate(CurrentMemoryContext, + "brin check tuple context", + ALLOCSET_DEFAULT_SIZES); + + /* + * Prepare "is_null" scan keys and consistent fn for each attribute. + * "non-null" scan keys are already generated. + */ + for (AttrNumber attno = 1; attno <= state->natts; attno++) + { + FmgrInfo *tmp; + + tmp = index_getprocinfo(idxrel, attno, BRIN_PROCNUM_CONSISTENT); + fmgr_info_copy(&state->consistentFn[attno - 1], tmp, CurrentMemoryContext); + + state->isnull_sk[attno - 1] = prepare_isnull_scan_key(attno); + } + + indexInfo = BuildIndexInfo(idxrel); + + /* + * Use snapshot to check only those tuples that are guaranteed to be + * indexed already. Using SnapshotAny would make it more difficult to say + * if there is a corruption or checked tuple just haven't been indexed + * yet. Also, we want to support CIC indexes. 
+ */ + indexInfo->ii_Concurrent = true; + reltuples = table_index_build_scan(heaprel, idxrel, indexInfo, false, true, + brin_check_callback, (void *) state, NULL); + + elog(DEBUG3, "ranges were checked: %f", state->range_cnt); + elog(DEBUG3, "scan total tuples: %f", reltuples); + + if (state->buf != InvalidBuffer) + ReleaseBuffer(state->buf); + + brinRevmapTerminate(state->revmap); + MemoryContextDelete(state->rangeCtx); + MemoryContextDelete(state->heaptupleCtx); +} + +/* + * Generate scan keys for every index attribute. + * + * ConsistentFn requires ScanKey, so we need to generate ScanKey for every + * attribute somehow. We want ScanKey that would result in TRUE for every heap + * tuple within the range when we use its indexed value as sk_argument. + * To generate such a ScanKey we need to define the right operand type and the strategy number. + * Right operand type is a type of data that index is built on, so it's 'opcintype'. + * There is no strategy number that we can always use, + * because every opclass defines its own set of operators it supports and strategy number + * for the same operator can differ from opclass to opclass. + * So to get strategy number we look up an operator that gives us desired behavior + * and which both operand types are 'opcintype' and then retrieve the strategy number for it. + * Most of the time we can use '='. We let user define operator name in case opclass doesn't + * support '=' operator. Also, if such operator doesn't exist, we can't proceed with the check. + * + * If operator name array is empty use "=" operator for every attribute. 
+ */ +static void +prepare_nonnull_scan_keys(BrinCheckState * state) +{ + Oid element_type = ARR_ELEMTYPE(state->consistent_oper_names); + int16 typlen; + bool typbyval; + char typalign; + Datum *values; + bool *elem_nulls; + int num_elems; + + get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign); + deconstruct_array(state->consistent_oper_names, element_type, typlen, typbyval, typalign, + &values, &elem_nulls, &num_elems); + + + /* + * If we have some input, check that number of operators in the input is + * relevant to the index + */ + if (num_elems > 0 && num_elems != state->natts) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Number of operator names in input (%u) " + "doesn't match index attributes number (%u)", + num_elems, state->natts))); + } + + + /* Generate scan key for every index attribute */ + state->nonnull_sk = palloc0_array(ScanKey, state->natts); + + for (AttrNumber attno = 1; attno <= state->natts; attno++) + { + String *operatorName; + + if (num_elems > 0) + { + + if (elem_nulls[attno - 1]) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Operator name must not be NULL"))); + } + + operatorName = makeString(TextDatumGetCString(values[attno - 1])); + } + else + { + + /* Use '=' as default operator */ + operatorName = makeString("="); + } + + state->nonnull_sk[attno - 1] = prepare_nonnull_scan_key(state, attno, operatorName); + pfree(operatorName); + } +} + +/* + * Prepare ScanKey for index attribute. + * + * Generated once, and will be reused for all heap tuples. + * Argument field will be filled for every heap tuple before + * consistent function invocation, so leave it NULL for a while. 
+ */ +static ScanKey +prepare_nonnull_scan_key(const BrinCheckState * state, AttrNumber attno, String *opname) +{ + ScanKey scanKey; + Oid opOid; + Oid opFamilyOid; + bool defined; + StrategyNumber strategy; + RegProcedure opRegProc; + List *operNameList; + int attindex = attno - 1; + Form_pg_attribute attr = TupleDescAttr(state->bdesc->bd_tupdesc, attindex); + Oid type = state->idxrel->rd_opcintype[attindex]; + + opFamilyOid = state->idxrel->rd_opfamily[attindex]; + operNameList = list_make1(opname); + opOid = OperatorLookup(operNameList, type, type, &defined); + + if (opOid == InvalidOid) + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("There is no operator %s for type \"%s\"", + opname->sval, format_type_be(type)))); + } + + strategy = get_op_opfamily_strategy(opOid, opFamilyOid); + + if (strategy == 0) + { + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("Operator %s is not a member of operator family \"%s\"", + opname->sval, + get_opfamily_name(opFamilyOid, false)))); + } + + opRegProc = get_opcode(opOid); + scanKey = palloc0(sizeof(ScanKeyData)); + ScanKeyEntryInitialize( + scanKey, + 0, + attno, + strategy, + type, + attr->attcollation, + opRegProc, + (Datum) NULL + ); + pfree(operNameList); + + return scanKey; +} + +static ScanKey +prepare_isnull_scan_key(AttrNumber attno) +{ + ScanKey scanKey; + + scanKey = palloc0(sizeof(ScanKeyData)); + ScanKeyEntryInitialize(scanKey, + SK_ISNULL | SK_SEARCHNULL, + attno, + InvalidStrategy, + InvalidOid, + InvalidOid, + InvalidOid, + (Datum) 0); + return scanKey; +} + +/* + * We walk from the first range (blkno = 0) to the last as the scan proceeds. + * For every heap tuple we check if we are done with the current range, and we need to move further + * to the current heap tuple's range. While moving to the next range we check that it's not empty (because + * we have at least one tuple for this range). + * Every heap tuple is checked to be consistent with the range it belongs to. 
+ * In case of unsummarized ranges and placeholders we skip all checks. + * + * While moving, we may jump over some ranges, + * but it's okay because we would not be able to check them anyway. + * We also can't say whether skipped ranges should be marked as empty or not, + * since it's possible that there were some tuples before that are now deleted. + * + */ +static void +brin_check_callback(Relation index, ItemPointer tid, Datum *values, bool *isnull, bool tupleIsAlive, void *brstate) +{ + BrinCheckState *state; + BlockNumber heapblk; + + state = (BrinCheckState *) brstate; + heapblk = ItemPointerGetBlockNumber(tid); + + /* If we went beyond the current range let's fetch new range */ + if (heapblk >= state->nextrangeBlk) + { + BrinTuple *tup; + BrinTuple *tupcopy = NULL; + MemoryContext oldCtx; + OffsetNumber off; + Size size; + Size btupsz = 0; + + MemoryContextReset(state->rangeCtx); + oldCtx = MemoryContextSwitchTo(state->rangeCtx); + + state->range_cnt++; + + /* Move to the range that contains current heap tuple */ + tup = brinGetTupleForHeapBlock(state->revmap, heapblk, &state->buf, + &off, &size, BUFFER_LOCK_SHARE); + + if (tup) + { + tupcopy = brin_copy_tuple(tup, size, tupcopy, &btupsz); + LockBuffer(state->buf, BUFFER_LOCK_UNLOCK); + state->dtup = brin_deform_tuple(state->bdesc, tupcopy, state->dtup); + + /* We can't check placeholder ranges */ + state->checkable_range = !state->dtup->bt_placeholder; + } + else + { + /* We can't check unsummarized ranges. 
*/ + state->checkable_range = false; + } + + /* + * Update nextrangeBlk so we know when we are done with the current + * range + */ + state->nextrangeBlk = (heapblk / state->pagesPerRange + 1) * state->pagesPerRange; + + MemoryContextSwitchTo(oldCtx); + + /* Range must not be empty */ + if (state->checkable_range && state->dtup->bt_empty_range) + { + heap_all_indexed_ereport(state, tid, "range is marked as empty but contains qualified live tuples"); + } + + } + + /* Check tuple is consistent with the index */ + if (state->checkable_range) + { + check_heap_tuple(state, values, isnull, tid); + } + +} + +/* + * We check hasnulls flags for null values when oi_regular_nulls = true, + * check that allnulls is false for all nonnull values no matter whether oi_regular_nulls is set or not. + * For all other cases we call consistentFn with appropriate scanKey: + * - for oi_regular_nulls = false and null values we use 'isNull' scanKey, + * - for nonnull values we use 'nonnull' scanKey + */ +static void +check_heap_tuple(BrinCheckState * state, const Datum *values, const bool *nulls, ItemPointer tid) +{ + int attindex; + BrinMemTuple *dtup = state->dtup; + BrinDesc *bdesc = state->bdesc; + MemoryContext oldCtx; + + Assert(state->checkable_range); + + MemoryContextReset(state->heaptupleCtx); + oldCtx = MemoryContextSwitchTo(state->heaptupleCtx); + + /* check every index attribute */ + for (attindex = 0; attindex < state->natts; attindex++) + { + BrinValues *bval; + Datum consistentFnResult; + bool consistent; + ScanKey scanKey; + bool oi_regular_nulls = bdesc->bd_info[attindex]->oi_regular_nulls; + + bval = &dtup->bt_columns[attindex]; + + if (nulls[attindex]) + { + /* + * Use hasnulls flag when oi_regular_nulls is true. 
Otherwise, + * delegate check to consistentFn + */ + if (oi_regular_nulls) + { + /* We have null value, so hasnulls or allnulls must be true */ + if (!(bval->bv_hasnulls || bval->bv_allnulls)) + { + heap_all_indexed_ereport(state, tid, + "range hasnulls and allnulls are false, but contains a null value"); + } + continue; + } + + /* + * In case of null and oi_regular_nulls = false we use isNull + * scanKey for invocation of consistentFn + */ + scanKey = state->isnull_sk[attindex]; + } + else + { + /* We have a nonnull value, so allnulls should be false */ + if (bval->bv_allnulls) + { + heap_all_indexed_ereport(state, tid, "range allnulls is true, but contains nonnull value"); + } + + /* use nonnull scan key */ + scanKey = state->nonnull_sk[attindex]; + scanKey->sk_argument = values[attindex]; + } + + /* If oi_regular_nulls = true we should never get there with null */ + Assert(!oi_regular_nulls || !nulls[attindex]); + + if (state->consistentFn[attindex].fn_nargs >= 4) + { + consistentFnResult = FunctionCall4Coll(&state->consistentFn[attindex], + state->idxrel->rd_indcollation[attindex], + PointerGetDatum(state->bdesc), + PointerGetDatum(bval), + PointerGetDatum(&scanKey), + Int32GetDatum(1) + ); + } + else + { + consistentFnResult = FunctionCall3Coll(&state->consistentFn[attindex], + state->idxrel->rd_indcollation[attindex], + PointerGetDatum(state->bdesc), + PointerGetDatum(bval), + PointerGetDatum(scanKey) + ); + } + + consistent = DatumGetBool(consistentFnResult); + + if (!consistent) + { + heap_all_indexed_ereport(state, tid, "heap tuple inconsistent with index"); + } + + } + + MemoryContextSwitchTo(oldCtx); +} /* Report without any additional info */ static void @@ -862,3 +1347,19 @@ revmap_item_ereport(BrinCheckState * state, const char *fmt) state->revmapBlk, state->revmapidx))); } + +/* Report with range blkno, heap tuple info */ +static void +heap_all_indexed_ereport(const BrinCheckState * state, const ItemPointerData *tid, const char *message) +{ + 
Assert(state->rangeBlkno != InvalidBlockNumber); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("Index %s is not consistent with the heap - %s. Range blkno: %u, heap tid (%u,%u)", + RelationGetRelationName(state->idxrel), + message, + state->dtup->bt_blkno, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)))); +} diff --git a/doc/src/sgml/amcheck.sgml b/doc/src/sgml/amcheck.sgml index 2f76af907ec..8ed5fe6ebfd 100644 --- a/doc/src/sgml/amcheck.sgml +++ b/doc/src/sgml/amcheck.sgml @@ -233,7 +233,7 @@ SET client_min_messages = DEBUG1; - brin_index_check(index regclass, regularpagescheck boolean) returns void + brin_index_check(index regclass, regularpagescheck boolean, heapallindexed boolean, variadic text[]) returns void brin_index_check @@ -261,6 +261,43 @@ SET client_min_messages = DEBUG1; + + heapallindexed + + + If true, the check verifies that every heap tuple is consistent with the + index. This check phase needs an operator for which an expression + LHS OPERATOR RHS evaluates to true + when we use the same value of the indexed type for both LHS and RHS. + For example, if the indexed column's type is bigint, + the equality operator can be used because the expression + x = x results in true + for every value of bigint + (e.g. 1 = 1 is true, 2 = 2 is true, and so on). + The operator should also be part of the operator family of the indexed column. + Most of the time, the equality operator can be used. + If all indexed column operator classes support the equality operator, + the function call looks like this: + + SELECT brin_index_check('index_name', true, true); + + If any indexed column operator class doesn't support the equality operator then + a suitable operator for every such column should be found and + operators for all indexed columns should be listed in the function call. + For instance, we have two indexed columns + (a int8_minmax_ops, b box_inclusion_ops). + box_inclusion_ops operator class does not support the equality operator. 
+ The appropriate operator would be @>. + Then the function call looks like this: + + SELECT brin_index_check('index_name', true, true, '=', '@>'); + + + + Defaults to false. + + + -- 2.43.0