RE: Perform streaming logical transactions by background workers and parallel apply - Mailing list pgsql-hackers
From | Hayato Kuroda (Fujitsu) |
---|---|
Subject | RE: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | TYAPR01MB5866235A3B754897C03BCA1AF5179@TYAPR01MB5866.jpnprd01.prod.outlook.com |
In response to | RE: Perform streaming logical transactions by background workers and parallel apply ("houzj.fnst@fujitsu.com" <houzj.fnst@fujitsu.com>) |
Responses |
Re: Perform streaming logical transactions by background workers and parallel apply
RE: Perform streaming logical transactions by background workers and parallel apply |
List | pgsql-hackers |
Dear Hou,

Thanks for making the patch. The following are my comments on v54-0003 and 0004.

0003

pa_free_worker()

```
+    /* Unlink any files that were needed to serialize partial changes. */
+    if (winfo->serialize_changes)
+        stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
+
```

I think this part is not needed, because the leader apply worker (LA) cannot reach here if winfo->serialize_changes is true. Moreover, stream_cleanup_files() is already called in pa_free_worker_info().

LogicalParallelApplyLoop()

The parallel apply worker wakes up every 0.1s even if we are in PARTIAL_SERIALIZE mode. Do you have any idea how to reduce that?

```
+    pa_spooled_messages();
```

Comments are needed here, like "Changes may be serialized...".

pa_stream_abort()

```
+            /*
+             * Reopen the file and set the file position to the saved
+             * position.
+             */
+            if (reopen_stream_fd)
+            {
+                char        path[MAXPGPATH];
+
+                changes_filename(path, MyLogicalRepWorker->subid, xid);
+                stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset,
+                                               path, O_RDONLY, false);
+                BufFileSeek(stream_fd, fileno, offset, SEEK_SET);
+            }
```

MyParallelShared->serialize_changes may be used instead of reopen_stream_fd.

worker.c

```
-#include "storage/buffile.h"
```

I think this include should not be removed.

handle_streamed_transaction()

```
+        if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+            pa_send_data(winfo, s->len, s->data);
+        else
+            stream_write_change(action, &original_msg);
```

Comments are needed here. 0001 has them, but they were removed in 0002. There are some similar lines elsewhere.

```
+        /*
+         * It is possible that while sending this change to parallel apply
+         * worker we need to switch to serialize mode.
+         */
+        if (winfo->serialize_changes)
+            pa_set_fileset_state(winfo->shared, FS_READY);
```

The same code appears in three places. Can we combine them into a common part?

apply_spooled_messages()

```
+        /*
+         * Break the loop if the parallel apply worker has finished applying
+         * the transaction. The parallel apply worker should have closed the
+         * file before committing.
+         */
+        if (am_parallel_apply_worker() &&
+            MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
+            goto done;
```

I think pfree(buffer) and pfree(s2.data) should not be skipped. Also, this part should be placed below "nchanges++;".

0004

set_subscription_retry()

```
+    LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+                     AccessShareLock);
+
```

I think AccessExclusiveLock should be acquired instead of AccessShareLock. In AlterSubscription(), LockSharedObject(AccessExclusiveLock) seems to be used.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
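For illustration only, the ordering suggested in the apply_spooled_messages() comment can be sketched in standalone C. This is not the patch code: apply_changes(), XactState, and the finish_after parameter are hypothetical stand-ins, and malloc/free replace palloc/pfree. The sketch only shows the "finished" check placed after the counter increment, with the loop exit falling through to the frees instead of jumping past them.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for the shared transaction state in the patch. */
typedef enum { XACT_RUNNING, XACT_FINISHED } XactState;

/*
 * Apply up to "total" changes. The transaction is marked finished while
 * applying change number "finish_after" (1-based); pass 0 to never finish
 * early. Returns the number of changes applied, or -1 on allocation failure.
 * "*nfreed" reports how many work buffers were released on the way out.
 */
static int
apply_changes(int total, int finish_after, int *nfreed)
{
    char       *buffer = malloc(64);    /* stands in for "buffer" */
    char       *data = malloc(64);      /* stands in for "s2.data" */
    int         nchanges = 0;
    XactState   state = XACT_RUNNING;

    assert(total >= 0);
    *nfreed = 0;

    if (buffer == NULL || data == NULL)
    {
        free(buffer);
        free(data);
        return -1;
    }

    while (nchanges < total)
    {
        /* "Apply" one change; in this sketch it is only formatted. */
        snprintf(buffer, 64, "change %d", nchanges);

        if (nchanges + 1 == finish_after)
            state = XACT_FINISHED;

        nchanges++;

        /*
         * Check for completion only after counting the change, and leave
         * the loop with break so the cleanup below is not skipped.
         */
        if (state == XACT_FINISHED)
            break;
    }

    /* Both exit paths reach this point, so the buffers are always freed. */
    free(buffer);
    free(data);
    *nfreed = 2;

    return nchanges;
}
```

Calling apply_changes(10, 3, &nfreed) stops after the third change, counts it, and still releases both buffers; a real fix in the patch would have to do the equivalent with pfree() inside the existing loop.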