From 112202840d68f41902581f1f28a1635b189ad403 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 19 Mar 2020 13:20:20 +1300 Subject: [PATCH v25 1/5] Add sequential scan capability to dshash. Allow all entries in a hash table to be visited, for use by later commits. Author: Kyotaro Horiguchi Reviewed-by: Andres Freund Discussion: https://postgre.es/m/20180629.173418.190173462.horiguchi.kyotaro%40lab.ntt.co.jp --- src/backend/lib/dshash.c | 185 +++++++++++++++++++++++++++++++++++++-- src/include/lib/dshash.h | 22 ++++- 2 files changed, 201 insertions(+), 6 deletions(-) diff --git a/src/backend/lib/dshash.c b/src/backend/lib/dshash.c index 78ccf03217..afaccb123b 100644 --- a/src/backend/lib/dshash.c +++ b/src/backend/lib/dshash.c @@ -112,6 +112,7 @@ struct dshash_table size_t size_log2; /* log2(number of buckets) */ bool find_locked; /* Is any partition lock held by 'find'? */ bool find_exclusively_locked; /* ... exclusively? */ + bool seqscan_running;/* now under sequential scan */ }; /* Given a pointer to an item, find the entry (user data) it holds. */ @@ -127,6 +128,10 @@ struct dshash_table #define NUM_SPLITS(size_log2) \ (size_log2 - DSHASH_NUM_PARTITIONS_LOG2) +/* How many buckets are there in a given size? */ +#define NUM_BUCKETS(size_log2) \ + (((size_t) 1) << (size_log2)) + /* How many buckets are there in each partition at a given size? */ #define BUCKETS_PER_PARTITION(size_log2) \ (((size_t) 1) << NUM_SPLITS(size_log2)) @@ -153,6 +158,10 @@ struct dshash_table #define BUCKET_INDEX_FOR_PARTITION(partition, size_log2) \ ((partition) << NUM_SPLITS(size_log2)) +/* Choose partition based on bucket index. */ +#define PARTITION_FOR_BUCKET_INDEX(bucket_idx, size_log2) \ + ((bucket_idx) >> NUM_SPLITS(size_log2)) + /* The head of the active bucket for a given hash value (lvalue). 
*/ #define BUCKET_FOR_HASH(hash_table, hash) \ (hash_table->buckets[ \ @@ -228,6 +237,7 @@ dshash_create(dsa_area *area, const dshash_parameters *params, void *arg) hash_table->find_locked = false; hash_table->find_exclusively_locked = false; + hash_table->seqscan_running = false; /* * Set up the initial array of buckets. Our initial size is the same as @@ -279,6 +289,7 @@ dshash_attach(dsa_area *area, const dshash_parameters *params, hash_table->control = dsa_get_address(area, control); hash_table->find_locked = false; hash_table->find_exclusively_locked = false; + hash_table->seqscan_running = false; Assert(hash_table->control->magic == DSHASH_MAGIC); /* @@ -324,7 +335,7 @@ dshash_destroy(dshash_table *hash_table) ensure_valid_bucket_pointers(hash_table); /* Free all the entries. */ - size = ((size_t) 1) << hash_table->size_log2; + size = NUM_BUCKETS(hash_table->size_log2); for (i = 0; i < size; ++i) { dsa_pointer item_pointer = hash_table->buckets[i]; @@ -549,9 +560,14 @@ dshash_delete_entry(dshash_table *hash_table, void *entry) LW_EXCLUSIVE)); delete_item(hash_table, item); - hash_table->find_locked = false; - hash_table->find_exclusively_locked = false; - LWLockRelease(PARTITION_LOCK(hash_table, partition)); + + /* Keep the partition lock held while a sequential scan is running */ + if (!hash_table->seqscan_running) + { + hash_table->find_locked = false; + hash_table->find_exclusively_locked = false; + LWLockRelease(PARTITION_LOCK(hash_table, partition)); + } } /* @@ -568,6 +584,8 @@ dshash_release_lock(dshash_table *hash_table, void *entry) Assert(LWLockHeldByMeInMode(PARTITION_LOCK(hash_table, partition_index), hash_table->find_exclusively_locked ? 
LW_EXCLUSIVE : LW_SHARED)); + /* lock is under control of sequential scan */ + Assert(!hash_table->seqscan_running); hash_table->find_locked = false; hash_table->find_exclusively_locked = false; @@ -592,6 +610,164 @@ dshash_memhash(const void *v, size_t size, void *arg) return tag_hash(v, size); } +/* + * dshash_seq_init/_next/_term + * Sequentially scan through a dshash table and return all the + * elements one by one; return NULL when there are no more. + * + * dshash_seq_term should be called for incomplete scans and otherwise + * shouldn't. Finished scans are cleaned up automatically. + * + * Returned elements are locked as is the case with dshash_find. However, the + * caller must not release the lock. + * + * Same as dynahash, the caller may delete returned elements mid-scan. + * + * If consistent is set for dshash_seq_init, all hash table partitions are + * locked in the requested mode (as determined by the exclusive flag) during + * the scan. Otherwise partitions are locked one at a time during the + * scan. + */ +void +dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, + bool consistent, bool exclusive) +{ + /* at most one scan is allowed at a time */ + Assert(!hash_table->seqscan_running); + + status->hash_table = hash_table; + status->curbucket = 0; + status->nbuckets = 0; + status->curitem = NULL; + status->pnextitem = InvalidDsaPointer; + status->curpartition = -1; + status->consistent = consistent; + status->exclusive = exclusive; + hash_table->seqscan_running = true; + + /* + * Protect all partitions from modification if the caller wants a + * consistent result. + */ + if (consistent) + { + int i; + + for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i) + { + Assert(!LWLockHeldByMe(PARTITION_LOCK(hash_table, i))); + + LWLockAcquire(PARTITION_LOCK(hash_table, i), + exclusive ? 
LW_EXCLUSIVE : LW_SHARED); + } + ensure_valid_bucket_pointers(hash_table); + } +} + +void * +dshash_seq_next(dshash_seq_status *status) +{ + dsa_pointer next_item_pointer; + + Assert(status->hash_table->seqscan_running); + if (status->curitem == NULL) + { + int partition; + + Assert(status->curbucket == 0); + Assert(!status->hash_table->find_locked); + + /* First call: lock the first partition unless all are locked. */ + if (!status->consistent) + { + partition = + PARTITION_FOR_BUCKET_INDEX(status->curbucket, + status->hash_table->size_log2); + LWLockAcquire(PARTITION_LOCK(status->hash_table, partition), + status->exclusive ? LW_EXCLUSIVE : LW_SHARED); + status->curpartition = partition; + } + + /* Needed in both modes; no resize happens until the scan ends. */ + status->nbuckets = + NUM_BUCKETS(status->hash_table->control->size_log2); + ensure_valid_bucket_pointers(status->hash_table); + + next_item_pointer = status->hash_table->buckets[status->curbucket]; + } + else + next_item_pointer = status->pnextitem; + + /* Move to the next bucket if we finished the current bucket */ + while (!DsaPointerIsValid(next_item_pointer)) + { + if (++status->curbucket >= status->nbuckets) + { + /* All buckets have been scanned. Finish. */ + dshash_seq_term(status); + return NULL; + } + + /* Also move the partition lock if needed */ + if (!status->consistent) + { + int next_partition = + PARTITION_FOR_BUCKET_INDEX(status->curbucket, + status->hash_table->size_log2); + + /* Move lock along with partition for the bucket */ + if (status->curpartition != next_partition) + { + /* + * Lock the next partition then release the current, not in the + * reverse order, to avoid concurrent resizing. Partitions + * are locked in the same order as in resize() so deadlocks + * won't happen. + */ + LWLockAcquire(PARTITION_LOCK(status->hash_table, + next_partition), + status->exclusive ? 
LW_EXCLUSIVE : LW_SHARED); + LWLockRelease(PARTITION_LOCK(status->hash_table, + status->curpartition)); + status->curpartition = next_partition; + } + } + + next_item_pointer = status->hash_table->buckets[status->curbucket]; + } + + status->curitem = + dsa_get_address(status->hash_table->area, next_item_pointer); + status->hash_table->find_locked = true; + status->hash_table->find_exclusively_locked = status->exclusive; + + /* + * The caller may delete the item. Store the next item in case of deletion. + */ + status->pnextitem = status->curitem->next; + + return ENTRY_FROM_ITEM(status->curitem); +} + +void +dshash_seq_term(dshash_seq_status *status) +{ + Assert(status->hash_table->seqscan_running); + status->hash_table->find_locked = false; + status->hash_table->find_exclusively_locked = false; + status->hash_table->seqscan_running = false; + + if (status->consistent) + { + int i; + + for (i = 0; i < DSHASH_NUM_PARTITIONS; ++i) + LWLockRelease(PARTITION_LOCK(status->hash_table, i)); + } + else if (status->curpartition >= 0) + LWLockRelease(PARTITION_LOCK(status->hash_table, status->curpartition)); +} + /* * Print debugging information about the internal state of the hash table to * stderr. The caller must hold no partition locks. @@ -673,7 +849,6 @@ delete_item(dshash_table *hash_table, dshash_table_item *item) /* * Grow the hash table if necessary to the requested number of buckets. The * requested size must be double some previously observed size. - * * Must be called without any partition lock held. */ static void diff --git a/src/include/lib/dshash.h b/src/include/lib/dshash.h index b86df68e77..ad88f32cdd 100644 --- a/src/include/lib/dshash.h +++ b/src/include/lib/dshash.h @@ -59,6 +59,22 @@ typedef struct dshash_parameters struct dshash_table_item; typedef struct dshash_table_item dshash_table_item; +/* + * Sequential scan state. The detail is exposed to let users know the storage + * size but it should be considered as an opaque type by callers. 
+ */ +typedef struct dshash_seq_status +{ + dshash_table *hash_table; + int curbucket; + int nbuckets; + dshash_table_item *curitem; + dsa_pointer pnextitem; + int curpartition; + bool consistent; + bool exclusive; +} dshash_seq_status; + /* Creating, sharing and destroying from hash tables. */ extern dshash_table *dshash_create(dsa_area *area, const dshash_parameters *params, @@ -70,7 +86,6 @@ extern dshash_table *dshash_attach(dsa_area *area, extern void dshash_detach(dshash_table *hash_table); extern dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table); extern void dshash_destroy(dshash_table *hash_table); - /* Finding, creating, deleting entries. */ extern void *dshash_find(dshash_table *hash_table, const void *key, bool exclusive); @@ -80,6 +95,11 @@ extern bool dshash_delete_key(dshash_table *hash_table, const void *key); extern void dshash_delete_entry(dshash_table *hash_table, void *entry); extern void dshash_release_lock(dshash_table *hash_table, void *entry); +/* seq scan support */ +extern void dshash_seq_init(dshash_seq_status *status, dshash_table *hash_table, + bool consistent, bool exclusive); +extern void *dshash_seq_next(dshash_seq_status *status); +extern void dshash_seq_term(dshash_seq_status *status); /* Convenience hash and compare functions wrapping memcmp and tag_hash. */ extern int dshash_memcmp(const void *a, const void *b, size_t size, void *arg); extern dshash_hash dshash_memhash(const void *v, size_t size, void *arg); -- 2.20.1