Re: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Amit Kapila |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAA4eK1+YJahJtdn+czAj8K8Pe5Y6UnidwDwi7m9xctB+xmw=-A@mail.gmail.com |
In response to | Re: Perform streaming logical transactions by background workers and parallel apply (Masahiko Sawada <sawada.mshk@gmail.com>) |
Responses | Re: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |

On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada.mshk@gmail.com> wrote:
>
> On Wed, Dec 7, 2022 at 10:03 PM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > > +static void
> > > +ProcessParallelApplyInterrupts(void)
> > > +{
> > > +    CHECK_FOR_INTERRUPTS();
> > > +
> > > +    if (ShutdownRequestPending)
> > > +    {
> > > +        ereport(LOG,
> > > +                (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> > > +                        MySubscription->name)));
> > > +
> > > +        apply_worker_clean_exit(false);
> > > +    }
> > > +
> > > +    if (ConfigReloadPending)
> > > +    {
> > > +        ConfigReloadPending = false;
> > > +        ProcessConfigFile(PGC_SIGHUP);
> > > +    }
> > > +}
> > >
> > > I personally think that we don't need to have a function to do only
> > > these few things.
> >
> > I thought that introduce a new function make the handling of worker specific
> > Interrupts logic similar to other existing ones. Like:
> > ProcessWalRcvInterrupts () in walreceiver.c and HandlePgArchInterrupts() in
> > pgarch.c ...
>
> I think the difference from them is that there is only one place to
> call ProcessParallelApplyInterrupts().
>

But I feel it is better to isolate this code in a separate function.
What if we decide to extend it further by having some logic to stop
workers after reloading of config?

> >
> > > ---
> > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> > > options.proto.logical.proto_version =
> > > +    server_version >= 160000 ?
> > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> > >     server_version >= 150000 ?
> > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> > >     server_version >= 140000 ?
> > > LOGICALREP_PROTO_STREAM_VERSION_NUM :
> > >     LOGICALREP_PROTO_VERSION_NUM;
> > >
> > > Instead of always using the new protocol version, I think we can use
> > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > > 'parallel'. That way, we don't need to change protocol version check
> > > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > > What do you think?
> >
> > I think that some user can also use the new version number when trying to get
> > changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> > leave the check for new version number seems fine.
> >
> > Besides, I feel even if we don't use new version number, we still need to use
> > defGetStreamingMode to check if parallel mode in used as we need to send
> > abort_lsn when parallel is in used. I might be missing something, sorry for
> > that. Can you please explain the idea a bit ?
>
> My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
> (server_version >= 160000 && MySubscription->stream ==
> SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is
> 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol
> version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need
> to send "streaming = parallel" to the publisher since the publisher
> can decide whether or not to send abort_lsn based on the protocol
> version (still needs to send "streaming = on" though). I might be
> missing something.
>

What if we decide to send some more additional information as part of
another patch like we are discussing in the thread [1]? Now, we won't
be able to decide the version number based on just the streaming
option. Also, in such a case, even for
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good
idea to send additional abort information unless the user has used the
streaming=parallel option.

[1] - https://www.postgresql.org/message-id/CAGPVpCRWEVhXa7ovrhuSQofx4to7o22oU9iKtrOgAOtz_%3DY6vg%40mail.gmail.com

--
With Regards,
Amit Kapila.
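
The two version-selection rules debated above can be contrasted in a small standalone C sketch. This is only an illustration, not the patch: the enum names and their numeric values are stand-ins for the LOGICALREP_PROTO_* and SUBSTREAM_* constants named in the thread, and all four functions are hypothetical helpers that do not exist in the PostgreSQL source.

```c
#include <stdbool.h>

/* Stand-ins for the LOGICALREP_PROTO_* constants; values are placeholders. */
enum
{
    PROTO_VERSION = 1,
    PROTO_STREAM_VERSION = 2,
    PROTO_TWOPHASE_VERSION = 3,
    PROTO_STREAM_PARALLEL_VERSION = 4
};

/* Stand-in for the subscription's streaming option (SUBSTREAM_* in the thread). */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL
} StreamMode;

/* Rule in the quoted patch: the version depends only on the publisher's server version. */
static int
proto_version_by_server(int server_version)
{
    return server_version >= 160000 ? PROTO_STREAM_PARALLEL_VERSION :
        server_version >= 150000 ? PROTO_TWOPHASE_VERSION :
        server_version >= 140000 ? PROTO_STREAM_VERSION :
        PROTO_VERSION;
}

/* Alternative rule: request the newest version only when streaming = parallel. */
static int
proto_version_by_server_and_stream(int server_version, StreamMode stream)
{
    if (server_version >= 160000 && stream == STREAM_PARALLEL)
        return PROTO_STREAM_PARALLEL_VERSION;

    return server_version >= 150000 ? PROTO_TWOPHASE_VERSION :
        server_version >= 140000 ? PROTO_STREAM_VERSION :
        PROTO_VERSION;
}

/* Publisher side, version-only check: abort_lsn follows the protocol version alone. */
static bool
send_abort_lsn_version_based(int proto_version)
{
    return proto_version >= PROTO_STREAM_PARALLEL_VERSION;
}

/* Publisher side, option-aware check: abort_lsn also requires streaming = parallel. */
static bool
send_abort_lsn_option_based(int proto_version, StreamMode stream)
{
    return proto_version >= PROTO_STREAM_PARALLEL_VERSION &&
        stream == STREAM_PARALLEL;
}
```

Under the alternative rule the requested version encodes the streaming option, so the version-only check is sufficient on the publisher. The concern raised in the reply is that this coupling breaks once another feature also needs a newer version: a client could then request the newest version with streaming = on, and only the option-aware check would avoid sending abort information it did not ask for.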