-
Notifications
You must be signed in to change notification settings - Fork 2
feat: replace TxPoller polling with SSE streaming #259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Evalir
wants to merge
4
commits into
main
Choose a base branch
from
evalir/eop/sse-tx-poller
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+167
−81
Open
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,59 +1,49 @@ | ||
| //! Transaction service responsible for fetching and sending transactions to the simulator. | ||
| use crate::config::BuilderConfig; | ||
| use crate::{config::BuilderConfig, tasks::env::SimEnv}; | ||
| use alloy::{ | ||
| consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, | ||
| providers::Provider, | ||
| }; | ||
| use futures_util::{TryFutureExt, TryStreamExt}; | ||
| use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; | ||
| use signet_tx_cache::{TxCache, TxCacheError}; | ||
| use std::time::Duration; | ||
| use tokio::{sync::mpsc, task::JoinHandle, time}; | ||
| use tracing::{Instrument, debug, debug_span, trace, trace_span}; | ||
| use std::{ops::ControlFlow, pin::Pin, time::Duration}; | ||
| use tokio::{ | ||
| sync::{mpsc, watch}, | ||
| task::JoinHandle, | ||
| time, | ||
| }; | ||
| use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; | ||
|
|
||
| type SseStream = Pin<Box<dyn Stream<Item = Result<TxEnvelope, TxCacheError>> + Send>>; | ||
|
|
||
| /// Poll interval for the transaction poller in milliseconds. | ||
| const POLL_INTERVAL_MS: u64 = 1000; | ||
| const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); | ||
| const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); | ||
|
|
||
| /// Implements a poller for the block builder to pull transactions from the | ||
| /// transaction pool. | ||
| #[derive(Debug, Clone)] | ||
| #[derive(Debug)] | ||
| pub struct TxPoller { | ||
| /// Config values from the Builder. | ||
| config: &'static BuilderConfig, | ||
| /// Client for the tx cache. | ||
| tx_cache: TxCache, | ||
| /// Defines the interval at which the service should poll the cache. | ||
| poll_interval_ms: u64, | ||
| } | ||
|
|
||
| impl Default for TxPoller { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| /// Receiver for block environment updates, used to trigger refetches. | ||
| envs: watch::Receiver<Option<SimEnv>>, | ||
| } | ||
|
|
||
| /// [`TxPoller`] implements a poller task that fetches transactions from the transaction pool | ||
| /// and sends them into the provided channel sender. | ||
| /// [`TxPoller`] fetches transactions from the transaction pool on startup | ||
| /// and on each block environment change, and subscribes to an SSE stream | ||
| /// for real-time delivery of new transactions in between. | ||
| impl TxPoller { | ||
| /// Returns a new [`TxPoller`] with the given config. | ||
| /// * Defaults to 1000ms poll interval (1s). | ||
| pub fn new() -> Self { | ||
| Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) | ||
| } | ||
|
|
||
| /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds. | ||
| pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { | ||
| /// Returns a new [`TxPoller`] with the given block environment receiver. | ||
| pub fn new(envs: watch::Receiver<Option<SimEnv>>) -> Self { | ||
| let config = crate::config(); | ||
| let tx_cache = TxCache::new(config.tx_pool_url.clone()); | ||
| Self { config, tx_cache, poll_interval_ms } | ||
| Self { config, tx_cache, envs } | ||
| } | ||
|
|
||
| /// Returns the poll duration as a [`Duration`]. | ||
| const fn poll_duration(&self) -> Duration { | ||
| Duration::from_millis(self.poll_interval_ms) | ||
| } | ||
|
|
||
| // Spawn a tokio task to check the nonce of a transaction before sending | ||
| // it to the cachetask via the outbound channel. | ||
| /// Spawn a tokio task to check the nonce of a transaction before sending | ||
| /// it to the cachetask via the outbound channel. | ||
| fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<ReceivedTx>) { | ||
| tokio::spawn(async move { | ||
| let span = debug_span!("check_nonce", tx_id = %tx.tx_hash()); | ||
|
|
@@ -95,46 +85,132 @@ impl TxPoller { | |
| }); | ||
| } | ||
|
|
||
| /// Polls the transaction cache for transactions, paginating through all available pages. | ||
| pub async fn check_tx_cache(&self) -> Result<Vec<TxEnvelope>, TxCacheError> { | ||
| self.tx_cache.stream_transactions().try_collect().await | ||
| /// Fetches all transactions from the cache, forwarding each to nonce | ||
| /// checking before it reaches the cache task. | ||
| async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) { | ||
| let span = trace_span!("TxPoller::full_fetch", url = %self.config.tx_pool_url); | ||
|
|
||
| crate::metrics::inc_tx_poll_count(); | ||
| if let Ok(transactions) = self | ||
| .tx_cache | ||
| .stream_transactions() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sdk API thing. we now have "stream transactions" and "subscribe", which are not clear about their behavior |
||
| .try_collect::<Vec<_>>() | ||
| .inspect_err(|error| { | ||
| crate::metrics::inc_tx_poll_errors(); | ||
| debug!(%error, "Error fetching transactions"); | ||
| }) | ||
| .instrument(span.clone()) | ||
| .await | ||
| { | ||
| let _guard = span.entered(); | ||
| crate::metrics::record_txs_fetched(transactions.len()); | ||
| trace!(count = transactions.len(), "found transactions"); | ||
| for tx in transactions { | ||
| self.spawn_check_nonce(tx, outbound.clone()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| async fn task_future(self, outbound: mpsc::UnboundedSender<ReceivedTx>) { | ||
| loop { | ||
| let span = trace_span!("TxPoller::loop", url = %self.config.tx_pool_url); | ||
| /// Opens an SSE subscription to the transaction feed. Returns an empty | ||
| /// stream on connection failure so the caller can handle reconnection | ||
| /// uniformly. | ||
| async fn subscribe(&self) -> SseStream { | ||
| self.tx_cache | ||
| .subscribe_transactions() | ||
| .await | ||
| .inspect(|_| debug!(url = %self.config.tx_pool_url, "SSE transaction subscription established")) | ||
| .inspect_err(|error| warn!(%error, "Failed to open SSE transaction subscription")) | ||
| .map(|s| Box::pin(s) as SseStream) | ||
| .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) | ||
| } | ||
|
|
||
| // Check this here to avoid making the web request if we know | ||
| // we don't need the results. | ||
| if outbound.is_closed() { | ||
| span.in_scope(|| trace!("No receivers left, shutting down")); | ||
| break; | ||
| } | ||
| /// Reconnects the SSE stream with backoff. Performs a full refetch to | ||
| /// cover any items missed while disconnected. | ||
| async fn reconnect( | ||
| &mut self, | ||
| outbound: &mpsc::UnboundedSender<ReceivedTx>, | ||
| backoff: &mut Duration, | ||
| ) -> SseStream { | ||
| tokio::select! { | ||
| // Biased: a block env change wins over the backoff sleep. An env | ||
| // change triggers a full refetch below anyway, which supersedes the | ||
| // sleep-then-reconnect path — so there's no point waiting out the | ||
| // backoff. | ||
| biased; | ||
| _ = self.envs.changed() => {} | ||
| _ = time::sleep(*backoff) => {} | ||
| } | ||
| *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); | ||
| let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); | ||
| stream | ||
| } | ||
|
|
||
| crate::metrics::inc_tx_poll_count(); | ||
| if let Ok(transactions) = self | ||
| .check_tx_cache() | ||
| .inspect_err(|error| { | ||
| crate::metrics::inc_tx_poll_errors(); | ||
| debug!(%error, "Error fetching transactions"); | ||
| }) | ||
| .instrument(span.clone()) | ||
| .await | ||
| { | ||
| let _guard = span.entered(); | ||
| crate::metrics::record_txs_fetched(transactions.len()); | ||
| trace!(count = transactions.len(), "found transactions"); | ||
| for tx in transactions.into_iter() { | ||
| self.spawn_check_nonce(tx, outbound.clone()); | ||
| /// Processes a single item yielded by the SSE stream: dispatches the tx | ||
| /// for nonce checking on success, or reconnects on error / stream end. | ||
| /// Returns `Break` when the outbound channel has closed and the task | ||
| /// should shut down. | ||
| async fn handle_sse_item( | ||
| &mut self, | ||
| item: Option<Result<TxEnvelope, TxCacheError>>, | ||
| outbound: &mpsc::UnboundedSender<ReceivedTx>, | ||
| backoff: &mut Duration, | ||
| stream: &mut SseStream, | ||
| ) -> ControlFlow<()> { | ||
| match item { | ||
| Some(Ok(tx)) => { | ||
| *backoff = INITIAL_RECONNECT_BACKOFF; | ||
| if outbound.is_closed() { | ||
| trace!("No receivers left, shutting down"); | ||
| return ControlFlow::Break(()); | ||
| } | ||
| self.spawn_check_nonce(tx, outbound.clone()); | ||
| } | ||
| Some(Err(error)) => { | ||
| warn!(%error, "SSE transaction stream error, reconnecting"); | ||
| *stream = self.reconnect(outbound, backoff).await; | ||
| } | ||
| None => { | ||
| warn!("SSE transaction stream ended, reconnecting"); | ||
| *stream = self.reconnect(outbound, backoff).await; | ||
| } | ||
| } | ||
| ControlFlow::Continue(()) | ||
| } | ||
|
|
||
| async fn task_future(mut self, outbound: mpsc::UnboundedSender<ReceivedTx>) { | ||
| // Initial full fetch of all transactions currently in the cache. | ||
| self.full_fetch(&outbound).await; | ||
|
|
||
| time::sleep(self.poll_duration()).await; | ||
| // Open the SSE stream for real-time delivery of new transactions. | ||
| let mut sse_stream = self.subscribe().await; | ||
| let mut backoff = INITIAL_RECONNECT_BACKOFF; | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| item = sse_stream.next() => { | ||
| if self | ||
| .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) | ||
| .await | ||
| .is_break() | ||
| { | ||
| break; | ||
| } | ||
| } | ||
| res = self.envs.changed() => { | ||
| if res.is_err() { | ||
| debug!("Block env channel closed, shutting down"); | ||
| break; | ||
| } | ||
| trace!("Block env changed, refetching all transactions"); | ||
| self.full_fetch(&outbound).await; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Spawns a task that continuously polls the cache for transactions and sends any it finds to | ||
| /// its sender. | ||
| /// Spawns a task that fetches all current transactions, then subscribes | ||
| /// to the SSE feed for real-time updates, refetching on each new block | ||
| /// environment. | ||
| pub fn spawn(self) -> (mpsc::UnboundedReceiver<ReceivedTx>, JoinHandle<()>) { | ||
| let (outbound, inbound) = mpsc::unbounded_channel(); | ||
| let jh = tokio::spawn(self.task_future(outbound)); | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
architectural:
why was
check_tx_cachedeleted if its logic is repeated inline here?This function also does more than fetch, it dispatches tasks. So its name should reflect that