diff --git a/Cargo.lock b/Cargo.lock index 93d1ce79..95da644f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3780,6 +3780,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -10473,9 +10484,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" dependencies = [ "aws-lc-rs", "log", @@ -11189,9 +11200,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba06dc92c72b877f042ba141c7ee553c68aab1c6e077a3ba1adb2eae17fdaff4" dependencies = [ "alloy", + "eventsource-stream", "futures-util", "reqwest", "serde", + "serde_json", "signet-bundle", "signet-constants", "signet-types", diff --git a/Cargo.toml b/Cargo.toml index f2e39007..10104edc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,13 +21,13 @@ name = "zenith-builder-example" path = "bin/builder.rs" [dependencies] -init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } +init4-bin-base = { version = "0.18.0", features = ["perms", "aws", "pylon", "sse"] } -signet-constants = { version = "0.16.0-rc.16" } -signet-sim = { version = "0.16.0-rc.16" } -signet-tx-cache = { version = "0.16.0-rc.16" } -signet-types = { version = "0.16.0-rc.16" } -signet-zenith = { version = "0.16.0-rc.16" } +signet-constants = { version = "0.16.0" } +signet-sim = { version = "0.16.0" } +signet-tx-cache = { version = "0.16.0" } +signet-types = { version = "0.16.0" } +signet-zenith = { version = "0.16.0" } signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 81223a36..89698a4d 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -23,7 +23,7 @@ impl CacheTasks { /// [`CacheTask`], [`TxPoller`], and [`BundlePoller`] internally and yields their [`JoinHandle`]s. pub fn spawn(&self) -> CacheSystem { // Tx Poller pulls transactions from the cache - let tx_poller = TxPoller::new(); + let tx_poller = TxPoller::new(self.block_env.clone()); let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 25a481d9..7f9728d4 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -1,59 +1,49 @@ //! Transaction service responsible for fetching and sending transactions to the simulator. -use crate::config::BuilderConfig; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::{TryFutureExt, TryStreamExt}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use signet_tx_cache::{TxCache, TxCacheError}; -use std::time::Duration; -use tokio::{sync::mpsc, task::JoinHandle, time}; -use tracing::{Instrument, debug, debug_span, trace, trace_span}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, + time, +}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; + +type SseStream = Pin> + Send>>; -/// Poll interval for the transaction poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); /// Implements a poller for the block builder to pull transactions from the /// transaction pool. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TxPoller { /// Config values from the Builder. config: &'static BuilderConfig, /// Client for the tx cache. tx_cache: TxCache, - /// Defines the interval at which the service should poll the cache. - poll_interval_ms: u64, -} - -impl Default for TxPoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool -/// and sends them into the provided channel sender. +/// [`TxPoller`] fetches transactions from the transaction pool on startup +/// and on each block environment change, and subscribes to an SSE stream +/// for real-time delivery of new transactions in between. impl TxPoller { - /// Returns a new [`TxPoller`] with the given config. - /// * Defaults to 1000ms poll interval (1s). - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } - - /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`TxPoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = TxCache::new(config.tx_pool_url.clone()); - Self { config, tx_cache, poll_interval_ms } + Self { config, tx_cache, envs } } - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) - } - - // Spawn a tokio task to check the nonce of a transaction before sending - // it to the cachetask via the outbound channel. + /// Spawn a tokio task to check the nonce of a transaction before sending + /// it to the cachetask via the outbound channel. fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); @@ -95,46 +85,132 @@ impl TxPoller { }); } - /// Polls the transaction cache for transactions, paginating through all available pages. - pub async fn check_tx_cache(&self) -> Result, TxCacheError> { - self.tx_cache.stream_transactions().try_collect().await + /// Fetches all transactions from the cache, forwarding each to nonce + /// checking before it reaches the cache task. + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); + + crate::metrics::inc_tx_poll_count(); + if let Ok(transactions) = self + .tx_cache + .stream_transactions() + .try_collect::>() + .inspect_err(|error| { + crate::metrics::inc_tx_poll_errors(); + debug!(%error, "Error fetching transactions"); + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + crate::metrics::record_txs_fetched(transactions.len()); + trace!(count = transactions.len(), "found transactions"); + for tx in transactions { + self.spawn_check_nonce(tx, outbound.clone()); + } + } } - async fn task_future(self, outbound: mpsc::UnboundedSender) { - loop { - let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); + /// Opens an SSE subscription to the transaction feed. Returns an empty + /// stream on connection failure so the caller can handle reconnection + /// uniformly. + async fn subscribe(&self) -> SseStream { + self.tx_cache + .subscribe_transactions() + .await + .inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established")) + .inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription")) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) + } - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; - } + /// Reconnects the SSE stream with backoff. Performs a full refetch to + /// cover any items missed while disconnected. + async fn reconnect( + &mut self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + stream + } - crate::metrics::inc_tx_poll_count(); - if let Ok(transactions) = self - .check_tx_cache() - .inspect_err(|error| { - crate::metrics::inc_tx_poll_errors(); - debug!(%error, "Error fetching transactions"); - }) - .instrument(span.clone()) - .await - { - let _guard = span.entered(); - crate::metrics::record_txs_fetched(transactions.len()); - trace!(count = transactions.len(), "found transactions"); - for tx in transactions.into_iter() { - self.spawn_check_nonce(tx, outbound.clone()); + /// Processes a single item yielded by the SSE stream: dispatches the tx + /// for nonce checking on success, or reconnects on error / stream end. + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(tx)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.is_closed() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); } + self.spawn_check_nonce(tx, outbound.clone()); } + Some(Err(error)) => { + warn!(%error, "SSE transaction stream error, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + None => { + warn!("SSE transaction stream ended, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + } + ControlFlow::Continue(()) + } + + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + // Initial full fetch of all transactions currently in the cache. + self.full_fetch(&outbound).await; - time::sleep(self.poll_duration()).await; + // Open the SSE stream for real-time delivery of new transactions. + let mut sse_stream = self.subscribe().await; + let mut backoff = INITIAL_RECONNECT_BACKOFF; + + loop { + tokio::select! { + item = sse_stream.next() => { + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; + } + } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); + break; + } + trace!("Block env changed, refetching all transactions"); + self.full_fetch(&outbound).await; + } + } } } - /// Spawns a task that continuously polls the cache for transactions and sends any it finds to - /// its sender. + /// Spawns a task that fetches all current transactions, then subscribes + /// to the SSE feed for real-time updates, refetching on each new block + /// environment. pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index d607d8c6..40ef31c6 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,11 +1,10 @@ #![cfg(feature = "test-utils")] use alloy::{primitives::U256, signers::local::PrivateKeySigner}; -use builder::{ - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, -}; +use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; use eyre::{Ok, Result}; +use futures_util::TryStreamExt; +use signet_tx_cache::TxCache; #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { @@ -15,11 +14,9 @@ async fn test_tx_roundtrip() -> Result<()> { // Post a transaction to the cache post_tx().await?; - // Create a new poller - let poller = TxPoller::new(); - // Fetch transactions from the pool - let transactions = poller.check_tx_cache().await?; + let tx_cache = TxCache::new(builder::config().tx_pool_url.clone()); + let transactions: Vec<_> = tx_cache.stream_transactions().try_collect().await?; // Ensure at least one transaction exists assert!(!transactions.is_empty());