From 1bd9a299242a07637f2cd788e8166b5d0256bf34 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 2 Apr 2026 19:52:39 +0200 Subject: [PATCH 1/4] feat: replace TxPoller polling with SSE streaming Switch TxPoller from 1s timer-based polling to SSE streaming for real-time transaction delivery. The new lifecycle: 1. Full fetch of all transactions at startup 2. SSE stream for real-time new transaction delivery 3. Full refetch on each block environment change Adds exponential backoff (1s-30s) on SSE reconnection to prevent tight loops when the endpoint is unavailable. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 17 +++- Cargo.toml | 12 +-- src/tasks/cache/system.rs | 2 +- src/tasks/cache/tx.rs | 171 ++++++++++++++++++++++++-------------- tests/tx_poller_test.rs | 13 ++- 5 files changed, 135 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93d1ce79..95da644f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3780,6 +3780,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -10473,9 +10484,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" dependencies = [ "aws-lc-rs", "log", @@ -11189,9 +11200,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba06dc92c72b877f042ba141c7ee553c68aab1c6e077a3ba1adb2eae17fdaff4" dependencies = [ "alloy", + "eventsource-stream", "futures-util", "reqwest", "serde", + "serde_json", "signet-bundle", "signet-constants", "signet-types", diff --git a/Cargo.toml b/Cargo.toml index f2e39007..10104edc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,13 +21,13 @@ name = "zenith-builder-example" path = "bin/builder.rs" [dependencies] -init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } +init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] } -signet-constants = { version = "0.16.0-rc.16" } -signet-sim = { version = "0.16.0-rc.16" } -signet-tx-cache = { version = "0.16.0-rc.16" } -signet-types = { version = "0.16.0-rc.16" } -signet-zenith = { version = "0.16.0-rc.16" } +signet-constants = { version = "0.16.0" } +signet-sim = { version = "0.16.0" } +signet-tx-cache = { version = "0.16.0" } +signet-types = { version = "0.16.0" } +signet-zenith = { version = "0.16.0" } signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 81223a36..89698a4d 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -23,7 +23,7 @@ impl CacheTasks { /// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s. pub fn spawn(&self) -> CacheSystem { // Tx Poller pulls transactions from the cache - let tx_poller = TxPoller::new(); + let tx_poller = TxPoller::new(self.block_env.clone()); let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 25a481d9..5459edab 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,59 +1,46 @@ //! Transaction service responsible for fetching and sending transactions to the simulator. -use crate::config::BuilderConfig; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::time::Duration; -use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace, trace_span}; +use std::{pin::Pin, time::Duration}; +use tokio::{sync::{mpsc, watch}, task::JoinHandle, time}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; -/// Poll interval for the transaction poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin> + Send>>; /// Implements a poller for the block builder to pull transactions from the /// transaction pool. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: TxCache, - /// Defines the interval at which the service should poll the cache. - poll_interval_ms: u64, -} - -impl Default for TxPoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool -/// and sends them into the provided channel sender. +/// [`TxPoller`] fetches transactions from the transaction pool on startup +/// and on each block environment change, and subscribes to an SSE stream +/// for real-time delivery of new transactions in between. impl TxPoller { - /// Returns a new [`TxPoller`] with the given config. - /// * Defaults to 1000ms poll interval (1s). - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } + const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); + const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); - /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`TxPoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, poll_interval_ms } - } - - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) + Self { config, tx_cache, envs } } - // Spawn a tokio task to check the nonce of a transaction before sending - // it to the cachetask via the outbound channel. + /// Spawn a tokio task to check the nonce of a transaction before sending + /// it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); @@ -95,46 +82,104 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions, paginating through all available pages. - pub async fn check_tx_cache(&self) -> Result, TxCacheError> { - self.tx_cache.stream_transactions().try_collect().await + /// Fetches all transactions from the cache, forwarding each to nonce + /// checking before it reaches the [`CacheTask`]. + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); + + counter!("signet.builder.cache.tx_poll_count").increment(1); + if let Ok(transactions) = self + .tx_cache + .stream_transactions() + .try_collect::>() + .inspect_err(|error| { + counter!("signet.builder.cache.tx_poll_errors").increment(1); + debug!(%error, "Error fetching transactions"); + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); + } + } } - async fn task_future(self, outbound: mpsc::UnboundedSender) { - loop { - let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); - - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; + /// Opens an SSE subscription to the transaction feed. Returns an empty + /// stream on connection failure so the caller can handle reconnection + /// uniformly. + async fn subscribe(&self) -> SseStream { + match self.tx_cache.subscribe_transactions().await { + Ok(stream) => { + debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"); + Box::pin(stream) + } + Err(error) => { + warn!(%error, "Failed to open SSE transaction subscription"); + Box::pin(futures_util::stream::empty()) } + } + } - crate::metrics::inc_tx_poll_count(); - if let Ok(transactions) = self - .check_tx_cache() - .inspect_err(|error| { - crate::metrics::inc_tx_poll_errors(); - debug!(%error, "Error fetching transactions"); - }) - .instrument(span.clone()) - .await - { - let _guard = span.entered(); - crate::metrics::record_txs_fetched(transactions.len()); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions.into_iter() { + /// Reconnects the SSE stream with backoff. Performs a full refetch to + /// cover any items missed while disconnected. + async fn reconnect( + &self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + time::sleep(*backoff).await; + *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); + self.full_fetch(outbound).await; + self.subscribe().await + } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + // Initial full fetch of all transactions currently in the cache. + self.full_fetch(&outbound).await; + + // Open the SSE stream for real-time delivery of new transactions. + let mut sse_stream = self.subscribe().await; + let mut backoff = Self::INITIAL_RECONNECT_BACKOFF; + + loop { + tokio::select! { + item = sse_stream.next() => { + let Some(result) = item else { + warn!("SSE transaction stream ended, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + let Ok(tx) = result else { + warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + continue; + }; + backoff = Self::INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } self.spawn_check_nonce(tx, outbound.clone()); } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); + break; + } + trace!("Block env changed, refetching all transactions"); + self.full_fetch(&outbound).await; + } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to - /// its sender. + /// Spawns a task that fetches all current transactions, then subscribes + /// to the SSE feed for real-time updates, refetching on each new block + /// environment. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index d607d8c6..40ef31c6 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,11 +1,10 @@ #![cfg(feature = "test-utils")] use alloy::{primitives::U256, signers::local::PrivateKeySigner}; -use builder::{ - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, -}; +use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; use eyre::{Ok, Result}; +use futures_util::TryStreamExt; +use signet_tx_cache::TxCache; #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> { // Post a transaction to the cache post_tx().await?; - // Create a new poller - let poller = TxPoller::new(); - // Fetch transactions from the pool - let transactions = poller.check_tx_cache().await?; + let tx_cache = TxCache::new(builder::config().tx_pool_url.clone()); + let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?; // Ensure at least one transaction exists assert!(!transactions.is_empty()); From 874ef23e390f9ab03ff39d1bc5784f836948bd35 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 2 Apr 2026 20:03:00 +0200 Subject: [PATCH 2/4] fix: rustfmt and rustdoc CI failures Expand tokio import for nightly rustfmt, remove unresolved `CacheTask` rustdoc link. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tasks/cache/tx.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 5459edab..d92ed251 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -8,7 +8,11 @@ use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; use std::{pin::Pin, time::Duration}; -use tokio::{sync::{mpsc, watch}, task::JoinHandle, time}; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, + time, +}; use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; type SseStream = Pin> + Send>>; @@ -83,7 +87,7 @@ impl TxPoller { } /// Fetches all transactions from the cache, forwarding each to nonce - /// checking before it reaches the [`CacheTask`]. + /// checking before it reaches the cache task. async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); From 162545bdb4da0021ced477a933c0ff903043324a Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 15 Apr 2026 18:35:35 +0200 Subject: [PATCH 3/4] fix: make TxPoller reconnect responsive to block env changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Race the backoff sleep against envs.changed() so a block env change arriving during reconnect cuts the sleep short, instead of buffering up to 30s while the simulator operates on a stale cache. Also replace the nested let-else + unwrap_err in the SSE arm with a single match — no behavior change, drops the double-unwrap. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/tasks/cache/tx.rs | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index d92ed251..06701657 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -131,11 +131,16 @@ impl TxPoller { /// Reconnects the SSE stream with backoff. Performs a full refetch to /// cover any items missed while disconnected. async fn reconnect( - &self, + &mut self, outbound: &mpsc::UnboundedSender, backoff: &mut Duration, ) -> SseStream { - time::sleep(*backoff).await; + tokio::select! { + _ = time::sleep(*backoff) => {} + // Break the sleep early on block env change or channel close — + // full_fetch below serves the same purpose the env arm would have. + _ = self.envs.changed() => {} + } *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); self.full_fetch(outbound).await; self.subscribe().await @@ -152,22 +157,24 @@ impl TxPoller { loop { tokio::select! { item = sse_stream.next() => { - let Some(result) = item else { - warn!("SSE transaction stream ended, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - continue; - }; - let Ok(tx) = result else { - warn!(error = %result.unwrap_err(), "SSE transaction stream error, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - continue; - }; - backoff = Self::INITIAL_RECONNECT_BACKOFF; - if outbound.is_closed() { - trace!("No receivers left, shutting down"); - break; + match item { + Some(Ok(tx)) => { + backoff = Self::INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + break; + } + self.spawn_check_nonce(tx, outbound.clone()); + } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + sse_stream = self.reconnect(&outbound, &mut backoff).await; + } } - self.spawn_check_nonce(tx, outbound.clone()); } res = self.envs.changed() => { if res.is_err() { From 1809ad13f5ae055432736e632860b0b742d324cd Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 23 Apr 2026 21:04:27 +0200 Subject: [PATCH 4/4] refactor: apply PR #259 review feedback Addresses the obvious nits from prestwich's review on src/tasks/cache/tx.rs: - Move backoff constants to module level (was assoc consts) - Use crate::metrics::inc_*/record_* helpers instead of bare counter!/histogram! macros (matches #263's metrics module) - Rewrite subscribe() as a combinator chain over the Result - Bias the select! in reconnect() so env changes preempt the backoff sleep, with an inline rationale - Run full_fetch and subscribe concurrently via tokio::join! in reconnect() - Extract handle_sse_item helper to flatten the nested match inside task_future's SSE select arm Deferred to a follow-up (need design decisions): - Split full_fetch into fetch + dispatch with better name - Replace inline backoff with backon + permanence criterion - Rename SDK stream_transactions/subscribe_transactions Co-Authored-By: Claude Opus 4.7 (1M context) --- src/tasks/cache/tx.rs | 104 +++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 42 deletions(-) diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 06701657..7f9728d4 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -5,9 +5,8 @@ use alloy::{ providers::Provider, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; -use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::{pin::Pin, time::Duration}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, @@ -17,6 +16,9 @@ use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; type SseStream = Pin> + Send>>; +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); + /// Implements a poller for the block builder to pull transactions from the /// transaction pool. #[derive(Debug)] @@ -33,9 +35,6 @@ pub struct TxPoller { /// and on each block environment change, and subscribes to an SSE stream /// for real-time delivery of new transactions in between. impl TxPoller { - const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); - const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); - /// Returns a new [`TxPoller`] with the given block environment receiver. pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); @@ -91,20 +90,20 @@ impl TxPoller { async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); - counter!("signet.builder.cache.tx_poll_count").increment(1); + crate::metrics::inc_tx_poll_count(); if let Ok(transactions) = self .tx_cache .stream_transactions() .try_collect::>() .inspect_err(|error| { - counter!("signet.builder.cache.tx_poll_errors").increment(1); + crate::metrics::inc_tx_poll_errors(); debug!(%error, "Error fetching transactions"); }) .instrument(span.clone()) .await { let _guard = span.entered(); - histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64); + crate::metrics::record_txs_fetched(transactions.len()); trace!(count = transactions.len(), "found transactions"); for tx in transactions { self.spawn_check_nonce(tx, outbound.clone()); @@ -116,16 +115,13 @@ impl TxPoller { /// stream on connection failure so the caller can handle reconnection /// uniformly. async fn subscribe(&self) -> SseStream { - match self.tx_cache.subscribe_transactions().await { - Ok(stream) => { - debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established"); - Box::pin(stream) - } - Err(error) => { - warn!(%error, "Failed to open SSE transaction subscription"); - Box::pin(futures_util::stream::empty()) - } - } + self.tx_cache + .subscribe_transactions() + .await + .inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established")) + .inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription")) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) } /// Reconnects the SSE stream with backoff. Performs a full refetch to @@ -136,14 +132,49 @@ impl TxPoller { backoff: &mut Duration, ) -> SseStream { tokio::select! { - _ = time::sleep(*backoff) => {} - // Break the sleep early on block env change or channel close — - // full_fetch below serves the same purpose the env arm would have. + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + stream + } + + /// Processes a single item yielded by the SSE stream: dispatches the tx + /// for nonce checking on success, or reconnects on error / stream end. + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(tx)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); + } + self.spawn_check_nonce(tx, outbound.clone()); + } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } } - *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF); - self.full_fetch(outbound).await; - self.subscribe().await + ControlFlow::Continue(()) } async fn task_future(mut self, outbound: mpsc::UnboundedSender) { @@ -152,28 +183,17 @@ impl TxPoller { // Open the SSE stream for real-time delivery of new transactions. let mut sse_stream = self.subscribe().await; - let mut backoff = Self::INITIAL_RECONNECT_BACKOFF; + let mut backoff = INITIAL_RECONNECT_BACKOFF; loop { tokio::select! { item = sse_stream.next() => { - match item { - Some(Ok(tx)) => { - backoff = Self::INITIAL_RECONNECT_BACKOFF; - if outbound.is_closed() { - trace!("No receivers left, shutting down"); - break; - } - self.spawn_check_nonce(tx, outbound.clone()); - } - Some(Err(error)) => { - warn!(%error, "SSE transaction stream error, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - } - None => { - warn!("SSE transaction stream ended, reconnecting"); - sse_stream = self.reconnect(&outbound, &mut backoff).await; - } + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; } } res = self.envs.changed() => {