RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | houzj.fnst@fujitsu.com |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | OS0PR01MB57167BF64FC0891734C8E81A94149@OS0PR01MB5716.jpnprd01.prod.outlook.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Masahiko Sawada <sawada.mshk@gmail.com>) |
Responses | Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
On Thursday, December 1, 2022 3:58 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Wed, Nov 30, 2022 at 10:51 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Wednesday, November 30, 2022 9:41 PM houzj.fnst@fujitsu.com
> > <houzj.fnst@fujitsu.com> wrote:
> > >
> > > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > > Review comments on v53-0001*
> > >
> > > Attach the new version patch set.
> >
> > Sorry, there were some mistakes in the previous patch set.
> > Here is the correct V54 patch set. I also ran pgindent for the patch set.
> >
>
> Thank you for updating the patches. Here are random review comments for
> 0001 and 0002 patches.

Thanks for the comments!

> ereport(ERROR,
>         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>          errmsg("logical replication parallel apply worker exited abnormally"),
>          errcontext("%s", edata.context)));
>
> and
>
> ereport(ERROR,
>         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>          errmsg("logical replication parallel apply worker exited because of subscription information change")));
>
> I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
> here. Given that parallel apply worker has already reported the error message
> with the error code, I think we don't need to set the errorcode for the logs
> from the leader process.
>
> Also, I'm not sure the term "exited abnormally" is appropriate since we use it
> when the server crashes for example. I think ERRORs reported here don't mean
> that in general. How about reporting "xxx worker exited due to error" ?
> ---
> if (am_parallel_apply_worker() && on_subinfo_change) {
>     /*
>      * If a parallel apply worker exits due to the subscription
>      * information change, we notify the leader apply worker so that the
>      * leader can report more meaningful message in time and restart the
>      * logical replication.
>      */
>     pq_putmessage('X', NULL, 0);
> }
>
> and
>
> ereport(ERROR,
>         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>          errmsg("logical replication parallel apply worker exited because of subscription information change")));
>
> Do we really need an additional message in case of 'X'? When we call
> apply_worker_clean_exit with on_subinfo_change = true, we have reported the
> error message such as:
>
> ereport(LOG,
>         (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
>                 MySubscription->name)));
>
> I think that reporting a similar message from the leader might not be
> meaningful for users.

The intention is to let the leader report a more meaningful message if a worker
exited due to a subinfo change. Otherwise, the leader is likely to report an
error like "lost connection ... to parallel apply worker" when trying to send
data via shared memory after the worker has exited. What do you think?

> ---
> -       if (options->proto.logical.streaming &&
> -           PQserverVersion(conn->streamConn) >= 140000)
> -               appendStringInfoString(&cmd, ", streaming 'on'");
> +       if (options->proto.logical.streaming_str)
> +               appendStringInfo(&cmd, ", streaming '%s'",
> +                                options->proto.logical.streaming_str);
>
> and
>
> +       /*
> +        * Assign the appropriate option value for streaming option according to
> +        * the 'streaming' mode and the publisher's ability to support that mode.
> +        */
> +       if (server_version >= 160000 &&
> +           MySubscription->stream == SUBSTREAM_PARALLEL)
> +       {
> +               options.proto.logical.streaming_str = pstrdup("parallel");
> +               MyLogicalRepWorker->parallel_apply = true;
> +       }
> +       else if (server_version >= 140000 &&
> +                MySubscription->stream != SUBSTREAM_OFF)
> +       {
> +               options.proto.logical.streaming_str = pstrdup("on");
> +               MyLogicalRepWorker->parallel_apply = false;
> +       }
> +       else
> +       {
> +               options.proto.logical.streaming_str = NULL;
> +               MyLogicalRepWorker->parallel_apply = false;
> +       }
>
> This change moves the code of adjustment of the streaming option based on
> the publisher server version from libpqwalreceiver.c to worker.c.
> On the other hand, the similar logic for other parameters such as "two_phase"
> and "origin" are still done in libpqwalreceiver.c. How about passing
> MySubscription->stream via WalRcvStreamOptions and constructing a
> streaming option string in libpqrcv_startstreaming()?
> In ApplyWorkerMain(), we just need to set
> MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
> && MySubscription->stream == SUBSTREAM_PARALLEL). We won't need
> pstrdup for "parallel" and "on", and it's more consistent with other parameters.

Thanks for the suggestion. I thought about the same idea before, but it seems
we would need to introduce "pg_subscription.h" into libpqwalreceiver.c.
libpqwalreceiver.c looks like a common place, so I am not sure whether it is
better to expose the details of the streaming option to it.

> ---
> + * We maintain a worker pool to avoid restarting workers for each streaming
> + * transaction. We maintain each worker's information in the
>
> Do we need to describe the pool in the doc?

I thought the worker pool is kind of internal information. Maybe we can add it
later if we receive some feedback about this after pushing the main patch.
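
For reference, the streaming option selection discussed earlier in this
message can be sketched outside the patch as below. This is only an
illustration under assumptions, not code from the patch set: the enum
StreamMode and the function choose_streaming_option are hypothetical names,
while the thresholds 140000 and 160000 are taken from the quoted hunk.
Returning string literals is one way to avoid the pstrdup calls.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the subscription's streaming modes. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} StreamMode;

/*
 * Return the value to send for the "streaming" option, or NULL if the
 * option should be omitted. *parallel_apply is set to 1 only when the
 * subscription asks for parallel mode and the publisher is new enough.
 */
const char *
choose_streaming_option(int server_version, StreamMode mode,
                        int *parallel_apply)
{
    assert(parallel_apply != NULL);

    *parallel_apply = 0;

    if (server_version >= 160000 && mode == STREAM_PARALLEL)
    {
        *parallel_apply = 1;
        return "parallel";
    }

    /* Older publishers, or plain streaming: fall back to 'on'. */
    if (server_version >= 140000 && mode != STREAM_OFF)
        return "on";

    /* Streaming disabled, or the publisher cannot stream at all. */
    return NULL;
}
```

Whichever file ends up owning this logic, the caller would append
", streaming '<value>'" to the command only when the returned pointer is not
NULL.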
> ---
> + * in AccessExclusive mode at transaction finish commands (STREAM_COMMIT and
> + * STREAM_PREAPRE) and release it immediately.
>
> typo, s/STREAM_PREAPRE/STREAM_PREPARE/

Will change.

> ---
> +/* Parallel apply workers hash table (initialized on first use). */
> +static HTAB *ParallelApplyWorkersHash = NULL;
> +
> +/*
> + * A list to maintain the active parallel apply workers. The information for
> + * the new worker is added to the list after successfully launching it. The
> + * list entry is removed if there are already enough workers in the worker
> + * pool either at the end of the transaction or while trying to find a free
> + * worker for applying the transaction. For more information about the worker
> + * pool, see comments atop this file.
> + */
> +static List *ParallelApplyWorkersList = NIL;
>
> The names ParallelApplyWorkersHash and ParallelWorkersList are very similar
> but the usages are completely different. Probably we can find better names
> such as ParallelApplyTxnHash and ParallelApplyWorkerPool.
> And probably we can add more comments for ParallelApplyWorkersHash.

Will change.

> ---
> if (winfo->serialize_changes ||
>     napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) {
>     int         slot_no;
>     uint16      generation;
>
>     SpinLockAcquire(&winfo->shared->mutex);
>     generation = winfo->shared->logicalrep_worker_generation;
>     slot_no = winfo->shared->logicalrep_worker_slot_no;
>     SpinLockRelease(&winfo->shared->mutex);
>
>     logicalrep_pa_worker_stop(slot_no, generation);
>
>     pa_free_worker_info(winfo);
>
>     return true;
> }
>
> /* Unlink any files that were needed to serialize partial changes. */
> if (winfo->serialize_changes)
>     stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
>
> If winfo->serialize_changes is true, we return true in the first if statement. So
> stream_cleanup_files in the second if statement is never executed.

pa_free_worker_info will also clean up the fileset.
But I think I can move that stream_cleanup_files call before the
"... napplyworkers > (max_parallel_apply_workers_per_subscription / 2))" check
so that it is clearer.

> ---
> +       /*
> +        * First, try to get a parallel apply worker from the pool, if available.
> +        * Otherwise, try to start a new parallel apply worker.
> +        */
> +       winfo = pa_get_available_worker();
> +       if (!winfo)
> +       {
> +               winfo = pa_init_and_launch_worker();
> +               if (!winfo)
> +                       return;
> +       }
>
> I think we don't necessarily need to separate two functions for getting a worker
> from the pool and launching a new worker. It seems to reduce the readability.
> Instead, I think that we can have one function that returns winfo if there is a free
> worker in the worker pool or it launches a worker. That way, we can simply do
> like:
>
> winfo = pg_launch_parallel_worker()
> if (!winfo)
>     return;

Will change.

> ---
> +       /* Setup replication origin tracking. */
> +       StartTransactionCommand();
> +       ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
> +                                          originname, sizeof(originname));
> +       originid = replorigin_by_name(originname, true);
> +       if (!OidIsValid(originid))
> +               originid = replorigin_create(originname);
>
> This code looks to allow parallel workers to use different origins in cases where
> the origin doesn't exist, but is that okay? Shouldn't we pass missing_ok = false
> in this case?
>

Will change.

> ---
> cfbot seems to fails:
>
> https://cirrus-ci.com/task/6264595342426112

Thanks for reporting. It's due to a test case problem; I will fix that test soon.

Best regards,
Hou zj