diff --git a/CHANGELOG.md b/CHANGELOG.md index 502b9bb1e5..0d79c3f177 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes +- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262) - Add `sequencer_blocks_synchronized_total` Prometheus counter metric tracking blocks synced by source (DA/P2P) [#3259](https://github.com/evstack/ev-node/pull/3259) - Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235) - Add solo sequencer (simple in memory single sequencer without force inclusion) [#3235](https://github.com/evstack/ev-node/pull/3235) diff --git a/block/internal/da/async_block_retriever.go b/block/internal/da/async_block_retriever.go index 2230df87c4..5c1ac156f2 100644 --- a/block/internal/da/async_block_retriever.go +++ b/block/internal/da/async_block_retriever.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "google.golang.org/protobuf/proto" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -186,31 +187,31 @@ func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.Subscr // HandleCatchup fetches a single height via Retrieve and caches it. // Also applies the prefetch window for speculative forward fetching. -func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) error { +func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { if err := ctx.Err(); err != nil { - return err + return nil, err } if _, err := f.cache.Get(ctx, newBlockDataKey(daHeight)); err != nil { if err := f.fetchAndCacheBlock(ctx, daHeight); err != nil { - return err + return nil, err } } // Speculatively prefetch ahead. target := daHeight + f.prefetchWindow for h := daHeight + 1; h <= target; h++ { if err := ctx.Err(); err != nil { - return err + return nil, err } if _, err := f.cache.Get(ctx, newBlockDataKey(h)); err == nil { continue // Already cached. } if err := f.fetchAndCacheBlock(ctx, h); err != nil { - return err + return nil, err } } - return nil + return nil, nil } // fetchAndCacheBlock fetches a block via Retrieve and caches it. diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index 8ff46773ce..d56a5f5952 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -23,9 +24,13 @@ type SubscriberHandler interface { HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error // HandleCatchup is called for each height during sequential catchup. - // The subscriber advances localDAHeight only after this returns (true, nil). + // The subscriber advances localDAHeight only after this returns nil. // Returning an error rolls back localDAHeight and triggers a backoff retry. - HandleCatchup(ctx context.Context, height uint64) error + // The returned events are the DAHeightEvents produced for this height + // (may be nil/empty). The subscriber does not interpret them; they are + // returned so that higher-level callers (via the Subscriber) can inspect + // the results without coupling to the handler's internals. + HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) } // SubscriberConfig holds configuration for creating a Subscriber. @@ -39,6 +44,14 @@ type SubscriberConfig struct { FetchBlockTimestamp bool // the timestamp comes with an extra api call before Celestia v0.29.1-mocha. StartHeight uint64 // initial localDAHeight + + // WalkbackChecker is an optional callback invoked after each successful + // HandleCatchup call. It receives the DA height just processed and the + // events returned by the handler. If it returns a non-zero DA height, + // the subscriber rewinds to that height so it re-fetches on the next + // iteration. Return 0 to continue normally. + // This is nil for subscribers that don't need walkback (e.g. async block retriever). + WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 } // Subscriber is a shared DA subscription primitive that encapsulates the @@ -48,9 +61,10 @@ type SubscriberConfig struct { // // Used by both DAFollower (syncing) and asyncBlockRetriever (forced inclusion). type Subscriber struct { - client Client - logger zerolog.Logger - handler SubscriberHandler + client Client + logger zerolog.Logger + handler SubscriberHandler + walkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 // namespaces to subscribe on. When multiple, they are merged. namespaces [][]byte @@ -91,6 +105,7 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber { client: cfg.Client, logger: cfg.Logger, handler: cfg.Handler, + walkbackChecker: cfg.WalkbackChecker, namespaces: cfg.Namespaces, catchupSignal: make(chan struct{}, 1), daBlockTime: cfg.DABlockTime, @@ -159,6 +174,23 @@ func (s *Subscriber) HasReachedHead() bool { return s.headReached.Load() } +// RewindTo sets localDAHeight back to the given height and signals the catchup +// loop so that DA heights are re-fetched. This is used when the primary source +// (P2P) stalls and DA needs to take over for the missing range. +func (s *Subscriber) RewindTo(daHeight uint64) { + for { + cur := s.localDAHeight.Load() + if daHeight >= cur { + return + } + if s.localDAHeight.CompareAndSwap(cur, daHeight) { + s.headReached.Store(false) + s.signalCatchup() + return + } + } +} + // signalCatchup sends a non-blocking signal to wake catchupLoop. func (s *Subscriber) signalCatchup() { select { @@ -356,7 +388,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) { continue } - if err := s.handler.HandleCatchup(ctx, local); err != nil { + if events, err := s.handler.HandleCatchup(ctx, local); err != nil { // Roll back so we can retry after backoff. s.localDAHeight.Store(local) if errors.Is(err, datypes.ErrHeightFromFuture) && local >= highest { @@ -366,6 +398,10 @@ func (s *Subscriber) runCatchup(ctx context.Context) { if !s.shouldContinueCatchup(ctx, err, local) { return } + } else if s.walkbackChecker != nil { + if rewindTo := s.walkbackChecker(local, events); rewindTo > 0 { + s.RewindTo(rewindTo) + } } } } diff --git a/block/internal/da/subscriber_test.go b/block/internal/da/subscriber_test.go index 2ed80886de..2b873977b1 100644 --- a/block/internal/da/subscriber_test.go +++ b/block/internal/da/subscriber_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" testmocks "github.com/evstack/ev-node/test/mocks" ) @@ -24,9 +25,9 @@ func (m *MockSubscriberHandler) HandleEvent(ctx context.Context, ev datypes.Subs return args.Error(0) } -func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) error { +func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) { args := m.Called(ctx, height) - return args.Error(0) + return args.Get(0).([]common.DAHeightEvent), args.Error(1) } func TestSubscriber_RunCatchup(t *testing.T) { @@ -49,8 +50,8 @@ func TestSubscriber_RunCatchup(t *testing.T) { // It should process observed heights [100..101] then stop when local passes highestSeen. sub.updateHighest(101) sub.seenSubscriptionEvent.Store(true) - mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return(nil).Once() - mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return(nil).Once() + mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return([]common.DAHeightEvent(nil), nil).Once() + mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return([]common.DAHeightEvent(nil), nil).Once() sub.runCatchup(ctx) @@ -84,13 +85,13 @@ func TestSubscriber_RunCatchup(t *testing.T) { Run(func(args mock.Arguments) { callCount++ }). - Return(errors.New("network failure")).Once() + Return([]common.DAHeightEvent(nil), errors.New("network failure")).Once() mockHandler.On("HandleCatchup", mock.Anything, uint64(100)). Run(func(args mock.Arguments) { callCount++ }). - Return(nil).Once() + Return([]common.DAHeightEvent(nil), nil).Once() sub.runCatchup(ctx) @@ -101,6 +102,44 @@ func TestSubscriber_RunCatchup(t *testing.T) { }) } +func TestSubscriber_RewindTo(t *testing.T) { + t.Run("no_op_when_target_is_equal_or_higher", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(100) + + sub.RewindTo(100) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + + sub.RewindTo(200) + assert.Equal(t, uint64(100), sub.LocalDAHeight()) + }) + + t.Run("rewinds_local_height_and_clears_head", func(t *testing.T) { + sub := NewSubscriber(SubscriberConfig{ + Client: testmocks.NewMockClient(t), + Logger: zerolog.Nop(), + Handler: new(MockSubscriberHandler), + Namespaces: [][]byte{[]byte("ns")}, + StartHeight: 100, + DABlockTime: time.Millisecond, + }) + sub.localDAHeight.Store(150) + sub.headReached.Store(true) + + sub.RewindTo(120) + + assert.Equal(t, uint64(120), sub.LocalDAHeight()) + assert.False(t, sub.HasReachedHead()) + }) +} + func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() diff --git a/block/internal/syncing/da_follower.go b/block/internal/syncing/da_follower.go index 443fb876ae..70ce946570 100644 --- a/block/internal/syncing/da_follower.go +++ b/block/internal/syncing/da_follower.go @@ -22,14 +22,19 @@ type DAFollower interface { HasReachedHead() bool // QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints). QueuePriorityHeight(daHeight uint64) + // RewindTo moves the subscriber back to a lower DA height so it + // re-fetches from there. Used by the syncer for walkback when a gap + // is detected between the node height and the DA events. + RewindTo(daHeight uint64) } // daFollower is the concrete implementation of DAFollower. type daFollower struct { - subscriber *da.Subscriber - retriever DARetriever - eventSink common.EventSink - logger zerolog.Logger + subscriber *da.Subscriber + retriever DARetriever + eventSink common.EventSink + logger zerolog.Logger + startDAHeight uint64 // Priority queue for P2P hint heights (absorbed from DARetriever refactoring #2). priorityMu sync.Mutex @@ -48,6 +53,9 @@ type DAFollowerConfig struct { DataNamespace []byte // may be nil or equal to Namespace StartDAHeight uint64 DABlockTime time.Duration + // WalkbackChecker is forwarded to the underlying Subscriber. + // See da.SubscriberConfig.WalkbackChecker for details. + WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64 } // NewDAFollower creates a new daFollower. @@ -61,16 +69,18 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower { retriever: cfg.Retriever, eventSink: cfg.EventSink, logger: cfg.Logger.With().Str("component", "da_follower").Logger(), + startDAHeight: cfg.StartDAHeight, priorityHeights: make([]uint64, 0), } f.subscriber = da.NewSubscriber(da.SubscriberConfig{ - Client: cfg.Client, - Logger: cfg.Logger, - Namespaces: [][]byte{cfg.Namespace, dataNs}, - DABlockTime: cfg.DABlockTime, - Handler: f, - StartHeight: cfg.StartDAHeight, + Client: cfg.Client, + Logger: cfg.Logger, + Namespaces: [][]byte{cfg.Namespace, dataNs}, + DABlockTime: cfg.DABlockTime, + Handler: f, + StartHeight: cfg.StartDAHeight, + WalkbackChecker: cfg.WalkbackChecker, }) return f @@ -91,6 +101,11 @@ func (f *daFollower) HasReachedHead() bool { return f.subscriber.HasReachedHead() } +// RewindTo moves the subscriber back to a lower DA height for re-fetching. +func (f *daFollower) RewindTo(daHeight uint64) { + f.subscriber.RewindTo(daHeight) +} + // HandleEvent processes a subscription event. When the follower is // caught up (ev.Height == localDAHeight) and blobs are available, it processes // them inline — avoiding a DA re-fetch round trip. Otherwise, it just lets @@ -123,7 +138,7 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve // HandleCatchup retrieves events at a single DA height and pipes them // to the event sink. Checks priority heights first. -func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { +func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { // 1. Drain stale or future priority heights from P2P hints for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() { if priorityHeight < daHeight { @@ -134,46 +149,46 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error { Uint64("da_height", priorityHeight). Msg("fetching priority DA height from P2P hint") - if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { + if _, err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil { if errors.Is(err, datypes.ErrHeightFromFuture) { - // Priority hint points to a future height — silently ignore. f.logger.Debug().Uint64("priority_da_height", priorityHeight). Msg("priority hint is from future, ignoring") continue } - // Roll back so daHeight is attempted again next cycle after backoff. - return err + return nil, err } - break // continue with daHeight + break } // 2. Normal sequential fetch - if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil { - return err + events, err := f.fetchAndPipeHeight(ctx, daHeight) + if err != nil { + return nil, err } - return nil + + return events, nil } // fetchAndPipeHeight retrieves events at a single DA height and pipes them. // It does NOT handle ErrHeightFromFuture — callers must decide how to react // because the correct response depends on whether this is a normal sequential // catchup or a priority-hint fetch. -func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error { +func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { events, err := f.retriever.RetrieveFromDA(ctx, daHeight) if err != nil { if errors.Is(err, datypes.ErrBlobNotFound) { - return nil + return nil, nil } - return err + return nil, err } for _, event := range events { if err := f.eventSink.PipeEvent(ctx, event); err != nil { - return err + return nil, err } } - return nil + return events, nil } // QueuePriorityHeight queues a DA height for priority retrieval. diff --git a/block/internal/syncing/da_follower_test.go b/block/internal/syncing/da_follower_test.go index 710d2d81d0..802b685c5d 100644 --- a/block/internal/syncing/da_follower_test.go +++ b/block/internal/syncing/da_follower_test.go @@ -12,6 +12,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" datypes "github.com/evstack/ev-node/pkg/da/types" + "github.com/evstack/ev-node/types" ) func TestDAFollower_HandleEvent(t *testing.T) { @@ -164,7 +165,6 @@ func TestDAFollower_HandleCatchup(t *testing.T) { initialPriorityHeights: []uint64{99}, wantPipedHeights: []uint64{100}, setupMock: func(m *MockDARetriever) { - // stale priority hint (< daHeight) is discarded; only sequential height is fetched m.On("RetrieveFromDA", mock.Anything, uint64(100)). Return([]common.DAHeightEvent{{DaHeight: 100}}, nil).Once() }, @@ -179,7 +179,7 @@ func TestDAFollower_HandleCatchup(t *testing.T) { } follower, getPipedEvents := newFollower(t, s, daRetriever) - err := follower.HandleCatchup(t.Context(), s.daHeight) + events, err := follower.HandleCatchup(t.Context(), s.daHeight) if s.wantErrIs != nil { require.ErrorIs(t, err, s.wantErrIs) @@ -203,6 +203,8 @@ func TestDAFollower_HandleCatchup(t *testing.T) { } else { assert.Empty(t, follower.priorityHeights) } + + _ = events }) } } @@ -251,3 +253,7 @@ func makeRange(start, end uint64) []uint64 { } return out } + +func makeHeader(height uint64) *types.SignedHeader { + return &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: height}}} +} diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 9b0a965c5f..16c13807d5 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -82,6 +82,17 @@ type Syncer struct { daFollower DAFollower + // p2pStalled is set by p2pWorkerLoop when P2P genuinely fails (not + // cancelled by a DA event). Used by the walkback check to decide + // whether to rewind the DA follower. + p2pStalled atomic.Bool + + // walkbackActive is set when the syncer detects a gap between the + // node's block height and the DA events being processed. While active, + // the DA follower is rewound one height at a time until the gap is + // filled. + walkbackActive atomic.Bool + // Forced inclusion tracking forcedInclusionMu sync.RWMutex seenBlockTxs map[string]struct{} // SHA-256 hex of every tx seen in a DA-sourced block @@ -212,14 +223,15 @@ func (s *Syncer) Start(ctx context.Context) (err error) { // Start the DA follower (subscribe + catchup) and other workers s.daFollower = NewDAFollower(DAFollowerConfig{ - Client: s.daClient, - Retriever: s.daRetriever, - Logger: s.logger, - EventSink: s, - Namespace: s.daClient.GetHeaderNamespace(), - DataNamespace: s.daClient.GetDataNamespace(), - StartDAHeight: s.daRetrieverHeight.Load(), - DABlockTime: s.config.DA.BlockTime.Duration, + Client: s.daClient, + Retriever: s.daRetriever, + Logger: s.logger, + EventSink: s, + Namespace: s.daClient.GetHeaderNamespace(), + DataNamespace: s.daClient.GetDataNamespace(), + StartDAHeight: s.daRetrieverHeight.Load(), + DABlockTime: s.config.DA.BlockTime.Duration, + WalkbackChecker: s.walkbackCheck, }) if err = s.daFollower.Start(ctx); err != nil { return fmt.Errorf("failed to start DA follower: %w", err) @@ -488,6 +500,7 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { } if waitCtx.Err() == nil { + s.p2pStalled.Store(true) logger.Warn().Err(err).Uint64("height", targetHeight).Msg("P2P handler failed to process height") } @@ -497,6 +510,8 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { continue } + s.p2pStalled.Store(false) + if err := s.waitForStoreHeight(ctx, targetHeight); err != nil { if errors.Is(err, context.Canceled) { return @@ -506,6 +521,55 @@ func (s *Syncer) p2pWorkerLoop(ctx context.Context) { } } +// walkbackCheck is the WalkbackChecker callback for the DA follower's +// subscriber. It decides whether the subscriber should rewind to re-fetch +// previous DA heights, based on the gap between the node's block height +// and the block heights found in the DA events. +// +// Returns a DA height to rewind to, or 0 to continue normally. +func (s *Syncer) walkbackCheck(daHeight uint64, events []common.DAHeightEvent) uint64 { + if !s.p2pStalled.Load() { + s.walkbackActive.Store(false) + return 0 + } + + if daHeight <= s.daRetrieverHeight.Load() { + return 0 + } + + nodeHeight, err := s.store.Height(s.ctx) + if err != nil { + return 0 + } + + needsWalkback := s.walkbackActive.Load() + if len(events) > 0 { + minHeight := events[0].Header.Height() + for _, e := range events[1:] { + if e.Header.Height() < minHeight { + minHeight = e.Header.Height() + } + } + if minHeight <= nodeHeight+1 { + s.walkbackActive.Store(false) + return 0 + } + needsWalkback = true + } + + if needsWalkback { + s.walkbackActive.Store(true) + s.logger.Info(). + Uint64("da_height", daHeight). + Uint64("node_height", nodeHeight). + Int("events", len(events)). + Msg("P2P stalled with gap between DA blocks and node height, walking DA follower back") + return daHeight - 1 + } + + return 0 +} + func (s *Syncer) waitForGenesis() bool { if delay := time.Until(s.genesis.StartTime); delay > 0 { timer := time.NewTimer(delay) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..c8b1b48b90 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -1454,3 +1454,122 @@ func TestSyncer_Stop_DrainWorksWithoutCriticalError(t *testing.T) { mockExec.AssertExpectations(t) }) } + +func TestSyncer_walkbackCheck(t *testing.T) { + makeEvents := func(heights ...uint64) []common.DAHeightEvent { + events := make([]common.DAHeightEvent, len(heights)) + for i, h := range heights { + events[i] = common.DAHeightEvent{Header: makeHeader(h)} + } + return events + } + + t.Run("returns_zero_when_p2p_not_stalled", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + + got := s.walkbackCheck(100, makeEvents(50)) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) + + t.Run("returns_zero_at_startDAHeight", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + s.store = st + s.cache = cm + + got := s.walkbackCheck(1, makeEvents(50)) + assert.Equal(t, uint64(0), got) + }) + + t.Run("activates_walkback_when_gap_detected", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + + got := s.walkbackCheck(100, makeEvents(50)) + assert.Equal(t, uint64(99), got) + assert.True(t, s.walkbackActive.Load()) + }) + + t.Run("continues_walkback_on_empty_events", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(99, nil) + assert.Equal(t, uint64(98), got) + assert.True(t, s.walkbackActive.Load()) + }) + + t.Run("stops_walkback_when_contiguous", func(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SetHeight(40)) + require.NoError(t, batch.Commit()) + + s := &Syncer{ + ctx: t.Context(), + store: st, + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.p2pStalled.Store(true) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(95, makeEvents(41)) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) + + t.Run("clears_walkback_when_p2p_recovers", func(t *testing.T) { + s := &Syncer{ + ctx: t.Context(), + } + s.daRetrieverHeight = &atomic.Uint64{} + s.daRetrieverHeight.Store(1) + s.walkbackActive.Store(true) + + got := s.walkbackCheck(100, nil) + assert.Equal(t, uint64(0), got) + assert.False(t, s.walkbackActive.Load()) + }) +}