Re: [HACKERS] Restricting maximum keep segments by repslots - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: [HACKERS] Restricting maximum keep segments by repslots |
Date | |
Msg-id | 20170913.114306.67844218.horiguchi.kyotaro@lab.ntt.co.jp Whole thread Raw |
In response to | Re: [HACKERS] Restricting maximum keep segments by repslots (Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp>) |
Responses |
Re: [HACKERS] Restricting maximum keep segments by repslots
|
List | pgsql-hackers |
At Thu, 07 Sep 2017 21:59:56 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in <20170907.215956.110216588.horiguchi.kyotaro@lab.ntt.co.jp> > Hello, > > At Thu, 07 Sep 2017 14:12:12 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI <horiguchi.kyotaro@lab.ntt.co.jp> wrote in <20170907.141212.227032666.horiguchi.kyotaro@lab.ntt.co.jp> > > > I would like a flag in pg_replication_slots, and possibly also a > > > numerical column that indicates how far away from the critical point > > > each slot is. That would be great for a monitoring system. > > > > Great! I'll do that right now. > > Done. The CF status of this patch has turned into "Waiting on Author". This is because the second patch was posted separately from the first one. I am reposting them together after rebasing onto the current master. regards, -- Kyotaro Horiguchi NTT Open Source Software Center *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 105,110 **** int wal_level = WAL_LEVEL_MINIMAL; --- 105,111 ---- int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings =5; /* # concurrent xacts needed to sleep */ int wal_retrieve_retry_interval = 5000; + int max_slot_wal_keep_size_mb = 0; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; *************** *** 9365,9373 **** KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) --- 9366,9397 ---- if (max_replication_slots > 0 && keep != InvalidXLogRecPtr) { XLogSegNo slotSegNo; + int slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb); XLByteToSeg(keep, slotSegNo); + /* + * ignore slots if too many wal segments are kept. + * max_slot_wal_keep_size is just accumulated on wal_keep_segments. + */ + if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno) + { + segno = segno - slotlimitsegs; /* must be positive */ + + /* + * warn only if the checkpoint flushes the required segment. + * we assume here that *logSegNo is calculated keep location. 
+ */ + if (slotSegNo < *logSegNo) + ereport(WARNING, + (errmsg ("restart LSN of replication slots is ignored by checkpoint"), + errdetail("Some replication slots have lost required WAL segnents to continue by up to %ld segments.", + (segno < *logSegNo ? segno : *logSegNo) - slotSegNo))); + + /* emergency vent */ + slotSegNo = segno; + } + if (slotSegNo <= 0) segno = 1; else if (slotSegNo < segno) *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 2371,2376 **** static struct config_int ConfigureNamesInt[] = --- 2371,2387 ---- }, { + {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the maximum size of extra WALs kept by replication slots."), + NULL, + GUC_UNIT_MB + }, + &max_slot_wal_keep_size_mb, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { {"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING, gettext_noop("Sets the maximum time towait for WAL replication."), NULL, *** a/src/backend/utils/misc/postgresql.conf.sample --- b/src/backend/utils/misc/postgresql.conf.sample *************** *** 235,240 **** --- 235,241 ---- #max_wal_senders = 10 # max number of walsender processes # (change requires restart)#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables + #max_slot_wal_keep_size = 0 # measured in bytes; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #max_replication_slots = 10 # max number of replication slots *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 97,102 **** extern bool reachedConsistency; --- 97,103 ---- extern int min_wal_size_mb; extern int max_wal_size_mb; extern int wal_keep_segments; + extern int max_slot_wal_keep_size_mb; extern int XLOGbuffers; extern int XLogArchiveTimeout; extern int wal_retrieve_retry_interval; *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 9336,9341 **** CreateRestartPoint(int flags) --- 9336,9420 ---- } /* + * Check if the record 
on the given lsn will be preserved at the next + * checkpoint. + * + * Returns true if it will be preserved. If distance is given, the distance + * from origin to the beginning of the first segment kept at the next + * checkpoint. It means margin when this function returns true and gap of lost + * records when false. + * + * This function should return the consistent result with KeepLogSeg. + */ + bool + GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance) + { + XLogRecPtr currpos; + XLogRecPtr tailpos; + uint64 currSeg; + uint64 restByteInSeg; + uint64 restartSeg; + uint64 tailSeg; + uint64 keepSegs; + + currpos = GetXLogWriteRecPtr(); + + LWLockAcquire(ControlFileLock, LW_SHARED); + tailpos = ControlFile->checkPointCopy.redo; + LWLockRelease(ControlFileLock); + + /* Move the pointer to the beginning of the segment*/ + XLByteToSeg(currpos, currSeg); + XLByteToSeg(restartLSN, restartSeg); + XLByteToSeg(tailpos, tailSeg); + restByteInSeg = 0; + + Assert(wal_keep_segments >= 0); + Assert(max_slot_wal_keep_size_mb >= 0); + + /* + * WAL are removed by the unit of segment. + */ + keepSegs = wal_keep_segments + ConvertToXSegs(max_slot_wal_keep_size_mb); + + /* + * If the latest checkpoint's redo point is older than the current head + * minus keep segments, the next checkpoint keeps the redo point's + * segment. Elsewise use current head minus number of segments to keep. 
+ */ + if (currSeg < tailSeg + keepSegs) + { + if (currSeg < keepSegs) + tailSeg = 0; + else + tailSeg = currSeg - keepSegs; + + /* In this case, the margin will be the bytes to the next segment */ + restByteInSeg = XLogSegSize - (currpos % XLogSegSize); + } + + /* Required sements will be removed at the next checkpoint */ + if (restartSeg < tailSeg) + { + /* Calculate how may bytes the slot have lost */ + if (distance) + { + uint64 restbytes = (restartSeg + 1) * XLogSegSize - restartLSN; + *distance = + (tailSeg - restartSeg - 1) * XLogSegSize + + restbytes; + } + return false; + } + + /* Margin at the next checkpoint before the slot lose sync */ + if (distance) + *distance = (restartSeg - tailSeg) * XLogSegSize + restByteInSeg; + + return true; + } + + /* * Retreat *logSegNo to the last segment that we need to retain because of * either wal_keep_segments or replicationslots. * *** a/src/backend/catalog/system_views.sql --- b/src/backend/catalog/system_views.sql *************** *** 793,799 **** CREATE VIEW pg_replication_slots AS L.xmin, L.catalog_xmin, L.restart_lsn, ! L.confirmed_flush_lsn FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid= D.oid); --- 793,801 ---- L.xmin, L.catalog_xmin, L.restart_lsn, ! L.confirmed_flush_lsn, ! L.live, ! L.distance FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); *** a/src/backend/replication/slotfuncs.c --- b/src/backend/replication/slotfuncs.c *************** *** 182,188 **** pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { ! #define PG_GET_REPLICATION_SLOTS_COLS 11 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; --- 182,188 ---- Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { ! 
#define PG_GET_REPLICATION_SLOTS_COLS 13 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; *************** *** 304,309 **** pg_get_replication_slots(PG_FUNCTION_ARGS) --- 304,323 ---- else nulls[i++] = true; + if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr) + { + uint64 distance; + + values[i++] = BoolGetDatum(GetMarginToSlotSegmentLimit(restart_lsn, + &distance)); + values[i++] = Int64GetDatum(distance); + } + else + { + values[i++] = BoolGetDatum(true); + nulls[i++] = true; + } + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } LWLockRelease(ReplicationSlotControlLock); *** a/src/include/access/xlog.h --- b/src/include/access/xlog.h *************** *** 267,272 **** extern void ShutdownXLOG(int code, Datum arg); --- 267,273 ---- extern void InitXLOGAccess(void); extern void CreateCheckPoint(int flags); extern bool CreateRestartPoint(intflags); + extern bool GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance); extern void XLogPutNextOid(Oid nextOid);extern XLogRecPtr XLogRestorePoint(const char *rpName); extern void UpdateFullPageWrites(void); *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 5347,5353 **** DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physicalreplication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 02278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replicationslot"); ! 
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}""{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null__null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use");DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 1916" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot_null_ _null_ _null_ )); DESCR("set up a logical replication slot"); --- 5347,5353 ---- DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSPPGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null__null_ )); DESCR("drop a replication slot"); ! DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,16,3220}""{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}" _null__null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use");DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 1916" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot_null_ _null_ _null_ )); DESCR("set up a logical replication slot"); -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers
pgsql-hackers by date: