Thread: How to estimate the shared memory size required for parallel scan?
Hi, I am trying to change cstore_fdw to scan in parallel. The FDW interface provides 'EstimateDSMForeignScan' for reporting the shared memory size required for a parallel scan. How should I estimate the shared memory size required for a parallel scan? Thanks. -- Masayuki Takahashi
On Thu, Aug 16, 2018 at 11:36 PM, Masayuki Takahashi <masayuki038@gmail.com> wrote:
> I am trying to change cstore_fdw to scan in parallel. The FDW interface provides
> 'EstimateDSMForeignScan' for reporting the shared memory size required for a parallel scan.
>
> How should I estimate the shared memory size required for a parallel scan?

It's a slightly strange use of the word "estimate". It means "tell me how much shared memory you need". Later, your InitializeDSMForeignScan() callback will receive a pointer to exactly that much shared memory to initialise. Then InitializeWorkerForeignScan() will also receive a pointer to that memory, inside every worker process. Note that it may be mapped at a different address in each process, so be careful not to store raw pointers in it. It's up to you to design a struct to hold whatever data, spinlocks, LWLocks, atomics etc. you might need to orchestrate your parallel scan.

It works much the same way for built-in executor nodes that are parallel-aware, by the way. For example, ExecHashJoinEstimate() reserves sizeof(ParallelHashJoinState), then ExecHashJoinInitializeDSM() allocates and initialises it, and ExecHashJoinInitializeWorker() tells the workers about it. The built-in executor nodes have to do a little more work than FDWs, using the plan node ID to allocate and look things up in a "TOC" (table of contents), but nodeForeignScan.c does that work for you: it just asks you how much you want, and then gives it to you.

-- Thomas Munro http://www.enterprisedb.com
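The estimate/initialize/attach contract, and the warning about raw pointers, can be illustrated with a standalone C model. This is not PostgreSQL code: `estimate_shared_size`, `initialize_shared`, `attach_worker` and `map_at_new_address` are hypothetical stand-ins for EstimateDSMForeignScan, InitializeDSMForeignScan and InitializeWorkerForeignScan (the real callbacks take additional executor arguments; see fdwapi.h), `malloc` stands in for the DSM segment, and copying the segment to a second buffer mimics it being mapped at a different address in another process. The struct stores an offset rather than a pointer, so it stays valid after the "remap".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical shared state; everything it refers to lives inside the segment. */
typedef struct SharedScanState
{
    uint64_t nblocks;      /* total number of blocks to scan */
    size_t   name_offset;  /* file name position, relative to the segment start */
} SharedScanState;

/* "Estimate" step: report exactly how many bytes we need. */
static size_t
estimate_shared_size(const char *filename)
{
    return sizeof(SharedScanState) + strlen(filename) + 1;
}

/* "InitializeDSM" step: fill in the segment the caller allocated for us. */
static void
initialize_shared(void *segment, const char *filename, uint64_t nblocks)
{
    SharedScanState *state = segment;

    state->nblocks = nblocks;
    state->name_offset = sizeof(SharedScanState);
    /* Store an offset, never a pointer: other processes map the segment elsewhere. */
    memcpy((char *) segment + state->name_offset, filename, strlen(filename) + 1);
}

/* "InitializeWorker" step: resolve the offset against this process's mapping. */
static const char *
attach_worker(const void *segment)
{
    const SharedScanState *state = segment;

    assert(state->name_offset >= sizeof(SharedScanState));
    return (const char *) segment + state->name_offset;
}

/* Simulate the same segment appearing at a different address in a worker. */
static void *
map_at_new_address(const void *segment, size_t size)
{
    void *copy = malloc(size);

    if (copy != NULL)
        memcpy(copy, segment, size);
    return copy;
}
```

If you initialize one buffer, copy it with `map_at_new_address()` and free the original, `attach_worker()` on the copy still returns the file name; a `char *` stored in the struct would instead point into freed memory.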
Re: How to estimate the shared memory size required for parallel scan?
From: Masayuki Takahashi
Hi Thomas,

Thank you for explaining DSM and the TOC.

> It's up to you to design a struct to hold whatever data, spinlocks, LWLocks, atomics etc you might need to orchestrate your parallel scan.

If an FDW (e.g. cstore_fdw) does not need to share any information among workers beyond what PostgreSQL core already shares in a parallel scan, does it not need to allocate DSM at all?

Thanks.

-- 高橋 真之
On Sun, Aug 19, 2018 at 12:01 AM, Masayuki Takahashi <masayuki038@gmail.com> wrote:
>> It's up to you to design a struct to hold whatever data,
>> spinlocks, LWLocks, atomics etc you might need to orchestrate your
>> parallel scan.
>
> If an FDW (e.g. cstore_fdw) does not need to share any information among
> workers beyond what PostgreSQL core already shares in a parallel scan,
> does it not need to allocate DSM at all?

Right. You don't have to supply the InitializeDSMForeignScan, ReInitializeDSMForeignScan or InitializeWorkerForeignScan functions.

If you just supply an IsForeignScanParallelSafe function that returns true, that would allow your FDW to be used inside parallel workers and wouldn't need any extra shared memory, but it wouldn't be a "parallel scan". It would just be "parallel safe". Each process that does a scan of your FDW would expect a full normal scan (presumably returning the same tuples in each process). That means it can be used, for example, on the inner side of a join where the outer side comes from a parallel scan, as file_fdw can.

A true parallel scan of an FDW would be one where each process emits an arbitrary fraction of the tuples, but together they emit all of the tuples. You'd almost certainly need to use some shared memory to coordinate that. To say that you support that, I think your GetForeignPaths() function would need to call add_partial_path(). And unless I'm mistaken, whether or not InitializeDSMForeignScan etc are called might be the only indication you get of whether you need to run in parallel-aware mode. I haven't personally heard of any FDWs that can do this yet, but I just tried hacking file_fdw to register a partial path and it seems to work (though of course the results are duplicated because the emitted tuples are not actually partial).

-- Thomas Munro http://www.enterprisedb.com
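The difference between "parallel safe" and a true parallel scan can be modelled in standalone C. This is a sketch under stated assumptions: POSIX threads stand in for worker processes, and a C11 atomic counter stands in for a variable in the DSM segment (real FDW code would presumably use PostgreSQL's own `pg_atomic_*` functions instead). `SharedScan`, `worker_main` and `run_scan` are hypothetical names. In parallel-safe mode every worker scans every block, so the combined output contains each block once per worker; in parallel-aware mode workers claim blocks with `atomic_fetch_add`, so together they emit each block exactly once. Compile with `-pthread`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define NBLOCKS  1000
#define NWORKERS 4

/* Stand-in for a struct living in the DSM segment. */
typedef struct SharedScan
{
    atomic_size_t next_block;        /* next block not yet claimed by any worker */
    atomic_int    emitted[NBLOCKS];  /* how many times each block was returned */
    bool          parallel_aware;    /* set before workers start, then read-only */
} SharedScan;

static void *
worker_main(void *arg)
{
    SharedScan *shared = arg;

    if (shared->parallel_aware)
    {
        /* Parallel-aware: each fetch-and-add claims a block nobody else gets. */
        size_t block;

        while ((block = atomic_fetch_add(&shared->next_block, 1)) < NBLOCKS)
            atomic_fetch_add(&shared->emitted[block], 1);
    }
    else
    {
        /* Merely parallel-safe: every worker does a full, normal scan. */
        for (size_t block = 0; block < NBLOCKS; block++)
            atomic_fetch_add(&shared->emitted[block], 1);
    }
    return NULL;
}

/* Run NWORKERS workers over one shared state; return total blocks emitted. */
static long
run_scan(SharedScan *shared, bool parallel_aware)
{
    pthread_t threads[NWORKERS];
    long total = 0;

    /* Reset the shared state before starting the workers. */
    atomic_store(&shared->next_block, 0);
    for (size_t i = 0; i < NBLOCKS; i++)
        atomic_store(&shared->emitted[i], 0);
    shared->parallel_aware = parallel_aware;

    for (int i = 0; i < NWORKERS; i++)
    {
        int rc = pthread_create(&threads[i], NULL, worker_main, shared);

        assert(rc == 0);
        (void) rc;
    }
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);

    for (size_t i = 0; i < NBLOCKS; i++)
        total += atomic_load(&shared->emitted[i]);
    return total;
}
```

With a zero-initialized static `SharedScan`, `run_scan(&shared, false)` returns `NBLOCKS * NWORKERS` (the duplicated output Thomas saw from his file_fdw hack), while `run_scan(&shared, true)` returns `NBLOCKS`, with every block emitted exactly once.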
On Sun, Aug 19, 2018 at 1:40 AM, Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> A true parallel scan of an FDW would be one where each process emits
> an arbitrary fraction of the tuples, but together they emit all of the
> tuples. You'd almost certainly need to use some shared memory to
> coordinate that. To say that you support that, I think your
> GetForeignPaths() function would need to call add_partial_path(). And
> unless I'm mistaken, whether or not InitializeDSMForeignScan etc are
> called might be the only indication you get of whether you need to run
> in parallel-aware mode. I haven't personally heard of any FDWs that
> can do this yet, but I just tried hacking file_fdw to register a
> partial path and it seems to work (though of course the results are
> duplicated because the emitted tuples are not actually partial).

... though I just noticed that my quick test used "Single Copy" mode. I think I see why: it looks like core's create_foreignscan_path() function might need to take num_workers and set parallel_aware if it is > 0. So I guess this hasn't been done before and would require some more research.

-- Thomas Munro http://www.enterprisedb.com
Re: How to estimate the shared memory size required for parallel scan?
From: Masayuki Takahashi
(Sorry, I accidentally sent this to Thomas only. This is a re-post.)

Hi Thomas,

Thank you for the excellent explanation of shared memory in parallel scans and 'foreign paths'. Those are exactly the points I wanted to know.

> If you just supply an IsForeignScanParallelSafe function that returns
> true, that would allow your FDW to be used inside parallel workers and
> wouldn't need any extra shared memory, but it wouldn't be a "parallel
> scan". It would just be "parallel safe". Each process that does a
> scan of your FDW would expect a full normal scan (presumably returning
> the same tuples in each process).

I think the parallel scan mechanism uses each worker's full normal scan on a different partition, right? For example, I made IsForeignScanParallelSafe return true in cstore_fdw and compared partitioned and non-partitioned scans.

https://gist.github.com/masayuki038/daa63a21f8c16ffa8138b50db9129ced

It shows that each partition is counted separately and 'Gather Merge' merges the results. As a result, the parallel scan and aggregation show the correct count. So, in the case of cstore_fdw, it may not be necessary to reserve shared memory in EstimateDSMForeignScan.

> So I guess this hasn't been done before and would require some more
> research.

I agree. I will try some query patterns.

Thanks.

-- 高橋 真之
On Sun, Aug 19, 2018 at 4:28 PM, Masayuki Takahashi <masayuki038@gmail.com> wrote:
>> If you just supply an IsForeignScanParallelSafe function that returns
>> true, that would allow your FDW to be used inside parallel workers and
>> wouldn't need any extra shared memory, but it wouldn't be a "parallel
>> scan". It would just be "parallel safe". Each process that does a
>> scan of your FDW would expect a full normal scan (presumably returning
>> the same tuples in each process).
>
> I think the parallel scan mechanism uses each worker's full
> normal scan on a different partition, right?
> For example, I made IsForeignScanParallelSafe return true in cstore_fdw
> and compared partitioned and non-partitioned scans.
>
> https://gist.github.com/masayuki038/daa63a21f8c16ffa8138b50db9129ced
>
> It shows that each partition is counted separately and 'Gather Merge' merges the results.
> As a result, the parallel scan and aggregation show the correct count.

Ah, so here you have a Parallel Append node. That is a way to get coarse-grained parallelism when you have only parallel-safe (not parallel-aware) scans but you do have partitions. Technically (in our jargon) there is no parallel scan happening here, but Parallel Append is smart enough to scan each partition in a different worker. That means the "granularity" of parallelism is whole tables (partitions), so if you have (say) 3 partitions of approximately the same size and 2 processes, you'll probably see that one process scans 1 partition and the other scans 2, so the work can be quite unbalanced. But if you have lots of partitions, it works well, and in any case it's certainly better than no parallelism.

> So, in the case of cstore_fdw, it may not be necessary to reserve
> shared memory in EstimateDSMForeignScan.

Correct. If all you need is parallel-safe scans, then you probably don't need any shared memory.
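The imbalance Thomas describes for Parallel Append can be checked with a toy scheduling model. This is a simplified sketch, not the executor's algorithm: it assumes each partition goes, whole, to whichever worker becomes free first, and it ignores details such as the leader's participation and the order in which Parallel Append hands out subplans. `append_finish_time` is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified model of Parallel Append over parallel-safe (non-partial) scans:
 * each partition goes, whole, to whichever worker becomes free first.
 * Fills worker_busy[] with each worker's total work; returns the finish time. */
static double
append_finish_time(const double *partition_sizes, size_t npartitions,
                   double *worker_busy, size_t nworkers)
{
    double finish = 0.0;

    assert(nworkers > 0);
    for (size_t w = 0; w < nworkers; w++)
        worker_busy[w] = 0.0;

    for (size_t p = 0; p < npartitions; p++)
    {
        size_t first_free = 0;

        /* The worker with the least accumulated work is the first to be free. */
        for (size_t w = 1; w < nworkers; w++)
            if (worker_busy[w] < worker_busy[first_free])
                first_free = w;
        worker_busy[first_free] += partition_sizes[p];
    }

    for (size_t w = 0; w < nworkers; w++)
        if (worker_busy[w] > finish)
            finish = worker_busy[w];
    return finish;
}
```

With three equal partitions of size 1.0 and two workers, the model finishes at 2.0 (one worker does twice the work of the other), whereas an evenly divided block-level scan would need 1.5. With twelve equal partitions both workers finish at 6.0, matching the even split, which is why lots of partitions make the coarse granularity matter less.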
BTW, to be truly, pedantically parallel-safe, I think it should ideally be the case that each process has the same "snapshot" when scanning, or subtle inconsistencies could arise (a transaction could be visible to one process but not to another; this would be weirder if it applied to concurrent scans of the *same* foreign table, but it could still be strange when scanning different partitions in a Parallel Append). For file_fdw, we just didn't worry about that because plain old text files are not transactional anyway, so we shrugged and declared its scans to be parallel safe. I suppose that any FDW backed by a non-snapshot-based system (including other RDBMSs) would probably have no way to do better than that, and you might make the same decision we made for file_fdw. When the foreign table is PostgreSQL, or an extension that is tightly integrated into our transaction system, I suppose you might want to think harder and maybe even give the user some options?

>> So I guess this hasn't been done before and would require some more
>> research.
>
> I agree. I will try some query patterns.
> thanks.

Just to be clear, there I was talking about true Parallel Foreign Scan, which is aiming a bit higher than mere parallel safety. After looking at this again, this time with the benefit of coffee, I *think* it should be possible without modifying core, if you do this:

1. As already mentioned, you need to figure out a way for cstore_fdw to hand out a disjoint set of tuples to different processes. That seems quite doable, since cstore is apparently block-structured (though I only skim-read it for about 7 seconds and could be wrong about that). You apparently have blocks and stripes: hopefully they are of fixed size, so you might be able to teach each process to advance some kind of atomic variable in shared memory so that each process eats different blocks?

2. Teach your GetForeignPaths function to do something like this:

    ForeignPath *partial_path;
    double      parallel_divisor;
    int         parallel_workers;

    /* ... existing code that adds the regular non-partial path here ... */

    /* Should we add a partial path to enable a parallel scan? */
    partial_path = create_foreignscan_path(root, baserel, NULL,
                                           baserel->rows,
                                           startup_cost,
                                           total_cost,
                                           NIL, NULL, NULL, coptions);

    parallel_workers = compute_parallel_worker(baserel, expected_num_pages, -1,
                                               max_parallel_workers_per_gather);
    partial_path->path.parallel_workers = parallel_workers;
    partial_path->path.parallel_aware = true;
    parallel_divisor = get_parallel_divisor(&partial_path->path);
    partial_path->path.rows /= parallel_divisor;
    partial_path->path.total_cost = startup_cost +
        ((total_cost - startup_cost) / parallel_divisor);

    if (parallel_workers > 0)
        add_partial_path(baserel, (Path *) partial_path);

You don't really have to use compute_parallel_worker() and get_parallel_divisor() if you have a smarter way of coming up with those numbers, but I'd probably use that logic to get started. Unfortunately get_parallel_divisor() is not an extern function, so you'd need a clone of it, or equivalent logic. It's also a bit inconvenient that it takes a Path * instead of just parallel_workers, which would allow tidier coding here. It's also inconvenient that you can't ALTER TABLE my_foreign_table SET (parallel_workers = N) today, which compute_parallel_worker() would respect.

-- Thomas Munro http://www.enterprisedb.com
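Since get_parallel_divisor() is static in costsize.c, an FDW needs its own copy of the logic. Below is a standalone clone written from memory of the PostgreSQL 11-era code (please verify against your source tree), taking `parallel_workers` directly as Thomas wishes it did. The `leader_participates` flag stands in for the `parallel_leader_participation` setting, and `scale_partial_path` reproduces the rows/total_cost arithmetic from step 2; both function names and `PartialCost` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Clone of get_parallel_divisor()'s logic, taking parallel_workers directly. */
static double
parallel_divisor(int parallel_workers, bool leader_participates)
{
    double divisor = parallel_workers;

    assert(parallel_workers >= 0);
    if (leader_participates)
    {
        /* The leader is assumed to spend 30% of its time servicing each worker. */
        double leader_contribution = 1.0 - (0.3 * parallel_workers);

        if (leader_contribution > 0)
            divisor += leader_contribution;
    }
    return divisor;
}

typedef struct PartialCost
{
    double rows;        /* rows expected per participating process */
    double total_cost;  /* startup cost plus a share of the run cost */
} PartialCost;

/* Scale full-scan estimates the way the partial path above is scaled. */
static PartialCost
scale_partial_path(double rows, double startup_cost, double total_cost,
                   int parallel_workers, bool leader_participates)
{
    double divisor = parallel_divisor(parallel_workers, leader_participates);
    PartialCost out;

    out.rows = rows / divisor;
    out.total_cost = startup_cost + (total_cost - startup_cost) / divisor;
    return out;
}
```

With leader participation the divisor comes out at about 1.7 for one worker, 2.4 for two and 3.1 for three; from four workers on the leader is assumed to contribute nothing, so the divisor equals the worker count. For example, 1000 rows with startup cost 10 and total cost 110 over four workers become 250 rows at a total cost of 35.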
On Mon, Aug 20, 2018 at 11:09 AM Thomas Munro <thomas.munro@enterprisedb.com> wrote:
> 2. Teach your GetForeignPaths function to do something like this:
> [blah blah blah]

I was pinged off-list by someone who is working on a parallel-aware FDW, who asked if I still had the test code I mentioned up-thread. While digging that out, I couldn't resist hacking it a bit more until it gave the right answers, only sooner:

    $ seq -f '%20.0f' 1 10000000 > numbers.csv

    create extension file_fdw;
    create server files foreign data wrapper file_fdw;
    create foreign table numbers (n int) server files
      options (filename '/path/to/numbers.csv', format 'csv');
    explain select count(*) from numbers;
    select count(*) from numbers;

Non-parallel: 2.6s
1 worker: 1.4s
2 workers: 0.9s
3 workers: 0.7s

Finally, I can do parallel hash joins between CSV files!

    select count(*) from numbers n1 join numbers n2 using (n);

Non-parallel: 11.4s
1 worker: 6.6s
2 workers: 4.8s
3 workers: 4.1s

There are probably some weird fence-post or accounting bugs hiding in this patch: it has to count bytes carefully, and deal with some edge cases around lines that span chunks. It's only a rough draft, but it might eventually serve as a useful example of a parallel-aware FDW.

-- Thomas Munro https://enterprisedb.com
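Lines that span chunk boundaries can be handled with an ownership rule: a line belongs to the chunk containing its first byte. Below is a hypothetical standalone C sketch of that rule (not the code in the attached patch). It assumes newline-terminated records with no embedded newlines, which CSV quoting can violate. A worker whose chunk starts mid-line skips to the next newline, and it finishes any line that starts inside its chunk even if that line runs past the chunk's end. `count_lines_in_chunk` and `count_lines_chunked` are illustrative names.

```c
#include <assert.h>
#include <stddef.h>

/* Count the lines owned by chunk [start, end) of buf[0..len). A line is owned
 * by the chunk containing its first byte: skip a partial line at the start of
 * the chunk, and finish a line that runs past the chunk's end. */
static size_t
count_lines_in_chunk(const char *buf, size_t len, size_t start, size_t end)
{
    size_t pos = start;
    size_t lines = 0;

    assert(start <= len);
    if (end > len)
        end = len;

    /* Skip the tail of a line that began in an earlier chunk. */
    if (pos > 0 && buf[pos - 1] != '\n')
    {
        while (pos < len && buf[pos] != '\n')
            pos++;
        pos++;  /* step past the newline (this may move beyond end) */
    }

    /* Every line starting before 'end' is ours, even if it ends later. */
    while (pos < end)
    {
        while (pos < len && buf[pos] != '\n')
            pos++;
        pos++;  /* step past the newline, or past an unterminated final line */
        lines++;
    }
    return lines;
}

/* Total that a set of workers would report if each scanned one chunk. */
static size_t
count_lines_chunked(const char *buf, size_t len, size_t chunk_size)
{
    size_t total = 0;

    for (size_t start = 0; start < len; start += chunk_size)
        total += count_lines_in_chunk(buf, len, start, start + chunk_size);
    return total;
}
```

The property to check is that the per-chunk counts always sum to the true line count, whatever the chunk size; for example, "1\n22\n333\n4444\n55555" yields 5 for every chunk size from 1 byte to the whole buffer. In a real parallel scan, the chunk boundaries would be handed out through a shared atomic counter rather than by the loop in `count_lines_chunked`.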
On Thu, May 2, 2019 at 8:06 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> I was pinged off-list by someone who is working on a parallel-aware
> FDW, who asked if I still had the test code I mentioned up-thread.
> While digging that out, I couldn't resist hacking it a bit more until
> it gave the right answers, only sooner:

Here's a version that fixes an assertion failure when the regression test tries to UPDATE a file_fdw table. That'll teach me to turn off assertions for benchmarking, then change something, then post a patch...

Added to commitfest.

-- Thomas Munro https://enterprisedb.com
On Fri, May 3, 2019 at 6:06 AM Thomas Munro <thomas.munro@gmail.com> wrote:
> Added to commitfest.

Rebased.

-- Thomas Munro https://enterprisedb.com
> On 1 Jul 2019, at 13:03, Thomas Munro <thomas.munro@gmail.com> wrote:
>
> On Fri, May 3, 2019 at 6:06 AM Thomas Munro <thomas.munro@gmail.com> wrote:
>> Added to commitfest.
>
> Rebased.

Below is a review of this patch. It does what it says on the tin, applies cleanly without introducing compiler warnings, and seems like a good addition (both for its utility and as an example implementation). Testing with various parallel worker settings shows that it properly observes the user's configuration for parallelism.

The tests pass, but there are also no new tests that could fail. I tried to construct a test case for this, but it's fairly hard to make one that can be added to the repo. Have you given this any thought?

Regarding documentation, it seems reasonable to add a sentence to the file_fdw page saying when the user can expect a parallel scan.

A few smaller comments on the patch:

In the commit message, I assume you mean "writing *is* probably some way off":

    "In theory this could be used for other kinds of parallel copying in the future (but infrastructure for parallel writing it probably some way off)."

Single-line comments don't end with a period IIRC (there are a few others, but I'm only including the first one here):

    +/* The size of the chunks used for parallel scans, in bytes. */

Is there a reason to create partial_path before compute_parallel_worker, since the latter can make us end up not adding the path at all? Can't we reverse the condition on parallel_workers to if (parallel_workers <= 0) and, if so, skip the partial path?

    +    partial_path = create_foreignscan_path(root, baserel, NULL,
    +                                           baserel->rows,
    +                                           startup_cost,
    +                                           total_cost,
    +                                           NIL, NULL, NULL, coptions);
    +
    +    parallel_workers = compute_parallel_worker(baserel,
    +                                               fdw_private->pages, -1,
    +                                               max_parallel_workers_per_gather);
    ...
    +    if (parallel_workers > 0)
    +        add_partial_path(baserel, (Path *) partial_path);

Since this is quite general code that can be used by other extensions as well, maybe we should reword the comment to say so?

    +    /* For use by file_fdw's parallel scans. */

cheers ./daniel
On 2019-Jul-08, Daniel Gustafsson wrote:
> Below is a review of this patch.

Any chance of a version addressing Daniel's comments?

-- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services