Thread: Re: logical replication: restart_lsn can go backwards (and more), seems broken since 9.4

Hi,

I kept investigating this, but I haven't made much progress. I still
don't understand why it would be OK to move any of the LSN fields
backwards - certainly not for fields like confirmed_flush or
restart_lsn.

I did a simple experiment - I added asserts to the couple of places in
logical.c that update the LSN fields, checking that the value only
increases. But this time I simply ran make check-world instead of the
stress test.

And that actually fails too; 040_standby_failover_slots_sync.pl triggers
this:

    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
        Assert(MyReplicationSlot->data.confirmed_flush <= lsn);
        MyReplicationSlot->data.confirmed_flush = lsn;
        SpinLockRelease(&MyReplicationSlot->mutex);
    }

So this moves confirmed_flush back, albeit only by a tiny amount (I've
seen a ~56 byte difference). I don't have an example of this causing an
issue in practice, but I note that CheckPointReplicationSlots does this:

    if (is_shutdown && SlotIsLogical(s))
    {
        SpinLockAcquire(&s->mutex);

        if (s->data.invalidated == RS_INVAL_NONE &&
            s->data.confirmed_flush > s->last_saved_confirmed_flush)
        {
            s->just_dirtied = true;
            s->dirty = true;
        }
        SpinLockRelease(&s->mutex);
    }

to determine whether a slot needs to be flushed to disk during a
checkpoint. So I guess it's possible that we save a slot to disk at some
LSN, confirmed_flush then moves backwards, and we fail to sync the slot
to disk.

But I don't have a reproducer for this ...
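
To illustrate what I mean, here's a toy simulation of just that check -
plain C with made-up LSN values and a made-up ToySlot struct, not the
actual slot code:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uint64_t XLogRecPtr;

    /* toy model of the two fields compared by the shutdown checkpoint */
    typedef struct ToySlot
    {
        XLogRecPtr  confirmed_flush;
        XLogRecPtr  last_saved_confirmed_flush;
    } ToySlot;

    /* the condition CheckPointReplicationSlots uses to mark the slot dirty */
    static bool
    needs_flush(ToySlot *s)
    {
        return s->confirmed_flush > s->last_saved_confirmed_flush;
    }

    int
    main(void)
    {
        ToySlot     s;

        /* slot was saved to disk with confirmed_flush = 1000 */
        s.confirmed_flush = 1000;
        s.last_saved_confirmed_flush = 1000;

        /* confirmed_flush then moves backwards by a few dozen bytes */
        s.confirmed_flush = 944;

        /* the shutdown checkpoint no longer considers the slot dirty */
        printf("needs flush: %s\n", needs_flush(&s) ? "yes" : "no");

        return 0;
    }

Obviously this only shows the check going quiet, not an actual failure -
hence the missing reproducer.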


I also noticed a strange difference between LogicalIncreaseXminForSlot
and LogicalIncreaseRestartDecodingForSlot.

The structure of LogicalIncreaseXminForSlot looks like this:

    if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    {
    }
    else if (current_lsn <= slot->data.confirmed_flush)
    {
        ... update candidate fields ...
    }
    else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    {
        ... update candidate fields ...
    }

while LogicalIncreaseRestartDecodingForSlot looks like this:

    if (restart_lsn <= slot->data.restart_lsn)
    {
    }
    else if (current_lsn <= slot->data.confirmed_flush)
    {
        ... update candidate fields ...
    }

    if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    {
        ... update candidate fields ...
    }

Notice that LogicalIncreaseXminForSlot has the third block guarded by
"else if", while LogicalIncreaseRestartDecodingForSlot has a plain "if".
Isn't that a bit suspicious, considering the functions do the same thing,
just for different fields? I don't know whether this is dangerous - the
comments suggest it may just waste extra effort after a reconnect.
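
FWIW, here's how I read the "else if" variant - a toy model of the
LogicalIncreaseXminForSlot guard structure (made-up values, the
TransactionId comparison simplified to a plain integer compare), where an
already-superseded xmin leaves the candidate fields completely untouched:

    #include <stdint.h>
    #include <stdio.h>

    typedef uint64_t XLogRecPtr;
    typedef uint32_t TransactionId;
    #define InvalidXLogRecPtr 0

    typedef struct ToySlot
    {
        TransactionId   catalog_xmin;
        XLogRecPtr      confirmed_flush;
        XLogRecPtr      candidate_xmin_lsn;
        TransactionId   candidate_catalog_xmin;
    } ToySlot;

    /* mirrors the guard structure of LogicalIncreaseXminForSlot */
    static void
    increase_xmin_toy(ToySlot *slot, XLogRecPtr current_lsn, TransactionId xmin)
    {
        if (xmin <= slot->catalog_xmin)
        {
            /* already have a newer xmin - do nothing at all */
        }
        else if (current_lsn <= slot->confirmed_flush)
        {
            slot->candidate_xmin_lsn = current_lsn;
            slot->candidate_catalog_xmin = xmin;
        }
        else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
        {
            slot->candidate_xmin_lsn = current_lsn;
            slot->candidate_catalog_xmin = xmin;
        }
    }

    int
    main(void)
    {
        ToySlot     slot = {.catalog_xmin = 700, .confirmed_flush = 1500};

        /* an already-superseded xmin arrives - the chain leaves it alone */
        increase_xmin_toy(&slot, 1600, 650);

        /* candidate_xmin_lsn is still InvalidXLogRecPtr */
        printf("candidate_xmin_lsn = %lu\n",
               (unsigned long) slot.candidate_xmin_lsn);
        return 0;
    }

In the restart_lsn variant, the last block would still run in the "do
nothing" case, which is exactly what makes me suspicious.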


regards

-- 
Tomas Vondra




Hi,

Thank you for investigating this issue.

On Thu, Nov 7, 2024 at 10:40 AM Tomas Vondra <tomas@vondra.me> wrote:
>
> ...
>
> I also noticed a strange difference between LogicalIncreaseXminForSlot
> and LogicalIncreaseRestartDecodingForSlot.
>
> The structure of LogicalIncreaseXminForSlot looks like this:
>
>     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
>     {
>     }
>     else if (current_lsn <= slot->data.confirmed_flush)
>     {
>         ... update candidate fields ...
>     }
>     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
>     {
>         ... update candidate fields ...
>     }
>
> while LogicalIncreaseRestartDecodingForSlot looks like this:
>
>     if (restart_lsn <= slot->data.restart_lsn)
>     {
>     }
>     else if (current_lsn <= slot->data.confirmed_flush)
>     {
>         ... update candidate fields ...
>     }
>
>     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
>     {
>         ... update candidate fields ...
>     }
>
> Notice that LogicalIncreaseXminForSlot has the third block guarded by
> "else if", while LogicalIncreaseRestartDecodingForSlot has a plain "if".
> Isn't that a bit suspicious, considering the functions do the same thing,
> just for different fields? I don't know whether this is dangerous - the
> comments suggest it may just waste extra effort after a reconnect.
>

I also suspected this point. I still need to investigate whether this is
related to the issue, but I find this code in
LogicalIncreaseRestartDecodingForSlot() dangerous.

We update the slot's restart_lsn based on candidate_restart_lsn and
candidate_restart_valid upon receiving a feedback message from the
subscriber, and then clear both fields. Therefore, this code in
LogicalIncreaseRestartDecodingForSlot() means that an arbitrary LSN can
be set as candidate_restart_lsn after the slot's restart_lsn has been
updated.

I think an LSN older than the slot's restart_lsn can be passed to
LogicalIncreaseRestartDecodingForSlot() as restart_lsn, for example
after logical decoding restarts. The scenario I shared on another
thread [1] was this: after updating the slot's restart_lsn (upon
feedback from the subscriber) based on the candidate_restart_lsn and
candidate_restart_valid values that remained in the slot, we might call
LogicalIncreaseRestartDecodingForSlot() while decoding a RUNNING_XACTS
record whose LSN is older than the slot's new restart_lsn. In this
case, we end up passing an LSN older than the new restart_lsn to
LogicalIncreaseRestartDecodingForSlot(), and that LSN is set as
candidate_restart_lsn. My hypothesis is that we wanted to prevent such
a case with the first if block:

    /* don't overwrite if have a newer restart lsn */
    if (restart_lsn <= slot->data.restart_lsn)
    {
    }
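
To make the sequence concrete, here is a toy model of it in plain C
(made-up LSNs; it only mimics the guard structure quoted above plus a
simplified "apply the candidates and clear them" step upon feedback,
not the real functions):

    #include <stdint.h>
    #include <stdio.h>

    typedef uint64_t XLogRecPtr;
    #define InvalidXLogRecPtr 0

    typedef struct ToySlot
    {
        XLogRecPtr  restart_lsn;
        XLogRecPtr  confirmed_flush;
        XLogRecPtr  candidate_restart_valid;
        XLogRecPtr  candidate_restart_lsn;
    } ToySlot;

    /* mimics the guard structure of LogicalIncreaseRestartDecodingForSlot */
    static void
    increase_restart_toy(ToySlot *s, XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    {
        if (restart_lsn <= s->restart_lsn)
        {
            /* don't overwrite if we have a newer restart lsn */
        }
        else if (current_lsn <= s->confirmed_flush)
        {
            s->candidate_restart_valid = current_lsn;
            s->candidate_restart_lsn = restart_lsn;
        }

        /* plain "if": re-arms the candidates even when the guard above hit */
        if (s->candidate_restart_valid == InvalidXLogRecPtr)
        {
            s->candidate_restart_valid = current_lsn;
            s->candidate_restart_lsn = restart_lsn;
        }
    }

    /* mimics applying the candidates upon a feedback message */
    static void
    feedback_toy(ToySlot *s, XLogRecPtr lsn)
    {
        s->confirmed_flush = lsn;
        if (s->candidate_restart_valid != InvalidXLogRecPtr &&
            s->candidate_restart_valid <= lsn)
        {
            s->restart_lsn = s->candidate_restart_lsn;
            s->candidate_restart_valid = InvalidXLogRecPtr;
            s->candidate_restart_lsn = InvalidXLogRecPtr;
        }
    }

    int
    main(void)
    {
        ToySlot     s = {.restart_lsn = 1000, .confirmed_flush = 1500,
                         .candidate_restart_valid = 1800,
                         .candidate_restart_lsn = 2000};

        /* feedback arrives: restart_lsn advances to 2000, candidates cleared */
        feedback_toy(&s, 1900);

        /*
         * Decoding a RUNNING_XACTS record older than the new restart_lsn:
         * the candidates are empty, so the unguarded block re-arms them
         * with the older values.
         */
        increase_restart_toy(&s, 1600, 1200);

        /* the next feedback copies the stale candidate into restart_lsn */
        feedback_toy(&s, 1950);

        /* prints restart_lsn = 1200, i.e. it went backwards from 2000 */
        printf("restart_lsn = %lu\n", (unsigned long) s.restart_lsn);
        return 0;
    }

If the second block were guarded like in LogicalIncreaseXminForSlot, the
call with the older restart_lsn would do nothing and restart_lsn would
stay at 2000 in this toy.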

Regards,

[1] https://www.postgresql.org/message-id/CAD21AoBG2OSDOFTtpPtQ7fx5Vt8p3dS5hPAv28CBSC6z2kHx-g%40mail.gmail.com

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com




On 11/8/24 19:25, Masahiko Sawada wrote:
> Hi,
> 
> Thank you for investigating this issue.
> 
> On Thu, Nov 7, 2024 at 10:40 AM Tomas Vondra <tomas@vondra.me> wrote:
>>
>> ...
>>
>> I also noticed a strange difference between LogicalIncreaseXminForSlot
>> and LogicalIncreaseRestartDecodingForSlot.
>>
>> The structure of LogicalIncreaseXminForSlot looks like this:
>>
>>     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
>>     {
>>     }
>>     else if (current_lsn <= slot->data.confirmed_flush)
>>     {
>>         ... update candidate fields ...
>>     }
>>     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
>>     {
>>         ... update candidate fields ...
>>     }
>>
>> while LogicalIncreaseRestartDecodingForSlot looks like this:
>>
>>     if (restart_lsn <= slot->data.restart_lsn)
>>     {
>>     }
>>     else if (current_lsn <= slot->data.confirmed_flush)
>>     {
>>         ... update candidate fields ...
>>     }
>>
>>     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
>>     {
>>         ... update candidate fields ...
>>     }
>>
>> Notice that LogicalIncreaseXminForSlot has the third block guarded by
>> "else if", while LogicalIncreaseRestartDecodingForSlot has a plain "if".
>> Isn't that a bit suspicious, considering the functions do the same thing,
>> just for different fields? I don't know whether this is dangerous - the
>> comments suggest it may just waste extra effort after a reconnect.
>>
> 
> I also suspected this point. I still need to investigate whether this is
> related to the issue, but I find this code in
> LogicalIncreaseRestartDecodingForSlot() dangerous.
> 
> We update the slot's restart_lsn based on candidate_restart_lsn and
> candidate_restart_valid upon receiving a feedback message from the
> subscriber, and then clear both fields. Therefore, this code in
> LogicalIncreaseRestartDecodingForSlot() means that an arbitrary LSN can
> be set as candidate_restart_lsn after the slot's restart_lsn has been
> updated.
> 
> I think an LSN older than the slot's restart_lsn can be passed to
> LogicalIncreaseRestartDecodingForSlot() as restart_lsn, for example
> after logical decoding restarts. The scenario I shared on another
> thread [1] was this: after updating the slot's restart_lsn (upon
> feedback from the subscriber) based on the candidate_restart_lsn and
> candidate_restart_valid values that remained in the slot, we might call
> LogicalIncreaseRestartDecodingForSlot() while decoding a RUNNING_XACTS
> record whose LSN is older than the slot's new restart_lsn. In this
> case, we end up passing an LSN older than the new restart_lsn to
> LogicalIncreaseRestartDecodingForSlot(), and that LSN is set as
> candidate_restart_lsn.

Right, I believe that matches my observations. I only see the issues
after (unexpected) restarts, say due to network issues, but chances are
regular reconnects have the same problem.

> My hypothesis is that we wanted to prevent such a case with the first
> if block:
> 
>     /* don't overwrite if have a newer restart lsn */
>     if (restart_lsn <= slot->data.restart_lsn)
>     {
>     }
> 

Yeah, that condition / comment seems to say exactly that.

Do you plan / expect to work on fixing this? It seems you proposed the
right fix in that old thread, but it's been inactive since 2023/02 :-(
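
(I don't remember the details of your patch, so the following is only a
guess on my part, but I'd expect the fix to be roughly making the last
block mirror the LogicalIncreaseXminForSlot structure, i.e. something
like

    if (restart_lsn <= slot->data.restart_lsn)
    {
    }
    else if (current_lsn <= slot->data.confirmed_flush)
    {
        ... update candidate fields ...
    }
    else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    {
        ... update candidate fields ...
    }

though I may well be missing a reason why the plain "if" is intentional.)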


regards

-- 
Tomas Vondra




On Sat, Nov 9, 2024 at 3:45 AM Tomas Vondra <tomas@vondra.me> wrote:
>
> ...
>
> Right, I believe that matches my observations. I only see the issues
> after (unexpected) restarts, say due to network issues, but chances are
> regular reconnects have the same problem.
>
> > My hypothesis is that we wanted to prevent such a case with the
> > first if block:
> >
> >     /* don't overwrite if have a newer restart lsn */
> >     if (restart_lsn <= slot->data.restart_lsn)
> >     {
> >     }
> >
>
> Yeah, that condition / comment seems to say exactly that.
>
> Do you plan / expect to work on fixing this? It seems you proposed the
> right fix in that old thread, but it's been inactive since 2023/02 :-(

I'm happy to work on this fix. At that time, I was unsure if my fix
was really correct and there was no further discussion.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com



On 11/11/24 21:56, Masahiko Sawada wrote:
> ...
>>
>>> My hypothesis is that we wanted to prevent such a case with the
>>> first if block:
>>>
>>>     /* don't overwrite if have a newer restart lsn */
>>>     if (restart_lsn <= slot->data.restart_lsn)
>>>     {
>>>     }
>>>
>>
>> Yeah, that condition / comment seems to say exactly that.
>>
>> Do you plan / expect to work on fixing this? It seems you proposed the
>> right fix in that old thread, but it's been inactive since 2023/02 :-(
> 
> I'm happy to work on this fix. At that time, I was unsure if my fix
> was really correct and there was no further discussion.
> 

Thanks. I'm not sure about the correctness either, but I think it's
clear that the issue is real, and it's not difficult to reproduce.

regards

-- 
Tomas Vondra