adapters: Fix StreamFromLatest race condition#190
Conversation
StreamFromLatest was resolving the "latest" cursor position on the first Recv() call instead of at NewReceiver() time. This created a race where events sent between NewReceiver and the first Recv could be skipped or cause Recv to hang indefinitely. Fix by capturing the cursor position at NewReceiver time: - memstreamer: set cursor to current log length in NewReceiver - reflexstreamer: add WithEventsTableName option to resolve HEAD via SELECT MAX(id) at NewReceiver time - adaptertest: simplify StreamFromLatest test to send synchronously before Recv, which is now safe with the cursor fix Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThe diff moves cursor/consumer-group initialisation from per-receive loops into receiver construction across multiple streamers. memstreamer seeds its cursor from the in-memory log length in NewReceiver when StreamFromLatest is set. reflexstreamer gains a variadic Option mechanism and WithEventsTableName; when provided and StreamFromLatest is requested it validates the table name and queries max(id) to seed the initial cursor (returning errors on invalid names or query failures). wredis now eagerly creates the Redis consumer group in NewReceiver (using "$" for StreamFromLatest) and removes group-creation logic from Recv. A test was simplified to remove goroutine/WaitGroup use and perform send/recv sequentially. Estimated code review effort🎯 4 (Complex) | ⏱️ ~40 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapters/reflexstreamer/reflex.go`:
- Around line 120-126: The code concatenates c.eventsTableName into the SQL in
the QueryRowContext call (in the receiver init path around the QueryRowContext
usage) which risks SQL injection or malformed identifiers; validate
c.eventsTableName against a strict identifier regex (e.g. start with
letter/underscore, only alphanumerics/underscores) and then safely quote/escape
it using a proper identifier-quoting helper for your driver (or use the driver's
QuoteIdentifier like pq.QuoteIdentifier) before building the query string passed
to c.reader.QueryRowContext; update the code that computes the query (the
QueryRowContext call and its surrounding logic that reads max(id)) to use the
validated-and-quoted table name.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 70514e39-829d-4dbf-b523-65838f2a19e9
📒 Files selected for processing (4)
adapters/adaptertest/eventstreaming.goadapters/memstreamer/memstreamer.goadapters/reflexstreamer/reflex.goadapters/reflexstreamer/reflex_test.go
…ection Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
adapters/reflexstreamer/reflex.go (1)
124-143: SQL injection mitigated, but consider database-agnostic identifier quoting.The regex validation (line 26) prevents SQL injection through strict allowlisting of identifier characters (
^[a-zA-Z_][a-zA-Z0-9_]*$). Backtick quoting adds defence-in-depth, though the validation alone is sufficient.However, the library's documentation states "Infrastructure Agnostic" and "Your choice of database", yet uses MySQL-specific backtick quoting. Whilst only MySQL is currently implemented, backticks are MySQL-specific; PostgreSQL uses double quotes (
"table") and SQL Server uses square brackets ([table]). To align with the stated database-agnostic design goals, consider using ANSI SQL double quotes or a database driver-aware identifier quoting mechanism for future multi-database support.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapters/reflexstreamer/reflex.go` around lines 124 - 143, The code currently builds a query using MySQL-specific backticks around c.eventsTableName after validating it with validIdentifier; to make this database-agnostic, add a small identifier-quoting helper (e.g. quoteIdentifier(name string) or method on the connector) that chooses the proper quoting characters (ANSI double quotes by default or driver-configured quotes) and use that helper when constructing the query passed to c.reader.QueryRowContext; keep the existing validIdentifier check, and update the call site where the query string is built (referencing c.eventsTableName and c.reader.QueryRowContext) to use the helper so the adapter can support multiple SQL dialects.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@adapters/reflexstreamer/reflex.go`:
- Around line 124-143: The code currently builds a query using MySQL-specific
backticks around c.eventsTableName after validating it with validIdentifier; to
make this database-agnostic, add a small identifier-quoting helper (e.g.
quoteIdentifier(name string) or method on the connector) that chooses the proper
quoting characters (ANSI double quotes by default or driver-configured quotes)
and use that helper when constructing the query passed to
c.reader.QueryRowContext; keep the existing validIdentifier check, and update
the call site where the query string is built (referencing c.eventsTableName and
c.reader.QueryRowContext) to use the helper so the adapter can support multiple
SQL dialects.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2bfa4806-7cd5-4364-9642-35162a30319e
📒 Files selected for processing (1)
adapters/reflexstreamer/reflex.go
…ceiver Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@adapters/wredis/streamer.go`:
- Around line 59-60: The error check that rejects Redis BUSYGROUP errors is
brittle because it compares the full literal message; change the condition
around the err check to detect BUSYGROUP by prefix (use
strings.HasPrefix(err.Error(), "BUSYGROUP")) so variations won't cause false
failures, and add the strings import if missing; update the conditional that
currently compares to "BUSYGROUP Consumer Group name already exists" to use
HasPrefix on err.Error() and only return the error when it is not a BUSYGROUP
prefix match.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4fc4f981-cb92-4474-8c8e-67505ec3672d
📒 Files selected for processing (1)
adapters/wredis/streamer.go
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|



Summary
StreamFromLatestwas resolving the "latest" cursor position on the firstRecv()call instead of atNewReceiver()time. This created a race where events sent betweenNewReceiverand the firstRecvcould be skipped — or causeRecvto hang indefinitely waiting for events that were already in the log/table.This was causing flaky test failures in downstream consumers (e.g. timing out after 5 minutes in CI).
Changes
NewReceiver()instead of firstRecv()WithEventsTableNameoption resolves HEAD viaSELECT MAX(id)atNewReceiver()time. Without the option, falls back to existingWithStreamFromHead()behaviourStreamFromLatesttest — send synchronously beforeRecv(now safe with cursor fix). Removed goroutines +sync.WaitGroupTest plan
go test ./adapters/memstreamer/... -count=10passesgo test ./adapters/reflexstreamer/... -count=10passes (including gap filler test)🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests