Re: [HACKERS] Parallel Hash take II - Mailing list pgsql-hackers
From: Andres Freund
Subject: Re: [HACKERS] Parallel Hash take II
Date:
Msg-id: 20171107210155.kuksdd324kgz5oev@alap3.anarazel.de
In response to: Re: [HACKERS] Parallel Hash take II (Thomas Munro <thomas.munro@enterprisedb.com>)
Responses: Re: [HACKERS] Parallel Hash take II; Re: [HACKERS] Parallel Hash take II
List: pgsql-hackers
Hi,

Here's a review of v24

+set min_parallel_table_scan_size = 0;
+set parallel_setup_cost = 0;
+-- Make a simple relation with well distributed keys and correctly
+-- estimated size.
+create table simple as
+  select generate_series(1, 20000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table simple set (parallel_workers = 2);
+analyze simple;
+-- Make a relation whose size we will under-estimate.  We want stats
+-- to say 1000 rows, but actually there are 20,000 rows.
+create table bigger_than_it_looks as
+  select generate_series(1, 20000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+alter table bigger_than_it_looks set (autovacuum_enabled = 'false');
+alter table bigger_than_it_looks set (parallel_workers = 2);
+delete from bigger_than_it_looks where id <= 19000;
+vacuum bigger_than_it_looks;
+analyze bigger_than_it_looks;
+insert into bigger_than_it_looks
+  select generate_series(1, 19000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';

It seems kinda easier to just manipulate ndistinct and reltuples...

+set max_parallel_workers_per_gather = 0;
+set work_mem = '4MB';

I hope there's a fair amount of slop here - with different archs you're
going to see quite some size differences.

+-- The "good" case: batches required, but we plan the right number; we
+-- plan for 16 batches, and we stick to that number, and peak memory
+-- usage stays within our work_mem budget
+-- non-parallel
+set max_parallel_workers_per_gather = 0;
+set work_mem = '128kB';

So how do we know that's actually the case we're testing rather than
something arbitrarily different? There's IIRC tests somewhere that just
filter the json explain output to the right parts...

+/*
+ * Build the name for a given segment of a given BufFile.
+ */
+static void
+MakeSharedSegmentName(char *name, const char *buffile_name, int segment)
+{
+    snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
+}

Not a fan of this name - you're not "making" a filename here (as in
allocating or such).
I think I'd just remove the Make prefix.

+/*
+ * Open a file that was previously created in another backend with
+ * BufFileCreateShared in the same SharedFileSet using the same name.  The
+ * backend that created the file must have called BufFileClose() or
+ * BufFileExport() to make sure that it is ready to be opened by other
+ * backends and render it read-only.
+ */

Is it actually guaranteed that it's another backend / do we rely on
that?

+BufFile *
+BufFileOpenShared(SharedFileSet *fileset, const char *name)
+{

+    /*
+     * If we didn't find any files at all, then no BufFile exists with this
+     * tag.
+     */
+    if (nfiles == 0)
+        return NULL;

s/tag/name/?

+/*
+ * Delete a BufFile that was created by BufFileCreateShared in the given
+ * SharedFileSet using the given name.
+ *
+ * It is not necessary to delete files explicitly with this function.  It is
+ * provided only as a way to delete files proactively, rather than waiting for
+ * the SharedFileSet to be cleaned up.
+ *
+ * Only one backend should attempt to delete a given name, and should know
+ * that it exists and has been exported or closed.
+ */
+void
+BufFileDeleteShared(SharedFileSet *fileset, const char *name)
+{
+    char        segment_name[MAXPGPATH];
+    int         segment = 0;
+    bool        found = false;
+
+    /*
+     * We don't know how many segments the file has.  We'll keep deleting
+     * until we run out.  If we don't manage to find even an initial segment,
+     * raise an error.
+     */
+    for (;;)
+    {
+        MakeSharedSegmentName(segment_name, name, segment);
+        if (!SharedFileSetDelete(fileset, segment_name, true))
+            break;
+        found = true;
+        ++segment;
+    }

Hm. Do we properly delete all the files via the resowner mechanism if
this fails midway? I.e. if there are no leading segments? Also wonder
if this doesn't need a CFI check.

+void
+PathNameCreateTemporaryDir(const char *basedir, const char *directory)
+{
+    if (mkdir(directory, S_IRWXU) < 0)
+    {
+        if (errno == EEXIST)
+            return;
+
+        /*
+         * Failed.
+         * Try to create basedir first in case it's missing.  Tolerate
+         * ENOENT to close a race against another process following the same
+         * algorithm.
+         */
+        if (mkdir(basedir, S_IRWXU) < 0 && errno != ENOENT)
+            elog(ERROR, "cannot create temporary directory \"%s\": %m",
+                 basedir);

ENOENT or EEXIST?

+File
+PathNameCreateTemporaryFile(const char *path, bool error_on_failure)
+{
+    File        file;
+
+    /*
+     * Open the file.  Note: we don't use O_EXCL, in case there is an orphaned
+     * temp file that can be reused.
+     */
+    file = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY);
+    if (file <= 0)
+    {
+        if (error_on_failure)
+            elog(ERROR, "could not create temporary file \"%s\": %m", path);
+        else
+            return file;
+    }
+
+    /* Mark it for temp_file_limit accounting. */
+    VfdCache[file].fdstate |= FD_TEMP_FILE_LIMIT;
+
+    /*
+     * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we still
+     * want to make sure they get closed at end of xact.
+     */
+    ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+    ResourceOwnerRememberFile(CurrentResourceOwner, file);
+    VfdCache[file].resowner = CurrentResourceOwner;

So maybe I'm being pedantic here, but wouldn't the right order be to do
ResourceOwnerEnlargeFiles() *before* creating the file? It's a memory
allocating operation, so it can fail, which'd leak the file.

+/*
+ * Open a file that was created with PathNameCreateTemporaryFile, possibly in
+ * another backend.  Files opened this way don't count agains the

s/agains/against/

+ * temp_file_limit of the caller, are read-only and are automatically closed
+ * at the end of the transaction but are not deleted on close.
+ */
+File
+PathNameOpenTemporaryFile(const char *path)
+{
+    File        file;
+
+    /* We open the file read-only. */
+    file = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+    /* If no such file, then we don't raise an error.
+     */
+    if (file <= 0 && errno != ENOENT)
+        elog(ERROR, "could not open temporary file \"%s\": %m", path);
+
+    if (file > 0)
+    {
+        /*
+         * We don't set FD_DELETE_AT_CLOSE for files opened this way, but we
+         * still want to make sure they get closed at end of xact.
+         */
+        ResourceOwnerEnlargeFiles(CurrentResourceOwner);
+        ResourceOwnerRememberFile(CurrentResourceOwner, file);
+        VfdCache[file].resowner = CurrentResourceOwner;

Same complaint as above, ResourceOwnerEnlargeFiles() should be done
earlier.

+/*
+ * Delete a file by pathname.  Return true if the file existed, false if
+ * didn't.
+ */
+bool
+PathNameDeleteTemporaryFile(const char *path, bool error_on_failure)
+{
+    struct stat filestats;
+    int         stat_errno;
+
+    /* Get the final size for pgstat reporting. */
+    if (stat(path, &filestats) != 0)
+        stat_errno = errno;
+    else
+        stat_errno = 0;
+
+    /*
+     * Unlike FileClose's automatic file deletion code, we tolerate
+     * non-existence to support BufFileDeleteShared which doesn't know how
+     * many segments it has to delete until it runs out.
+     */
+    if (stat_errno == ENOENT)
+        return false;
+
+    if (unlink(path) < 0)
+    {
+        if (errno != ENOENT)
+            elog(error_on_failure ? ERROR : LOG,
+                 "cannot unlink temporary file \"%s\": %m", path);
+        return false;
+    }
+
+    if (stat_errno == 0)
+        ReportTemporaryFileUsage(path, filestats.st_size);
+    else
+    {
+        errno = stat_errno;
+        elog(LOG, "could not stat file \"%s\": %m", path);
+    }

All these messages are "not expected to ever happen" ones, right?

+    return true;
+}

 /*
  * close a file when done with it
  */
@@ -1537,10 +1747,17 @@ FileClose(File file)
         Delete(file);
     }

+    if (vfdP->fdstate & FD_TEMP_FILE_LIMIT)
+    {
+        /* Subtract its size from current usage (do first in case of error) */
+        temporary_files_size -= vfdP->fileSize;
+        vfdP->fileSize = 0;
+    }

So, is it right to do so unconditionally and without regard for errors?
If the file isn't deleted, it shouldn't be subtracted from fileSize.
I guess you're managing that through the flag, but that's not entirely
obvious.

diff --git a/src/backend/storage/file/sharedfileset.c b/src/backend/storage/file/sharedfileset.c
new file mode 100644
index 00000000000..6da80838b37
--- /dev/null
+++ b/src/backend/storage/file/sharedfileset.c
@@ -0,0 +1,240 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedfileset.c
+ *    Shared temporary file management.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/file/sharedfileset.c
+ *
+ *-------------------------------------------------------------------------
+ */

A slightly bigger comment wouldn't hurt.

+/*
+ * Attach to a set of directories that was created with SharedFileSetInit.
+ */
+void
+SharedFileSetAttach(SharedFileSet *fileset, dsm_segment *seg)
+{
+    bool        success;
+
+    SpinLockAcquire(&fileset->mutex);
+    if (fileset->refcnt == 0)
+        success = false;

I've not finished reading through this, but is this safe? If the
segment's gone, is the spinlock guaranteed to still be a spinlock? I
suspect this isn't a problem because just the underlying data is
removed, but the SharedFileSet stays alive?

+static void
+GetSharedFileSetPath(char *path, SharedFileSet *fileset, Oid tablespace)
+{
+    char        tempdirpath[MAXPGPATH];
+
+    GetTempTablespacePath(tempdirpath, tablespace);
+    snprintf(path, MAXPGPATH, "%s/%s%d.%d.sharedfileset" PG_TEMP_SUBDIR_SUFFIX,
+             tempdirpath, PG_TEMP_FILE_PREFIX,
+             fileset->creator_pid, fileset->number);
+}

+/*
+ * Sorting hat to determine which tablespace a given shared temporary file
+ * belongs in.
+ */
+static Oid
+ChooseTablespace(const SharedFileSet *fileset, const char *name)
+{
+    uint32      hash = hash_any((const unsigned char *) name, strlen(name));
+
+    return fileset->tablespaces[hash % fileset->ntablespaces];
+}

Hm.
I wonder if just round-robin through these isn't a better approach.

+/*
+ * Compute the full path of a file in a SharedFileSet.
+ */
+static void
+GetSharedFilePath(char *path, SharedFileSet *fileset, const char *name)
+{
+    char        dirpath[MAXPGPATH];
+
+    GetSharedFileSetPath(dirpath, fileset, ChooseTablespace(fileset, name));
+    snprintf(path, MAXPGPATH, "%s/" PG_TEMP_FILE_PREFIX ".%s", dirpath, name);
+}

diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 4c35ccf65eb..8b91d5a6ebe 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -528,16 +528,6 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                 PrintRelCacheLeakWarning(res);
             RelationClose(res);
         }
-
-        /* Ditto for dynamic shared memory segments */
-        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
-        {
-            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
-
-            if (isCommit)
-                PrintDSMLeakWarning(res);
-            dsm_detach(res);
-        }
     }
     else if (phase == RESOURCE_RELEASE_LOCKS)
     {
@@ -654,6 +644,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
                 PrintFileLeakWarning(res);
             FileClose(res);
         }
+
+        /* Ditto for dynamic shared memory segments */
+        while (ResourceArrayGetAny(&(owner->dsmarr), &foundres))
+        {
+            dsm_segment *res = (dsm_segment *) DatumGetPointer(foundres);
+
+            if (isCommit)
+                PrintDSMLeakWarning(res);
+            dsm_detach(res);
+        }
     }

Is that entirely unproblematic? Are there any DSM callbacks that rely
on locks still being held? Please split this part into a separate
commit with such analysis.

+/* The initial size of chunks in pages. */
+#define STS_MIN_CHUNK_PAGES 4

Could use a quick description of how you've arrived at that specific
value.

+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+    int         npages;         /* Size of this chunk in BLCKSZ pages. */
+    int         ntuples;        /* Number of tuples in this chunk. */
+    char        data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;
+
+/* Per-participant shared state.
+ */
+typedef struct SharedTuplestoreParticipant
+{
+    slock_t     mutex;
+    BlockNumber read_page;      /* Page number for next read. */
+    BlockNumber npages;         /* Number of pages written. */
+    bool        writing;        /* Used only for assertions. */
+
+    /*
+     * We need variable sized chunks, because we might be asked to store
+     * gigantic tuples.  To avoid the locking contention that would come from
+     * reading chunk sizes from disk, we store the chunk size for ranges of
+     * the file in a compact format in memory.  chunk_pages starts out at
+     * STS_MIN_CHUNK_PAGES and then doubles each time we reach a page listed
+     * in chunk_expansion_log.
+     */
+    BlockNumber chunk_expansion_log[sizeof(BlockNumber) * CHAR_BIT];
+    int         chunk_expansions;
+    int         chunk_expansion;
+    int         chunk_pages;

This needs more explanation.

+/*
+ * Initialize a SharedTuplestore in existing shared memory.  There must be
+ * space for sts_estimate(participants) bytes.  If flags is set to the value
+ * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
+ * eagerly (but this isn't yet implemented).

s/is set to the value/includes the value/ - otherwise it's not really a
flags argument.

+ * Tuples that are stored may optionally carry a piece of fixed sized
+ * meta-data which will be retrieved along with the tuple.  This is useful for
+ * the hash codes used for multi-batch hash joins, but could have other
+ * applications.

"hash codes"?

+/*
+ * Prepare to rescan.  Only one participant should call this.  After it
+ * returns, all participants should call sts_begin_parallel_scan() and then
+ * loop over sts_parallel_scan_next().
+ */

s/should/may/? Also maybe document what happens with in-progress reads
(or rather them not being allowed to exist)?

+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+             MinimalTuple tuple)
+{

+    /* Do we have space?
+     */
+    size = accessor->sts->meta_data_size + tuple->t_len;
+    if (accessor->write_pointer + size >= accessor->write_end)
+    {
+        /* Try flushing to see if that creates enough space. */
+        if (accessor->write_chunk != NULL)
+            sts_flush_chunk(accessor);
+
+        /*
+         * It may still not be enough in the case of a gigantic tuple, or if
+         * we haven't created a chunk buffer at all yet.
+         */
+        if (accessor->write_pointer + size >= accessor->write_end)
+        {
+            SharedTuplestoreParticipant *participant;
+            size_t      space_needed;
+            int         pages_needed;
+
+            /* How many pages to hold this data and the chunk header? */
+            space_needed = offsetof(SharedTuplestoreChunk, data) + size;
+            pages_needed = (space_needed + (BLCKSZ - 1)) / BLCKSZ;
+            pages_needed = Max(pages_needed, STS_MIN_CHUNK_PAGES);
+
+            /*
+             * Double the chunk size until it's big enough, and record that
+             * fact in the shared expansion log so that readers know about it.
+             */
+            participant = &accessor->sts->participants[accessor->participant];
+            while (accessor->write_pages < pages_needed)
+            {
+                accessor->write_pages *= 2;
+                participant->chunk_expansion_log[participant->chunk_expansions++] =
+                    accessor->write_page;
+            }

Hm. Isn't that going to be pretty unfunny if you have one large and a
lot of small tuples?

+            /* Create the output buffer. */
+            if (accessor->write_chunk != NULL)
+                pfree(accessor->write_chunk);
+            accessor->write_chunk = (SharedTuplestoreChunk *)
+                palloc0(accessor->write_pages * BLCKSZ);

Are we guaranteed to be in a long-lived memory context here?

+/*
+ * Get the next tuple in the current parallel scan.
+ */
+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+    SharedTuplestoreParticipant *p;
+    BlockNumber read_page;
+    int         chunk_pages;
+    bool        eof;
+
+    for (;;)
+    {
+        /* Can we read more tuples from the current chunk?
+         */
+        if (likely(accessor->read_ntuples < accessor->read_ntuples_available))
+            return sts_read_tuple(accessor, meta_data);

I'm not convinced this is a good use of likely/unlikely (not biased and
not performance critical enough).

+        /* Find the location of a new chunk to read. */
+        p = &accessor->sts->participants[accessor->read_participant];
+
+        SpinLockAcquire(&p->mutex);
+        eof = p->read_page >= p->npages;
+        if (!eof)
+        {
+            /*
+             * Figure out how big this chunk is.  It will almost always be the
+             * same as the last chunk loaded, but if there is one or more
+             * entry in the chunk expansion log for this page then we know
+             * that it doubled that number of times.  This avoids the need to
+             * do IO to adjust the read head, so we don't need to hold up
+             * concurrent readers.  (An alternative to this extremely rarely
+             * run loop would be to use more space storing the new size in the
+             * log so we'd have 'if' instead of 'while'.)
+             */
+            read_page = p->read_page;
+            while (p->chunk_expansion < p->chunk_expansions &&
+                   p->chunk_expansion_log[p->chunk_expansion] == p->read_page)
+            {
+                p->chunk_pages *= 2;
+                p->chunk_expansion++;
+            }
+            chunk_pages = p->chunk_pages;
+
+            /* The next reader will start after this chunk. */
+            p->read_page += chunk_pages;
+        }
+        SpinLockRelease(&p->mutex);

This looks more like the job of an lwlock rather than a spinlock.

+/*
+ * Create the name used for our shared BufFiles.
+ */
+static void
+make_name(char *name, SharedTuplestoreAccessor *accessor, int participant)
+{
+    snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
+}

Name's a bit generic. And it's still not really making ;)

Going to buy some groceries and then look at the next patches.

- Andres

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers