Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changes

- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262)
- Add `sequencer_blocks_synchronized_total` Prometheus counter metric tracking blocks synced by source (DA/P2P) [#3259](https://github.com/evstack/ev-node/pull/3259)
- Make it easier to override `DefaultMaxBlobSize` by ldflags [#3235](https://github.com/evstack/ev-node/pull/3235)
- Add solo sequencer (simple in memory single sequencer without force inclusion) [#3235](https://github.com/evstack/ev-node/pull/3235)
Expand Down
13 changes: 7 additions & 6 deletions block/internal/da/async_block_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog"
"google.golang.org/protobuf/proto"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
)
Expand Down Expand Up @@ -186,31 +187,31 @@ func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.Subscr

// HandleCatchup fetches a single height via Retrieve and caches it.
// Also applies the prefetch window for speculative forward fetching.
func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) error {
func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
if err := ctx.Err(); err != nil {
return err
return nil, err
}

if _, err := f.cache.Get(ctx, newBlockDataKey(daHeight)); err != nil {
if err := f.fetchAndCacheBlock(ctx, daHeight); err != nil {
return err
return nil, err
}
}
// Speculatively prefetch ahead.
target := daHeight + f.prefetchWindow
for h := daHeight + 1; h <= target; h++ {
if err := ctx.Err(); err != nil {
return err
return nil, err
}
if _, err := f.cache.Get(ctx, newBlockDataKey(h)); err == nil {
continue // Already cached.
}
if err := f.fetchAndCacheBlock(ctx, h); err != nil {
return err
return nil, err
}
}

return nil
return nil, nil
}

// fetchAndCacheBlock fetches a block via Retrieve and caches it.
Expand Down
48 changes: 42 additions & 6 deletions block/internal/da/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
)

Expand All @@ -23,9 +24,13 @@ type SubscriberHandler interface {
HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error

// HandleCatchup is called for each height during sequential catchup.
// The subscriber advances localDAHeight only after this returns (true, nil).
// The subscriber advances localDAHeight only after this returns nil.
// Returning an error rolls back localDAHeight and triggers a backoff retry.
HandleCatchup(ctx context.Context, height uint64) error
// The returned events are the DAHeightEvents produced for this height
// (may be nil/empty). The subscriber does not interpret them; they are
// returned so that higher-level callers (via the Subscriber) can inspect
// the results without coupling to the handler's internals.
HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error)
}

// SubscriberConfig holds configuration for creating a Subscriber.
Expand All @@ -39,6 +44,14 @@ type SubscriberConfig struct {
FetchBlockTimestamp bool // the timestamp comes with an extra api call before Celestia v0.29.1-mocha.

StartHeight uint64 // initial localDAHeight

// WalkbackChecker is an optional callback invoked after each successful
// HandleCatchup call. It receives the DA height just processed and the
// events returned by the handler. If it returns a non-zero DA height,
// the subscriber rewinds to that height so it re-fetches on the next
// iteration. Return 0 to continue normally.
// This is nil for subscribers that don't need walkback (e.g. async block retriever).
WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64
}

// Subscriber is a shared DA subscription primitive that encapsulates the
Expand All @@ -48,9 +61,10 @@ type SubscriberConfig struct {
//
// Used by both DAFollower (syncing) and asyncBlockRetriever (forced inclusion).
type Subscriber struct {
client Client
logger zerolog.Logger
handler SubscriberHandler
client Client
logger zerolog.Logger
handler SubscriberHandler
walkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64

// namespaces to subscribe on. When multiple, they are merged.
namespaces [][]byte
Expand Down Expand Up @@ -91,6 +105,7 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber {
client: cfg.Client,
logger: cfg.Logger,
handler: cfg.Handler,
walkbackChecker: cfg.WalkbackChecker,
namespaces: cfg.Namespaces,
catchupSignal: make(chan struct{}, 1),
daBlockTime: cfg.DABlockTime,
Expand Down Expand Up @@ -159,6 +174,23 @@ func (s *Subscriber) HasReachedHead() bool {
return s.headReached.Load()
}

// RewindTo sets localDAHeight back to the given height and signals the catchup
// loop so that DA heights are re-fetched. This is used when the primary source
// (P2P) stalls and DA needs to take over for the missing range.
func (s *Subscriber) RewindTo(daHeight uint64) {
for {
cur := s.localDAHeight.Load()
if daHeight >= cur {
return
}
if s.localDAHeight.CompareAndSwap(cur, daHeight) {
s.headReached.Store(false)
s.signalCatchup()
return
}
}
}

// signalCatchup sends a non-blocking signal to wake catchupLoop.
func (s *Subscriber) signalCatchup() {
select {
Expand Down Expand Up @@ -356,7 +388,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
continue
}

if err := s.handler.HandleCatchup(ctx, local); err != nil {
if events, err := s.handler.HandleCatchup(ctx, local); err != nil {
// Roll back so we can retry after backoff.
s.localDAHeight.Store(local)
if errors.Is(err, datypes.ErrHeightFromFuture) && local >= highest {
Expand All @@ -366,6 +398,10 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
if !s.shouldContinueCatchup(ctx, err, local) {
return
}
} else if s.walkbackChecker != nil {
if rewindTo := s.walkbackChecker(local, events); rewindTo > 0 {
s.RewindTo(rewindTo)
}
}
}
}
Expand Down
51 changes: 45 additions & 6 deletions block/internal/da/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
testmocks "github.com/evstack/ev-node/test/mocks"
)
Expand All @@ -24,9 +25,9 @@ func (m *MockSubscriberHandler) HandleEvent(ctx context.Context, ev datypes.Subs
return args.Error(0)
}

func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) error {
func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) {
args := m.Called(ctx, height)
return args.Error(0)
return args.Get(0).([]common.DAHeightEvent), args.Error(1)
}

func TestSubscriber_RunCatchup(t *testing.T) {
Expand All @@ -49,8 +50,8 @@ func TestSubscriber_RunCatchup(t *testing.T) {
// It should process observed heights [100..101] then stop when local passes highestSeen.
sub.updateHighest(101)
sub.seenSubscriptionEvent.Store(true)
mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return(nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return(nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return([]common.DAHeightEvent(nil), nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return([]common.DAHeightEvent(nil), nil).Once()

sub.runCatchup(ctx)

Expand Down Expand Up @@ -84,13 +85,13 @@ func TestSubscriber_RunCatchup(t *testing.T) {
Run(func(args mock.Arguments) {
callCount++
}).
Return(errors.New("network failure")).Once()
Return([]common.DAHeightEvent(nil), errors.New("network failure")).Once()

mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).
Run(func(args mock.Arguments) {
callCount++
}).
Return(nil).Once()
Return([]common.DAHeightEvent(nil), nil).Once()

sub.runCatchup(ctx)

Expand All @@ -101,6 +102,44 @@ func TestSubscriber_RunCatchup(t *testing.T) {
})
}

func TestSubscriber_RewindTo(t *testing.T) {
t.Run("no_op_when_target_is_equal_or_higher", func(t *testing.T) {
sub := NewSubscriber(SubscriberConfig{
Client: testmocks.NewMockClient(t),
Logger: zerolog.Nop(),
Handler: new(MockSubscriberHandler),
Namespaces: [][]byte{[]byte("ns")},
StartHeight: 100,
DABlockTime: time.Millisecond,
})
sub.localDAHeight.Store(100)

sub.RewindTo(100)
assert.Equal(t, uint64(100), sub.LocalDAHeight())

sub.RewindTo(200)
assert.Equal(t, uint64(100), sub.LocalDAHeight())
})

t.Run("rewinds_local_height_and_clears_head", func(t *testing.T) {
sub := NewSubscriber(SubscriberConfig{
Client: testmocks.NewMockClient(t),
Logger: zerolog.Nop(),
Handler: new(MockSubscriberHandler),
Namespaces: [][]byte{[]byte("ns")},
StartHeight: 100,
DABlockTime: time.Millisecond,
})
sub.localDAHeight.Store(150)
sub.headReached.Store(true)

sub.RewindTo(120)

assert.Equal(t, uint64(120), sub.LocalDAHeight())
assert.False(t, sub.HasReachedHead())
})
}

func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
Expand Down
63 changes: 39 additions & 24 deletions block/internal/syncing/da_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ type DAFollower interface {
HasReachedHead() bool
// QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints).
QueuePriorityHeight(daHeight uint64)
// RewindTo moves the subscriber back to a lower DA height so it
// re-fetches from there. Used by the syncer for walkback when a gap
// is detected between the node height and the DA events.
RewindTo(daHeight uint64)
}

// daFollower is the concrete implementation of DAFollower.
type daFollower struct {
subscriber *da.Subscriber
retriever DARetriever
eventSink common.EventSink
logger zerolog.Logger
subscriber *da.Subscriber
retriever DARetriever
eventSink common.EventSink
logger zerolog.Logger
startDAHeight uint64

// Priority queue for P2P hint heights (absorbed from DARetriever refactoring #2).
priorityMu sync.Mutex
Expand All @@ -48,6 +53,9 @@ type DAFollowerConfig struct {
DataNamespace []byte // may be nil or equal to Namespace
StartDAHeight uint64
DABlockTime time.Duration
// WalkbackChecker is forwarded to the underlying Subscriber.
// See da.SubscriberConfig.WalkbackChecker for details.
WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64
}

// NewDAFollower creates a new daFollower.
Expand All @@ -61,16 +69,18 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower {
retriever: cfg.Retriever,
eventSink: cfg.EventSink,
logger: cfg.Logger.With().Str("component", "da_follower").Logger(),
startDAHeight: cfg.StartDAHeight,
priorityHeights: make([]uint64, 0),
}

f.subscriber = da.NewSubscriber(da.SubscriberConfig{
Client: cfg.Client,
Logger: cfg.Logger,
Namespaces: [][]byte{cfg.Namespace, dataNs},
DABlockTime: cfg.DABlockTime,
Handler: f,
StartHeight: cfg.StartDAHeight,
Client: cfg.Client,
Logger: cfg.Logger,
Namespaces: [][]byte{cfg.Namespace, dataNs},
DABlockTime: cfg.DABlockTime,
Handler: f,
StartHeight: cfg.StartDAHeight,
WalkbackChecker: cfg.WalkbackChecker,
})

return f
Expand All @@ -91,6 +101,11 @@ func (f *daFollower) HasReachedHead() bool {
return f.subscriber.HasReachedHead()
}

// RewindTo moves the subscriber back to a lower DA height for re-fetching.
func (f *daFollower) RewindTo(daHeight uint64) {
f.subscriber.RewindTo(daHeight)
}

// HandleEvent processes a subscription event. When the follower is
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just lets
Expand Down Expand Up @@ -123,7 +138,7 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve

// HandleCatchup retrieves events at a single DA height and pipes them
// to the event sink. Checks priority heights first.
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
// 1. Drain stale or future priority heights from P2P hints
for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
if priorityHeight < daHeight {
Expand All @@ -134,46 +149,46 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
Uint64("da_height", priorityHeight).
Msg("fetching priority DA height from P2P hint")

if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
if _, err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
if errors.Is(err, datypes.ErrHeightFromFuture) {
// Priority hint points to a future height — silently ignore.
f.logger.Debug().Uint64("priority_da_height", priorityHeight).
Msg("priority hint is from future, ignoring")
continue
}
// Roll back so daHeight is attempted again next cycle after backoff.
return err
return nil, err
}
break // continue with daHeight
break
}

// 2. Normal sequential fetch
if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
return err
events, err := f.fetchAndPipeHeight(ctx, daHeight)
if err != nil {
return nil, err
}
return nil

return events, nil
}

// fetchAndPipeHeight retrieves events at a single DA height and pipes them.
// It does NOT handle ErrHeightFromFuture — callers must decide how to react
// because the correct response depends on whether this is a normal sequential
// catchup or a priority-hint fetch.
func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error {
func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
events, err := f.retriever.RetrieveFromDA(ctx, daHeight)
if err != nil {
if errors.Is(err, datypes.ErrBlobNotFound) {
return nil
return nil, nil
}
return err
return nil, err
}

for _, event := range events {
if err := f.eventSink.PipeEvent(ctx, event); err != nil {
return err
return nil, err
}
}

return nil
return events, nil
}

// QueuePriorityHeight queues a DA height for priority retrieval.
Expand Down
Loading
Loading