feat(sqs): per-queue throttling (Phase 3.C) #679
bootjp merged 14 commits into docs/sqs-phase3-proposals from feat/sqs-throttling-phase3c
Conversation
Implements the design from
docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
Schema (sqs_catalog.go):
- sqsQueueMeta gains an optional Throttle *sqsQueueThrottle field
with six float64 sub-fields (Send/Recv/Default x Capacity/RefillPerSecond).
- Six new ThrottleSendCapacity / ThrottleSendRefillPerSecond /
ThrottleRecvCapacity / ThrottleRecvRefillPerSecond /
ThrottleDefaultCapacity / ThrottleDefaultRefillPerSecond attributes
go through the standard sqsAttributeAppliers dispatch.
- validateThrottleConfig enforces the cross-field rules from the
design: each (capacity, refill) pair must be both-zero or
both-positive; capacity must be >= refill so the bucket can burst;
Send/Recv capacities must be >= 10 (the SendMessageBatch /
DeleteMessageBatch max charge) so a small capacity does not make
every full batch permanently unserviceable; per-field hard ceiling
of 100k. Validator runs once at the end of applyAttributes so a
multi-field update sees the post-apply state as a whole.
- queueMetaToAttributes surfaces the configured Throttle* fields so
GetQueueAttributes("All") round-trips. Extracted into
addThrottleAttributes to stay under the cyclop ceiling.
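
A minimal sketch of the schema and validator rules just described; the struct shape and helper signature follow the wording above, but the field names, JSON tags, and error strings are assumptions rather than the committed code:

```go
package adapter

import "fmt"

// sqsQueueThrottle mirrors the six optional float64 fields described above;
// JSON tags and exact field names are assumptions, not the committed schema.
type sqsQueueThrottle struct {
	SendCapacity           float64 `json:"sendCapacity,omitempty"`
	SendRefillPerSecond    float64 `json:"sendRefillPerSecond,omitempty"`
	RecvCapacity           float64 `json:"recvCapacity,omitempty"`
	RecvRefillPerSecond    float64 `json:"recvRefillPerSecond,omitempty"`
	DefaultCapacity        float64 `json:"defaultCapacity,omitempty"`
	DefaultRefillPerSecond float64 `json:"defaultRefillPerSecond,omitempty"`
}

const (
	throttleMaxValue         = 100_000 // per-field hard ceiling
	throttleMinBatchCapacity = 10      // SendMessageBatch / DeleteMessageBatch max charge
)

// validateThrottlePair encodes the cross-field rules for one (capacity, refill) pair.
func validateThrottlePair(name string, capacity, refill float64, requireBatchCapacity bool) error {
	switch {
	case capacity == 0 && refill == 0:
		return nil // pair unset: this action is not throttled
	case capacity <= 0 || refill <= 0:
		return fmt.Errorf("%s: capacity and refill must be both zero or both positive", name)
	case capacity > throttleMaxValue || refill > throttleMaxValue:
		return fmt.Errorf("%s: values must not exceed %d", name, throttleMaxValue)
	case capacity < refill:
		return fmt.Errorf("%s: capacity must be >= refill so the bucket can burst", name)
	case requireBatchCapacity && capacity < throttleMinBatchCapacity:
		return fmt.Errorf("%s: capacity must be >= %d so a full batch stays serviceable", name, throttleMinBatchCapacity)
	}
	return nil
}
```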
Bucket store (sqs_throttle.go, new):
- bucketStore: sync.Map[bucketKey]*tokenBucket; per-bucket sync.Mutex
on the hot path so cross-queue traffic never serialises on a
process-wide lock. Per Gemini medium on PR #664 the single-mutex
alternative was rejected. Lazy idle-evict sweep removes inactive
buckets after 1h to bound memory.
- charge(): lock-free Load, LoadOrStore on miss (race-tolerant -- both
racers compute identical capacity/refill from the same meta), then
per-bucket lock for refill + take + release. Refill is elapsed *
refillRate, capped at capacity. On reject, computes Retry-After
from the actual refillRate and the requestedCount per the §3.4
formula (numerator is requested count, not 1, so a batch verb does
not get told to retry in 1s when it really needs 10s).
- resolveActionConfig() handles the Default* fall-through: when only
DefaultCapacity is set, Send and Recv requests share the same
bucketKey{action:"*"} so Default behaves as one shared cap rather
than three independent quotas.
- invalidateQueue() drops every bucket belonging to a queue. Called
after the Raft commit on SetQueueAttributes / DeleteQueue so new
limits take effect on the very next request, not after the 1h
idle-evict sweep. Without this step the §3.1 cache-invalidation
contract fails and operators see stale enforcement.
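
A self-contained sketch of the per-bucket charge path described above (refill by elapsed time, take or reject, §3.4 Retry-After computed from the requested count). Type and field names are stand-ins; the real store also applies the 1h Retry-After cap and the eviction handling added in the review rounds below:

```go
package adapter

import (
	"math"
	"sync"
	"time"
)

type bucketKey struct {
	queue  string
	action string // "send", "recv", or "*" for the shared Default bucket
}

type tokenBucket struct {
	mu         sync.Mutex
	capacity   float64
	refillRate float64 // tokens per second
	tokens     float64
	lastRefill time.Time
}

// chargeOutcome mirrors the fields the PR says the store exposes (tokensAfter
// feeds the deferred gauge); the names are assumptions.
type chargeOutcome struct {
	allowed     bool
	tokensAfter float64
	retryAfter  time.Duration
}

// charge refills by elapsed time, takes n tokens if available, and otherwise
// computes Retry-After from the shortfall and the actual refill rate (§3.4:
// the numerator is the requested count, so a 10-entry batch is not told to
// retry in 1s when it really needs 10s of refill).
func (tb *tokenBucket) charge(n float64, now time.Time) chargeOutcome {
	tb.mu.Lock()
	defer tb.mu.Unlock()

	elapsed := now.Sub(tb.lastRefill).Seconds()
	tb.tokens = math.Min(tb.capacity, tb.tokens+elapsed*tb.refillRate)
	tb.lastRefill = now

	if tb.tokens >= n {
		tb.tokens -= n
		return chargeOutcome{allowed: true, tokensAfter: tb.tokens}
	}
	// Reject without taking anything, so partial credit is preserved for a later, smaller request.
	secs := math.Ceil((n - tb.tokens) / tb.refillRate)
	if secs < 1 {
		secs = 1
	}
	return chargeOutcome{tokensAfter: tb.tokens, retryAfter: time.Duration(secs) * time.Second}
}
```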
Charging (sqs_messages.go, sqs_messages_batch.go):
- All seven message-plane handlers wired:
SendMessage / SendMessageBatch (Send bucket; batch charges by entry
count), ReceiveMessage / DeleteMessage / DeleteMessageBatch /
ChangeMessageVisibility / ChangeMessageVisibilityBatch (Receive
bucket).
- Throttle check sits OUTSIDE the OCC retry loop per §4.2 -- a
rejected request never reaches the coordinator, so the existing
sendMessageWithRetry et al. cannot busy-loop on a permanent
rate-limit failure.
- sendMessage extracted into prepareSendMessage + validateSend +
body to stay under the cyclop ceiling once the throttle branch
was added.
Error envelope (sqs.go):
- New sqsErrThrottling code + writeSQSThrottlingError helper that
writes 400 + AWS-shaped JSON body + x-amzn-ErrorType + Retry-After
header. The envelope is the same shape AWS uses; SDKs that key off
x-amzn-ErrorType handle it without changes.
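
A hedged sketch of what the helper writes; the header names and status code come from the description above, while the __type string, message text, and content type below are illustrative assumptions (the real bytes are decided by writeSQSError in sqs.go):

```go
package adapter

import (
	"encoding/json"
	"net/http"
	"strconv"
	"time"
)

// writeSQSThrottlingError sketches the envelope described above: 400 status,
// x-amzn-ErrorType and Retry-After headers, AWS-shaped JSON body.
func writeSQSThrottlingError(w http.ResponseWriter, retryAfter time.Duration) {
	secs := int(retryAfter.Round(time.Second).Seconds())
	if secs < 1 {
		secs = 1
	}
	w.Header().Set("Content-Type", "application/x-amz-json-1.0")
	w.Header().Set("x-amzn-ErrorType", "ThrottlingException")
	w.Header().Set("Retry-After", strconv.Itoa(secs))
	w.WriteHeader(http.StatusBadRequest)
	_ = json.NewEncoder(w).Encode(map[string]string{
		"__type":  "com.amazonaws.sqs#ThrottlingException",
		"message": "Rate exceeded for queue",
	})
}
```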
Tests:
- adapter/sqs_throttle_test.go: 18 unit tests covering bucket math
(fresh capacity, refill elapsed, refill cap, batch reject preserves
partial credit, Retry-After uses requested count, sub-1-RPS floor,
per-action / per-queue / Default-fallthrough isolation,
invalidateQueue, concurrent -race, default-off short-circuit) and
the validator (nil/empty canonicalisation, both-zero-or-both-positive,
capacity >= 10 batch floor, Default* exempt, capacity >= refill,
parseThrottleFloat range checks, computeRetryAfter floor).
- adapter/sqs_throttle_integration_test.go: 8 end-to-end tests
covering the §6 testing strategy (default-off allows unbounded,
send/recv reject after capacity with correct envelope + Retry-After,
batch charges by entry count, SetQueueAttributes invalidation,
DeleteQueue + CreateQueue lifecycle invalidation, GetQueueAttributes
round-trip, validator rejects below batch min). Run with -race
clean.
Out of scope for this PR (deferred to a follow-up):
- Prometheus counter (sqs_throttled_requests_total) and gauge
(sqs_throttle_tokens_remaining) per the design's §4.1 monitoring/
registry.go entry. The wiring needs a new prometheus collector and
hooking through the Registry seam; punted to keep this PR focused
on the data-plane behaviour. The bucket store's chargeOutcome
already exposes tokensAfter so the gauge wiring is one site.
- §6.5 cross-protocol (Query) parity test: the Query protocol layer
lives on PR #662 and the throttle envelope already has the same
shape on both protocols (writeSQSError handles both), so the test
is a follow-up after #662 lands.
- §6.6 failover behaviour test: the §3.1 "fresh bucket on failover"
contract is implementation-correct (the bucket map is per-process,
no Raft state) but a 3-node failover test needs the cluster
scaffolding and is best added with the Jepsen workload.
Code Review
This pull request implements per-queue rate limiting for SQS using a token-bucket mechanism. It introduces new queue attributes for configuring capacity and refill rates, a centralized bucket store, and integrates throttling checks into the message processing pipeline. Feedback highlights several performance and robustness concerns: the current implementation performs redundant storage lookups on every request, the cache-sweeping logic runs on the hot path potentially causing latency spikes, and the retry-after calculation could be improved to prevent overflows and ensure system responsiveness.
```go
if s.throttle == nil {
	return true
}
throttle := s.queueThrottleConfig(r, queueName)
```
chargeQueue performs a storage read via queueThrottleConfig on every request, even for queues without throttling configured. Furthermore, since handlers (e.g., sendMessage, receiveMessage) perform their own metadata load shortly after, this results in two storage lookups and two nextTxnReadTS calls per request. Consider caching the throttle configuration in memory or passing the already-loaded metadata to chargeQueue to avoid redundant I/O and timestamp acquisition.
```go
b.buckets.Range(func(k, v any) bool {
	bucket, _ := v.(*tokenBucket)
	bucket.mu.Lock()
	idle := bucket.lastRefill.Before(cutoff)
	bucket.mu.Unlock()
	if idle {
		b.buckets.Delete(k)
	}
	return true
})
```
maybeSweep performs an O(N) iteration over all active buckets using sync.Map.Range on the request hot path. While it is gated to run at most once per minute, the specific request that triggers the sweep will experience a significant latency spike proportional to the number of queues in the system. This logic should be moved to a background goroutine to avoid impacting request latency. Additionally, ensure that the collection of active buckets is strictly bounded to prevent unbounded memory growth and potential OOM issues.
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.
```go
secs := math.Ceil(needed / refillRate)
if secs < 1 {
	secs = 1
}
return time.Duration(secs) * time.Second
```
Converting secs to time.Duration and then multiplying by time.Second can lead to an integer overflow if refillRate is extremely small. Furthermore, for low-latency systems, it is preferred to use a short maximum backoff duration for retries. Consider capping the retryAfter duration to 10ms to prevent overflow and ensure the system remains responsive under contention.
Suggested change:

```diff
-secs := math.Ceil(needed / refillRate)
-if secs < 1 {
-	secs = 1
-}
-return time.Duration(secs) * time.Second
+secs := math.Ceil(needed / refillRate)
+if secs > 0.01 {
+	return 10 * time.Millisecond
+}
+return time.Duration(secs * float64(time.Second))
```
References
- For low-latency systems, it is acceptable to use a short maximum backoff duration for retries (e.g., 10ms) as long-lived contention is not the primary scenario.
💡 Codex Review (reviewed commit e105815ce8)
```go
if v, ok := b.buckets.Load(key); ok {
	// type assertion is sound: only tokenBucket pointers are stored.
	bucket, _ := v.(*tokenBucket)
	return bucket
```
Reconcile reused buckets with the latest throttle config
loadOrInit returns an existing bucket entry without checking whether its capacity/refillRate still match the queue’s current Throttle* metadata. After a leadership change, a node can retain old in-memory buckets from a prior leader term while SetQueueAttributes was applied on another node, then enforce stale limits/tokens when it becomes leader again (until idle eviction or another local invalidation). This breaks correctness for failover scenarios and can over- or under-throttle traffic for up to the eviction window.
```go
// tenant in real time. The LoadOrStore race a concurrent in-flight
// request might run with the stale bucket is benign: the rebuilt
// bucket starts at full capacity, same as failover semantics.
s.throttle.invalidateQueue(name)
```
Invalidate throttle buckets only when throttle attrs change
This unconditional invalidation runs for every successful SetQueueAttributes, including updates to unrelated fields (or no-op writes), which resets the queue’s token state to full capacity. In environments where callers can invoke SetQueueAttributes, this allows bypassing configured rate limits by repeatedly sending benign attribute updates. Invalidation should be gated to changes that actually affect throttle configuration.
```go
// names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
// Persisted on the meta so a leader failover loads the configuration
// along with the rest of the queue.
Throttle *sqsQueueThrottle `json:"throttle,omitempty"`
```
Compare throttle settings in CreateQueue idempotency checks
Throttle configuration was added to queue metadata, but CreateQueue idempotency still relies on attributesEqual, which does not include Throttle* fields. As a result, creating an existing queue with different throttle settings can be treated as “same attributes” and incorrectly return success instead of QueueNameExists, hiding configuration drift during provisioning.
…ini high/medium)

Six fixes from the first round of automated reviews on PR #679:

Codex P1 -- bucket reconciliation on stale config (sqs_throttle.go): loadOrInit returned cached buckets without checking that capacity / refillRate still match the queue's current Throttle config. After a leadership change, a node retaining buckets from a prior leader term would keep enforcing the prior term's limits even after a new SetQueueAttributes had committed -- the invalidation only runs on the leader that processed the commit, so a different leader's stale buckets survive. Now compares cap/refill on every Load hit and rebuilds (Delete + LoadOrStore) on mismatch.

Codex P1 -- invalidate only on actual throttle change (sqs_catalog.go, sqs_throttle.go): cache invalidation in setQueueAttributes ran unconditionally after every successful commit, including unrelated-field updates and no-op writes. Result: any caller could silently restore a noisy tenant's burst capacity by writing a no-op SetQueueAttributes. Now gated on throttleAttributesPresent(in.Attributes) which checks the request for any Throttle* key. Bucket reconciliation above acts as the safety net if a future code path bypasses the gate.

Codex P2 -- attributesEqual covers Throttle (sqs_catalog.go): CreateQueue idempotency relied on attributesEqual which did not include Throttle*. A re-create with different limits was treated as idempotent and silently kept the old limits. Now compares the full Throttle struct via throttleConfigEqual; baseAttributesEqual extracted to keep cyclop under the ceiling.

Gemini high -- thread throttle through existing meta load (sqs_messages.go, sqs_throttle.go): chargeQueue did one Pebble read per request even though the hot-path handlers (sendMessage, receiveMessage) load the meta moments later. Added chargeQueueWithThrottle that takes pre-loaded throttle config; both hot-path handlers now load meta once and pass throttle in. Throttle check now sits AFTER the QueueDoesNotExist branch so a missing queue no longer consumes a token. Batch + delete handlers keep chargeQueue (one extra meta read) -- low-QPS verbs where the simplification of not pulling meta out of the retry loop is worth the per-call cost.

Gemini high -- move sweep off hot path (sqs.go, sqs_throttle.go): maybeSweep ran the O(N) sync.Map.Range on whichever request was unlucky enough to trigger the per-minute sweep, causing latency spikes on many-queue clusters. Replaced with runSweepLoop on a background ticker tied to s.reaperCtx (started in Run alongside the existing message reaper, cleaned up by the same reaperCancel in Stop). The hot-path charge() no longer calls into the sweep at all.

Gemini medium -- cap retry-after duration (sqs_throttle.go): computeRetryAfter could compute a multi-day Retry-After (or worse, overflow time.Duration arithmetic) for pathologically small refillRate / large requested values. Capped at throttleRetryAfterCap (1h, matching the bucket idle-evict window). Cap is applied before the Duration multiplication so overflow is impossible.

New tests:
- TestBucketStore_ReconcilesBucketOnConfigChange pins the Codex P1 reconciliation contract.
- TestComputeRetryAfter_CapsAtMaximum pins the Gemini medium cap.
- TestThrottleAttributesPresent covers the request-gate helper used by the conditional invalidation.

All tests pass under -race; golangci-lint clean.
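
The conditional-invalidation gate described above reduces to a small request check. A sketch under the assumption that matching the six attribute names can be approximated by a prefix test (the real helper may enumerate them explicitly):

```go
package adapter

import "strings"

// throttleAttributesPresent reports whether the SetQueueAttributes request
// carries any Throttle* key, so the post-commit cache invalidation only fires
// when the throttle configuration could actually have changed.
func throttleAttributesPresent(attrs map[string]string) bool {
	for name := range attrs {
		if strings.HasPrefix(name, "Throttle") {
			return true
		}
	}
	return false
}
```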
All six findings from the first review round have been addressed:

- Codex P1 — bucket reconciliation on stale config
- Codex P1 — invalidate only on actual throttle change
- Codex P2 — attributesEqual covers Throttle
- Gemini high — thread throttle through existing meta load
- Gemini high — move sweep off hot path
- Gemini medium — cap retry-after duration

Verified: @claude review
Three medium-priority Gemini findings + one cyclop fix from the rebase onto the updated PR #679 throttling branch.

(1) sqsErrInvalidParameterValue removed (sqs_catalog.go, sqs_partitioning.go, sqs_partitioning_test.go): the constant duplicated sqsErrValidation which already carries "InvalidParameterValue". Reuses the existing constant; comment on the validator clarifies the choice.

(2) snapshotImmutableHTFIFO heap allocation gated on htfifoAttributesPresent (sqs_catalog.go): the snapshot is only needed when the request actually carries an HT-FIFO attribute. A SetQueueAttributes that touches only mutable attributes (e.g. VisibilityTimeout — the common path) now skips the allocation entirely. The validator short-circuit on preApply == nil keeps correctness identical: a request with no HT-FIFO attribute can never change an HT-FIFO field.

(3) partitionFor inlined FNV-1a (sqs_partitioning.go): the hash/fnv.New64a + h.Write([]byte(messageGroupID)) path heap-allocates the byte slice and pays the hash.Hash interface dispatch on every SendMessage. Replaced with a manual FNV-1a loop over the string — measurably faster on the routing hot path. MessageGroupId is capped at 128 chars by validation so the loop is bounded.

(4) cyclop (trySetQueueAttributesOnce): the conditional preApply branch + immutability check + Throttle validator pushed cyclop to 12. Extracted applyAndValidateSetAttributes so the function stays under the ceiling; the helper is also useful for any future admin-surface callers that want the same apply+validate flow without the OCC dispatch.

All tests pass under -race; golangci-lint clean.
…est (PR #679 round 2)

Two items from the round-2 Claude review on PR #679:

(1) bucketStore.sweepMu and bucketStore.lastSweep became dead code when the move from on-hot-path maybeSweep to the background runSweepLoop landed in round 1. The sweep() ticker is the only caller, so the serialisation primitive that protected against concurrent on-hot-path callers no longer has a job. Removed both fields and the sweepMu.Lock/lastSweep=now/sweepMu.Unlock block; sweep() now reads b.clock() inline. Field comment updated to describe the post-runSweepLoop design (single-goroutine driver).

(2) TestSQSServer_CatalogCreateIsIdempotent now exercises throttleConfigEqual via a fourth case: same name, same VisibilityTimeout, but the second create adds Throttle* attributes. attributesEqual must notice the diff and the call must reject as QueueNameExists. Without this case a future bug in throttleConfigEqual (e.g. always returning true) would slip past the existing VisibilityTimeout-only test.

Both findings noted in the round-2 Claude review on PR #679.
Addressed both items from the round-2 review:

- Medium — dead sweepMu / lastSweep fields removed
- Low — throttleConfigEqual idempotency-test gap covered

The token-before-validation low is noted as a follow-up.

Verified: @claude review
💡 Codex Review (reviewed commit e3530e93bb)
```go
if matches {
	return bucket
}
b.buckets.Delete(key)
```
Guard stale-bucket delete with compare-and-delete
When loadOrInit detects a config mismatch, it unconditionally calls b.buckets.Delete(key) before LoadOrStore. If multiple requests race on the same stale bucket (common right after failover/config change), each goroutine can delete a bucket that another goroutine has already recreated and charged, then recreate a fresh full-capacity bucket again. That lets more than the configured capacity through because earlier token deductions are applied to orphaned bucket instances no longer in the map. This reconciliation path needs a conditional delete (only if the map still points to the stale pointer) or a CAS loop to avoid resetting live buckets.
Round-3 Claude review on PR #679 caught two stale comments left over from the runSweepLoop refactor:

- throttleIdleEvictAfter said "no goroutine; lookups call sweep() opportunistically" -- both clauses are now false (runSweepLoop is the goroutine, hot-path charge() never calls sweep()).
- throttleEvictSweepEvery said "bounds how often the sweep runs from the hot path" -- the hot path no longer runs sweep at all; this constant is now just the background ticker interval.

Both updated to describe the post-runSweepLoop design and to reinforce that the sweep cost is amortised across the goroutine ticker rather than concentrated on whichever request was unlucky enough to trigger it (which was the old behaviour Gemini high flagged in PR #679 round 1).
Applied the low-severity fix from the round-3 review (the stale sweep-related constant comments).

Verified: @claude review
…(PR #679 round 4)

Round-4 Claude review on PR #679 surfaced a TOCTOU race in the loadOrInit reconciliation path the round-1 fix introduced.

The race: two concurrent goroutines hit the same stale bucket. A detects mismatch, Delete + LoadOrStore stores freshA. B (which also Loaded the stale bucket before A's Delete) then runs its own Delete -- which under the previous unconditional Delete would evict freshA. B's LoadOrStore then stores freshB. Net effect: a charge() that obtained freshA via the in-flight LoadOrStore from A's path keeps charging a bucket that is no longer in the map, while subsequent requests get freshB at full capacity. Total tokens consumed across the mismatch window can exceed the new capacity by one bucket's worth.

Fix: replace the unconditional b.buckets.Delete(key) with b.buckets.CompareAndDelete(key, v) where v is the original stale pointer. CompareAndDelete is a no-op when the map already holds someone else's fresh bucket, so the racer that arrives second takes the winner's fresh bucket via LoadOrStore (which returns the existing entry) instead of orphaning it.

Test: TestBucketStore_ConcurrentReconciliationRespectsNewCapacity seeds a stale bucket (cfgOld, drained), then races 200 goroutines through a charge with cfgNew (capacity 50). Asserts exactly 50 successes -- a Delete-after-replace race would let some past the cap. Runs under -race; the new test plus the existing TestBucketStore_ConcurrentChargesPreserveCount and TestBucketStore_ReconcilesBucketOnConfigChange cover the concurrent-fresh-bucket, concurrent-stale-mismatch, and sequential-mismatch cases.
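
A self-contained sketch of the reconciliation path after this fix; the types are minimal stand-ins, and the eviction flag added in later rounds is intentionally omitted here:

```go
package adapter

import "sync"

type bucketKey struct{ queue, action string }

type actionConfig struct{ capacity, refill float64 }

type tokenBucket struct {
	mu         sync.Mutex
	capacity   float64
	refillRate float64
	tokens     float64
}

type bucketStore struct{ buckets sync.Map } // bucketKey -> *tokenBucket

// loadOrInit returns the live bucket for key, rebuilding it when the cached
// entry no longer matches the queue's current throttle config. Eviction is a
// CompareAndDelete on the exact stale pointer: if a racer already replaced the
// entry, the replacement is left alone and LoadOrStore below converges on it
// instead of orphaning it.
func (b *bucketStore) loadOrInit(key bucketKey, cfg actionConfig) *tokenBucket {
	if v, ok := b.buckets.Load(key); ok {
		bucket := v.(*tokenBucket)
		bucket.mu.Lock()
		matches := bucket.capacity == cfg.capacity && bucket.refillRate == cfg.refill
		bucket.mu.Unlock()
		if matches {
			return bucket
		}
		b.buckets.CompareAndDelete(key, v) // no-op if the map already holds a fresh bucket
	}
	fresh := &tokenBucket{capacity: cfg.capacity, refillRate: cfg.refill, tokens: cfg.capacity}
	actual, _ := b.buckets.LoadOrStore(key, fresh)
	return actual.(*tokenBucket)
}
```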
Addressed the Medium TOCTOU finding from the round-4 review. As pointed out, the reconciliation path introduced in round 1 used an unconditional Delete; the fix switches to CompareAndDelete on the stale pointer, and the new test TestBucketStore_ConcurrentReconciliationRespectsNewCapacity pins the behaviour.

Verified: @claude review
Claude review:
| Site | Type | Rationale |
|---|---|---|
| loadOrInit | CompareAndDelete(key, v) | Conditional — only evict the specific stale pointer; leaves any already-replaced fresh bucket alone. |
| invalidateQueue | Delete(key) unconditional | Intentional full eviction after a config commit; the next request rebuilds from fresh meta. |
| sweep | Delete(k) unconditional | Idle eviction on the sole background goroutine; no concurrent sweep races. |
No New Issues Found
The fix is minimal and surgical — exactly one call site changed, one test added. No other paths were touched. The comment is accurate; the test is deterministic and covers the race that the old code exposed.
Overall
All five rounds of findings have been resolved:
| Round | Issue | Status |
|---|---|---|
| 1 | Codex P1 — bucket reconciliation on stale config | ✅ e3530e9 |
| 1 | Codex P1 — conditional invalidation on throttle-attr change | ✅ e3530e9 |
| 1 | Codex P2 — attributesEqual covers Throttle | ✅ e3530e9 |
| 1 | Gemini high — thread throttle through existing meta load | ✅ e3530e9 |
| 1 | Gemini high — move sweep off hot path | ✅ e3530e9 |
| 1 | Gemini medium — cap retry-after duration | ✅ e3530e9 |
| 2 | Dead sweepMu/lastSweep fields | ✅ c6d3a68 |
| 2 | throttleConfigEqual not exercised by idempotency test | ✅ c6d3a68 |
| 3 | Stale constant comments describing old hot-path sweep | ✅ af2d12b |
| 4 | loadOrInit Delete+LoadOrStore TOCTOU (CompareAndDelete) | ✅ 29c7296 |
The only acknowledged but deferred items are: token-before-validation ordering in validateSend (minor AWS contract delta, follow-up), Prometheus metrics (§4.1, deferred), §6.5 cross-protocol parity test (blocked on PR #662), and §6.6 failover behaviour test (needs Jepsen scaffolding). All are documented in the PR description's "Out of scope" section.
The implementation is ready.
Implements PR 2 of the §11 multi-PR rollout from
docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. The schema
fields land on sqsQueueMeta with the design's wire types; the
validator enforces the §3.2 cross-attribute rules; a temporary
CreateQueue gate rejects PartitionCount > 1 until PR 5 lifts the
gate atomically with the data-plane fanout.
Schema (sqs_catalog.go):
- sqsQueueMeta gains three optional fields:
* PartitionCount uint32 — number of FIFO partitions; 0/1 means
legacy single-partition; > 1 enables HT-FIFO.
* FifoThroughputLimit string — "perMessageGroupId" (default for
HT-FIFO) or "perQueue" (collapses every group to partition 0).
* DeduplicationScope string — "messageGroup" (default for HT-FIFO)
or "queue" (legacy single-window).
- attributesEqual extended via baseAttributesEqual + htfifoAttributesEqual
split (kept under cyclop ceiling).
- queueMetaToAttributes surfaces the configured HT-FIFO fields via
addHTFIFOAttributes so GetQueueAttributes("All") round-trips.
Routing (sqs_partitioning.go, new):
- partitionFor(meta, messageGroupId) uint32 implements §3.3:
FNV-1a over MessageGroupId & (PartitionCount - 1). Edge cases
documented in the godoc — PartitionCount 0/1 → 0, FifoThroughputLimit
perQueue short-circuits to 0, empty MessageGroupId → 0.
- The bitwise-mask optimisation requires PartitionCount be a power
of two; the validator enforces it. A future bug that leaks a
non-power-of-two value is caught by TestPartitionFor_PowerOfTwoMaskingMatchesMod.
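
A sketch of the routing rule as described; the real signature and meta struct live in sqs_partitioning.go / sqs_catalog.go, so the stand-in type below is an assumption:

```go
package adapter

// sqsQueueMeta here is a minimal stand-in carrying only the fields partitionFor reads.
type sqsQueueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// partitionFor sketches the §3.3 routing rule: inline FNV-1a over MessageGroupId,
// masked by PartitionCount-1 (the validator guarantees a power of two, so the
// mask is equivalent to mod). The constants are the standard 64-bit FNV-1a parameters.
func partitionFor(meta *sqsQueueMeta, messageGroupID string) uint32 {
	if meta.PartitionCount <= 1 || meta.FifoThroughputLimit == "perQueue" || messageGroupID == "" {
		return 0
	}
	const (
		fnvOffset64 uint64 = 14695981039346656037
		fnvPrime64  uint64 = 1099511628211
	)
	h := fnvOffset64
	for i := 0; i < len(messageGroupID); i++ {
		h ^= uint64(messageGroupID[i])
		h *= fnvPrime64
	}
	return uint32(h) & (meta.PartitionCount - 1)
}
```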
Validation (sqs_partitioning.go):
- validatePartitionConfig enforces the §3.2 cross-attribute rules:
* PartitionCount must be a power of two in [1, htfifoMaxPartitions=32].
* FifoThroughputLimit / DeduplicationScope are FIFO-only.
* {PartitionCount > 1, DeduplicationScope = "queue"} rejects with
InvalidParameterValue (incoherent params, vs InvalidAttributeValue
for malformed individual values) per §3.2 cross-attribute gate.
- Runs in parseAttributesIntoMeta (after resolveFifoQueueFlag, so
the IsFIFO check sees the post-resolution flag) and in
trySetQueueAttributesOnce (after applyAttributes; IsFIFO comes
from the loaded meta).
- validatePartitionImmutability is the §3.2 rule that PartitionCount,
FifoThroughputLimit, and DeduplicationScope are immutable from
CreateQueue. SetQueueAttributes is all-or-nothing: a request that
touches a mutable attribute alongside an attempted immutable
change rejects the whole request before persisting either.
- snapshotImmutableHTFIFO captures the pre-apply values so the
immutability check has a clean before/after pair.
Dormancy gate (sqs_partitioning.go + sqs_catalog.go):
- validatePartitionDormancyGate is the §11 PR 2 temporary gate that
rejects CreateQueue with PartitionCount > 1 with
InvalidAttributeValue("PartitionCount > 1 requires HT-FIFO data
plane — not yet enabled"). The schema field exists in the meta
type but no partitioned data can land. Removed in PR 5 in the same
commit that wires the data-plane fanout — gate-and-lift is atomic
so a half-deployed cluster can never accept a partitioned queue
without the data plane to serve it.
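
A combined sketch of the §3.2 validator rules and the temporary dormancy gate described above. The error strings are illustrative stand-ins for the adapter's real error envelope, and the meta struct is a minimal stand-in:

```go
package adapter

import "errors"

const htfifoMaxPartitions = 32

// Minimal stand-in for the real meta struct in sqs_catalog.go.
type sqsQueueMeta struct {
	IsFIFO              bool
	PartitionCount      uint32
	FifoThroughputLimit string
	DeduplicationScope  string
}

func isPowerOfTwo(n uint32) bool { return n != 0 && n&(n-1) == 0 }

// validatePartitionConfig sketches the §3.2 cross-attribute rules listed above.
func validatePartitionConfig(meta *sqsQueueMeta) error {
	if !meta.IsFIFO {
		if meta.FifoThroughputLimit != "" || meta.DeduplicationScope != "" {
			return errors.New("InvalidAttributeValue: FifoThroughputLimit and DeduplicationScope are FIFO-only")
		}
		return nil
	}
	if meta.PartitionCount > 1 {
		if !isPowerOfTwo(meta.PartitionCount) || meta.PartitionCount > htfifoMaxPartitions {
			return errors.New("InvalidAttributeValue: PartitionCount must be a power of two in [1, 32]")
		}
		if meta.DeduplicationScope == "queue" {
			// Incoherent parameter combination, hence InvalidParameterValue rather
			// than InvalidAttributeValue for a malformed individual value.
			return errors.New("InvalidParameterValue: PartitionCount > 1 requires messageGroup deduplication scope")
		}
	}
	return nil
}

// validatePartitionDormancyGate is the temporary §11 PR 2 gate, lifted in PR 5
// together with the data-plane fanout.
func validatePartitionDormancyGate(meta *sqsQueueMeta) error {
	if meta.PartitionCount > 1 {
		return errors.New("InvalidAttributeValue: PartitionCount > 1 requires HT-FIFO data plane - not yet enabled")
	}
	return nil
}
```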
Tests:
- adapter/sqs_partitioning_test.go: 14 unit tests covering
partitionFor (legacy zero/one, perQueue short-circuit, empty
MessageGroupId, determinism, distribution within ±5% across 8
partitions on 100k samples, power-of-two masking < N), isPowerOfTwo,
validatePartitionConfig (power-of-two, max cap, FIFO-only,
cross-attr InvalidParameterValue, single-partition + queue-dedup OK),
validatePartitionDormancyGate (rejects > 1, allows 0/1),
validatePartitionImmutability (per-attribute change rejects, same-
value no-op succeeds), htfifoAttributesPresent.
- adapter/sqs_partitioning_integration_test.go: 7 end-to-end tests
against a real createNode cluster covering the wire surface:
dormancy gate rejects PartitionCount > 1 (and the gate's reason
surfaces to the operator), allows PartitionCount=1, validator
rejects non-power-of-two, FIFO-only rejection on Standard queue,
cross-attr {partitions, queue dedup} rejects, immutability rejects
SetQueueAttributes change with same-value no-op succeeds, all-or-
nothing — combined mutable+immutable rejects entirely without
persisting the mutable change, GetQueueAttributes round-trip.
- All tests pass under -race; golangci-lint clean.
Out of scope for this PR (§11 PR 3-8):
- PR 3: keyspace threading — partitionIndex through every sqsMsg*Key
constructor, defaulting to 0 so existing queues stay byte-identical.
Gate from PR 2 still in place.
- PR 4: routing layer + --sqsFifoPartitionMap flag + mixed-version
gate (§8.5 capability advertisement + §8 leadership-refusal hook
in kv/lease_state.go). Gate still in place.
- PR 5: send/receive partition fanout, receipt-handle v2, removes
the dormancy gate atomically with the data-plane fanout.
- PR 6: PurgeQueue/DeleteQueue partition iteration + tombstone +
reaper.
- PR 7: Jepsen HT-FIFO workload + metrics.
- PR 8: doc lifecycle bump.
…eanup

Three items from the round-2 Claude review on PR #681:

(1) Bug (latent, live in PR 5): validatePartitionConfig only guarded FifoThroughputLimit / DeduplicationScope as FIFO-only, not PartitionCount > 1. A Standard queue with PartitionCount=2 would slip past the validator after PR 5 lifts the dormancy gate. Added a guard inside the !meta.IsFIFO block: PartitionCount > 1 → InvalidAttributeValue("PartitionCount > 1 is only valid on FIFO queues"). PartitionCount 0 / 1 stay accepted on Standard (both mean "single-partition layout"). Test TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs extended to cover PartitionCount=2/4/8/16/32 → reject and PartitionCount=0/1 → accept.

(2) Low: TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned had two contradictory comments. The function-level doc said "Test sets PartitionCount=1 to bypass dormancy" but the test actually sends PartitionCount=2; the inline comment claimed "dormancy fires first" but validatePartitionConfig (inside parseAttributesIntoMeta) runs before validatePartitionDormancyGate, so the cross-attr rule fires first. Rewrote the doc to describe the actual control-plane order (validatePartitionConfig → cross-attr rejection → wire 400) and removed the misleading "dormancy fires first" assertion.

(3) Nit: replaced the custom 11-line contains/indexOf helpers with strings.Contains. Identical behaviour, idiomatic.

All findings noted in the round-2 Claude review on PR #681.
💡 Codex Review (reviewed commit 29c7296028)
```go
// Default* covers any future non-Send/Recv verb that gets wired
// into the throttle path — no batch verb is in scope today, so the
// minimum-batch-capacity check is off for this bucket.
if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, false); err != nil {
```
Enforce batch-capacity floor on fallback Default throttle bucket
validateThrottleConfig disables the batch-capacity check for ThrottleDefault* (requireBatchCapacity=false), but resolveActionConfig routes Send/Receive traffic to Default whenever Send*/Recv* are unset. That means configs like ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 are accepted, yet full SendMessageBatch/DeleteMessageBatch requests (charge=10) can never succeed because the bucket can never accumulate 10 tokens. This creates a permanently unserviceable, hard-throttled batch path from a configuration the validator currently allows.
```go
idle := bucket.lastRefill.Before(cutoff)
bucket.mu.Unlock()
if idle {
	b.buckets.Delete(k)
```
Make idle-sweep eviction conditional on unchanged bucket entry
In sweep, idle is computed under bucket.mu, but deletion happens later via unconditional b.buckets.Delete(k). A concurrent charge can run between unlock and delete, refresh lastRefill, and deduct tokens; the subsequent delete then drops that live bucket and the next request recreates a full-capacity bucket, effectively discarding recent deductions and allowing excess throughput. Eviction should use a compare-and-delete (or recheck under lock) so only the same still-idle entry is removed.
…2) (#681) ## Summary Phase 3.D **PR 2** (schema + validators + dormancy gate) per the §11 multi-PR rollout in [`docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md`](https://github.com/bootjp/elastickv/blob/docs/sqs-phase3-proposals/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md) (currently on PR #664). Stacks on **PR #679 (Phase 3.C)**: branched off `feat/sqs-throttling-phase3c` so the 3.D schema work shares the same base as the throttling implementation. Will be rebased once PR #679 merges to main. - `sqsQueueMeta` gains `PartitionCount uint32`, `FifoThroughputLimit string`, `DeduplicationScope string`. - `partitionFor(meta, messageGroupId) uint32` implements §3.3 — FNV-1a over `MessageGroupId`, masked by `PartitionCount-1` (validator-enforced power-of-two so the bitwise mask is equivalent to mod). Edge cases: `PartitionCount` 0/1 → 0, `FifoThroughputLimit=perQueue` short-circuits to 0, empty `MessageGroupId` → 0. - `validatePartitionConfig` enforces the §3.2 cross-attribute rules: power-of-two `[1, htfifoMaxPartitions=32]`, `FifoThroughputLimit`/`DeduplicationScope` are FIFO-only, `{PartitionCount > 1, DeduplicationScope = "queue"}` rejects with `InvalidParameterValue` at control-plane time so the operator sees the error before the first `SendMessage`. - `validatePartitionImmutability` enforces the §3.2 immutability rule (the three HT-FIFO fields are immutable from `CreateQueue` onward). `SetQueueAttributes` is **all-or-nothing** — a request that touches a mutable attribute alongside an attempted immutable change rejects the whole request before persisting either. - `validatePartitionDormancyGate` is the §11 PR 2 temporary gate: `CreateQueue` with `PartitionCount > 1` rejects with `InvalidAttributeValue("PartitionCount > 1 requires HT-FIFO data plane — not yet enabled")`. The schema field exists in the meta type but no partitioned data can land. Removed in PR 5 in the same commit that wires the data-plane fanout — gate-and-lift is **atomic** so a half-deployed cluster can never accept a partitioned queue without the data plane to serve it. - `GetQueueAttributes("All")` round-trips the configured HT-FIFO fields via `addHTFIFOAttributes`. ## Test plan - [x] `adapter/sqs_partitioning_test.go` — 14 unit tests covering `partitionFor` (legacy zero/one, perQueue short-circuit, empty MessageGroupId, determinism, distribution within ±5% across 8 partitions on 100k samples, power-of-two masking always < N), `isPowerOfTwo`, `validatePartitionConfig` (power-of-two, max cap, FIFO-only, cross-attr `InvalidParameterValue`, single-partition + queue-dedup OK), `validatePartitionDormancyGate` (rejects > 1, allows 0/1, gate reason surfaces to the operator), `validatePartitionImmutability` (per-attribute change rejects, same-value no-op succeeds), `htfifoAttributesPresent`. - [x] `adapter/sqs_partitioning_integration_test.go` — 7 end-to-end tests: dormancy gate rejects `PartitionCount > 1` (and the gate's "not yet enabled" message surfaces), allows `PartitionCount=1`, validator rejects non-power-of-two, FIFO-only rejection on Standard queue, cross-attr `{PartitionCount=2, DeduplicationScope=queue}` rejects, immutability `SetQueueAttributes` change rejects with same-value no-op succeeding, all-or-nothing combined mutable+immutable rejects without persisting the mutable change, `GetQueueAttributes` round-trip omits unset fields. - [x] All tests pass under `-race`. - [x] `golangci-lint run ./adapter/...` → 0 issues. ## Self-review (5 lenses) 1. 
**Data loss** — Schema fields land on `sqsQueueMeta` and are persisted via the existing OCC `Put` on `sqsQueueMetaKey`. The dormancy gate runs at `CreateQueue` admission time; once PR 5 lifts the gate, no schema migration is needed (existing queues have `PartitionCount=0` which is equivalent to 1). The keyspace stays untouched in this PR (PR 3 threads `partitionIndex` through `sqsMsg*Key`). 2. **Concurrency / distributed failures** — Validation is purely function-of-input; no shared state. The immutability check loads the meta in the same `nextTxnReadTS` snapshot that `applyAttributes` uses, so a concurrent `SetQueueAttributes` from another writer is caught by the existing OCC `StartTS + ReadKeys` fence (already in place for `setQueueAttributesWithRetry`). Dormancy gate runs at `CreateQueue` admission so a concurrent gate-lift mid-request is impossible. 3. **Performance** — `partitionFor` is one FNV-1a hash + one bitwise mask; constant time. `validatePartitionConfig` is field-comparison only, runs once per `CreateQueue` / `SetQueueAttributes`. `addHTFIFOAttributes` adds at most three map writes and only when fields are non-empty. Hot path `SendMessage` / `ReceiveMessage` is unaffected by this PR (no routing changes — those land in PR 5). 4. **Data consistency** — Cross-attribute rule `{partitions, queue dedup}` rejects at control-plane time so an operator can never end up with a created-but-unserviceable queue (the runtime path that would have surfaced the same error at first send is unreachable). Immutability rule prevents mid-life routing-key reconfiguration that would silently violate within-group FIFO ordering. 5. **Test coverage** — 14 unit + 7 integration tests cover the validator's full decision tree (each rule has a dedicated table-driven case), the dormancy gate's accept/reject paths with the operator-visible reason, the immutability all-or-nothing semantics, and `partitionFor`'s edge cases plus a 100k-sample distribution test that catches non-uniform routing immediately. Lint is clean. ## Out of scope (subsequent §11 rollout PRs) - **PR 3** — keyspace threading: `partitionIndex` through every `sqsMsg*Key` constructor, defaulting to 0 so existing queues stay byte-identical. Dormancy gate from this PR remains in place. - **PR 4** — routing layer + `--sqsFifoPartitionMap` flag + mixed-version gate (§8.5 capability advertisement + §8 leadership-refusal hook in `kv/lease_state.go`). Dormancy gate still in place. - **PR 5** — send/receive partition fanout + receipt-handle v2 codec. **Removes the dormancy gate atomically with the data-plane fanout** so the gate-and-lift land in one commit. - **PR 6** — `PurgeQueue` / `DeleteQueue` partition iteration + tombstone schema update + reaper. - **PR 7** — Jepsen HT-FIFO workload + metrics. - **PR 8** — partial-doc lifecycle bump. ## Branch base Branched off `feat/sqs-throttling-phase3c` (PR #679) which itself branches off `docs/sqs-phase3-proposals` (PR #664). Will rebase against `main` after both ancestors merge.
…round 5)

Two findings from the round-5 Codex review:

(1) P1 — Default* batch-capacity floor: validateThrottleConfig disabled the >=10 batch floor for ThrottleDefault* on the assumption that Default never serves a batch verb. But resolveActionConfig in sqs_throttle.go falls Send and Receive traffic through to the Default bucket whenever the corresponding Send*/Recv* pair is unset. So a config like ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 was accepted at the validator, then made every full SendMessageBatch / DeleteMessageBatch (charge=10) permanently unserviceable because the Default bucket could never accumulate 10 tokens. Flipped the third validateThrottlePair call's requireBatchCapacity from false to true; updated the inline comment to spell out why Default* needs the floor (the design doc note in §3.2 about Default* being exempt was wrong about the fall-through). New unit test TestValidateThrottleConfig_DefaultBucketBatchFloor covers DefaultCapacity=1/5 reject and DefaultCapacity=10 accept.

(2) P2 — sweep eviction race: Earlier sweep computed idle under bucket.mu, released the lock, then unconditionally Deleted the entry. A concurrent charge() in that window could refill+take a token, then Delete evicted the just-used bucket and the next request got a fresh full-cap bucket — the just-taken token was effectively undone, allowing excess throughput. Fixed in two parts:

- Hold bucket.mu across the Delete so the idle observation cannot be invalidated between check and delete. A concurrent charge() that wants the bucket either runs to completion before sweep acquires mu (sweep then sees the freshened lastRefill and skips the delete) or waits for sweep to release mu (charge then takes the bucket — but sweep has already removed it from the map, so the charge succeeds against an orphan bucket and only the next-after-charge request gets the full-cap rebuild — bounded one-token leak).
- CompareAndDelete(k, v) ensures sweep does not evict a replacement bucket inserted by invalidateQueue or a future reconciliation path.

Holding bucket.mu across sync.Map.Delete is safe — sync.Map.Load is wait-free on the read-only path and never blocks on bucket.mu, so there is no AB-BA cycle with charge().

New regression test TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket backdates a bucket's lastRefill by 2h, advances the clock by 2h to force the bucket past the idle cutoff, then races sweep against a 50-iteration charge loop. After both finish the bucket must still be in the store — at least one charge interleaves with sweep's re-check window and sweep observes the freshened lastRefill.

All tests pass under -race; golangci-lint clean.
Addressed round-5 Codex P1 + P2.

P1 — Default* batch-capacity floor. (Note: the original §3.2 design-doc remark that "Default* is exempt because the catch-all action set has no batch verb in scope for this PR" was wrong; it overlooked the code path where the fall-through routes Send/Recv batch verbs to the Default bucket. A follow-up doc fix will go into the next docs/sqs-phase3-proposals PR.)

P2 — sweep eviction race. The fix has two parts: hold bucket.mu across the delete so the idle check cannot be invalidated, and use CompareAndDelete so sweep never evicts a replacement bucket.

New regression test: TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket.

Verified: @claude review
Claude review:
| Round | Issue | Status |
|---|---|---|
| 1–4 | (all prior items) | ✅ |
| 5 | P1 — Default* batch floor | ✅ 576cf8c |
| 5 | P2 — sweep eviction race (bucket.mu held across CompareAndDelete) | ✅ 576cf8c |
| 5 | P2 test assertion vacuously true | |
💡 Codex Review (reviewed commit 576cf8c39e)
```go
if bucket.lastRefill.Before(cutoff) {
	b.buckets.CompareAndDelete(k, v)
}
```
Prevent sweep from orphaning buckets with blocked chargers
Even with CompareAndDelete, this eviction path can still break throttle correctness because callers may have already loaded v from sync.Map and be blocked on bucket.mu while sweep deletes the entry. After unlock, those goroutines continue charging the now-orphaned bucket, while subsequent requests allocate a fresh full-capacity bucket in the map, so a burst arriving near an idle sweep can exceed configured capacity. This is especially visible when a queue wakes up after being idle and many requests race at once.
```go
if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle) {
	return nil, 0, 0, false
```
Avoid charging with stale queue metadata after delete races
This charges using meta.Throttle loaded earlier, so if DeleteQueue commits between the meta read and this call, the in-flight request can recreate a throttle bucket after deleteQueue already invalidated it. Because buckets are keyed by queue name/action (not generation), a same-name recreate can inherit stale token state instead of starting fresh, violating the delete+recreate reset behavior the invalidation is meant to guarantee.
… round 6)

Round 5's sweep+CompareAndDelete fix closed half the eviction race — sweep can no longer evict a bucket whose lastRefill was freshened by a concurrent charge — but left a second window. Goroutines that loaded the bucket from sync.Map *before* sweep removed it from the map were still blocked on bucket.mu, and once sweep released mu they would charge the orphan bucket while later requests created a fresh full-capacity entry via loadOrInit. A burst aligned with a sweep tick could therefore consume up to 2x the configured capacity.

Three changes plug the window:

- tokenBucket gains an `evicted bool` field, protected by mu.
- sweep, invalidateQueue, and the loadOrInit reconciliation path all set evicted=true under mu *after* successfully removing the bucket from the map (CompareAndDelete success or LoadAndDelete return).
- charge() loops on a chargeBucket helper that checks evicted under mu and signals retry; the caller drops the orphan reference and loops back through loadOrInit, which converges on whatever live bucket the map now holds.

CreateQueue now calls invalidateQueue on a genuine commit (tryCreateQueueOnce post-Dispatch) — DeleteQueue invalidated, but a sendMessage holding pre-delete meta could recreate a bucket between DeleteQueue's invalidate and the next CreateQueue commit, so a same-name recreate would inherit used token state. Invalidating after the genuine create resets the bucket regardless of in-flight traffic to the prior incarnation. The idempotent-return path (existing queue with identical attributes) deliberately skips invalidation.

Tests:

- TestBucketStore_SweepRaceDoesNotInflateBudget: -race stress test that runs 4 sweeps against 64 chargers on a single backdated bucket; total successful charges must stay <= capacity. The old code path could yield up to 2x capacity.
- TestBucketStore_OrphanedBucketRetriesToLiveEntry: deterministic exercise of the evicted-flag retry — sweep evicts, the next charge allocates a fresh bucket via loadOrInit.
- TestBucketStore_InvalidateMarksOrphanEvicted: pins the round-6 invalidateQueue change (LoadAndDelete + flag set under mu).
- The pre-existing TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket is replaced; the prior assertion (stillThere) was vacuously true because the for-range 50 charge loop would always recreate the bucket via loadOrInit after any eviction (Claude Low on round 5).

go test -race -count=20 of the new tests passes; full SQS test package passes under -race; golangci-lint clean.

Round-6 review pulled four other items not actioned:

- Codex P2 "CreateQueue idempotency must compare Throttle" — already addressed: attributesEqual at sqs_catalog.go:726 calls baseAttributesEqual + throttleConfigEqual + htfifoAttributesEqual.
- Gemini high "chargeQueue performs redundant storage read" — already addressed: hot-path handlers (sendMessage, receiveMessage) call chargeQueueWithThrottle to skip the second meta load.
- Gemini high "maybeSweep on hot path" — already addressed: runSweepLoop runs sweep on a background ticker wired to reaperCtx.
- Gemini medium "cap retryAfter at 10ms to prevent overflow" — already addressed: throttleRetryAfterCap=1h cap before multiply prevents overflow; 10ms cap is inappropriate for SQS rate-limit semantics (clients would retry constantly under contention).
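
A self-contained sketch of the evicted-flag mechanism this commit describes; types and signatures are stand-ins (the real store also tracks refill timestamps and Retry-After), but the retry loop and the flag-under-mu check mirror the description above:

```go
package adapter

import "sync"

type bucketKey struct{ queue, action string }

type actionConfig struct{ capacity, refill float64 }

type tokenBucket struct {
	mu         sync.Mutex
	tokens     float64
	capacity   float64
	refillRate float64
	evicted    bool // set under mu once the bucket has been removed from the map
}

type bucketStore struct{ buckets sync.Map } // bucketKey -> *tokenBucket

// evict removes one specific bucket pointer and marks it evicted under mu;
// sweep, invalidateQueue, and the reconciliation path all follow this shape.
func (b *bucketStore) evict(key bucketKey, v any) {
	bucket := v.(*tokenBucket)
	bucket.mu.Lock()
	if b.buckets.CompareAndDelete(key, v) {
		bucket.evicted = true
	}
	bucket.mu.Unlock()
}

// chargeBucket takes n tokens if available; retry=true means the bucket was
// evicted while this goroutine waited on mu, so the caller must re-resolve.
func (tb *tokenBucket) chargeBucket(n float64) (allowed, retry bool) {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	if tb.evicted {
		return false, true
	}
	// refill by elapsed time elided in this sketch
	if tb.tokens >= n {
		tb.tokens -= n
		return true, false
	}
	return false, false
}

// charge loops until it lands on a live bucket: an orphaned (evicted) bucket is
// dropped and loadOrInit re-resolves whatever entry the map now holds, so a
// burst racing an eviction can no longer double-spend across two bucket instances.
func (b *bucketStore) charge(key bucketKey, cfg actionConfig, n float64) bool {
	for {
		bucket := b.loadOrInit(key, cfg)
		allowed, retry := bucket.chargeBucket(n)
		if !retry {
			return allowed
		}
	}
}

func (b *bucketStore) loadOrInit(key bucketKey, cfg actionConfig) *tokenBucket {
	if v, ok := b.buckets.Load(key); ok {
		return v.(*tokenBucket)
	}
	fresh := &tokenBucket{capacity: cfg.capacity, refillRate: cfg.refill, tokens: cfg.capacity}
	actual, _ := b.buckets.LoadOrStore(key, fresh)
	return actual.(*tokenBucket)
}
```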
Round 6: addressed Codex P2 #2 (sweep orphan bucket) and Claude Low (test assertion vacuously true).

As pointed out, round 5's fix still let goroutines that had loaded the bucket before sweep removed it from the map charge the orphan while later requests got a fresh full-capacity entry. The round-6 changes add the evicted flag (set under mu on every eviction path) and a charge() retry loop that converges on the live map entry.

Claude Low: as pointed out, the old stillThere assertion was vacuously true; that test has been replaced by the new stress and deterministic tests.

Codex P2 #3 (CreateQueue + stale-meta sendMessage race): as pointed out, the DeleteQueue-invalidate → in-flight sendMessage with pre-delete meta → CreateQueue race left a path where a newly created queue inherited a bucket that had already consumed a token. If the Throttle config differs, the loadOrInit reconciliation resets it anyway; this fix matters only for a delete+recreate with an identical Throttle config.

Findings not actioned (4): listed in the commit message above; all were already addressed in prior rounds.

@claude review
Claude review:
| Finding | Addressed in |
|---|---|
| Codex P2 "CreateQueue idempotency must compare Throttle" | Round 1: attributesEqual at sqs_catalog.go:726 calls baseAttributesEqual + throttleConfigEqual + htfifoAttributesEqual |
| Gemini high "chargeQueue redundant storage read" | Round 2: hot-path handlers use chargeQueueWithThrottle; chargeQueue only used for handlers without their own meta load |
| Gemini high "maybeSweep on hot path" | Round 2: runSweepLoop runs on background ticker wired to reaperCtx |
| Gemini medium "cap retryAfter at 10ms" | Round 3 (cap fix): 10ms cap would cause constant client retries under SQS rate-limit semantics; the 1h cap is correctly reasoned |
Summary
All Round 6 changes are correctly implemented. The evicted flag closes the 2× burst window from Round 5's residual race (goroutines holding stale pointers before sweep's CompareAndDelete). The tryCreateQueueOnce invalidation correctly handles the delete-then-create stale-meta window. All three new tests are meaningful and pin the right invariants — the vacuously-true stillThere assertion from Round 5 is correctly replaced.
Complete history:
| Round | Issue | Status |
|---|---|---|
| 1–5 | (all prior items) | ✅ |
| 6 | Codex P2 — orphan-bucket 2× burst window (evicted flag + retry loop) | ✅ 7d7e620 |
| 6 | Codex P2 — CreateQueue post-commit invalidation | ✅ 7d7e620 |
| 6 | Claude Low — vacuously-true stillThere test assertion | ✅ 7d7e620 |
| 6 | loadOrInit matches path doesn't check !evicted (extra retry iteration) | ⬜ Micro-opt, not a bug |
The implementation is ready.
💡 Codex Review (reviewed commit 7d7e6203ef)
```go
}
for _, action := range throttleAllActions {
	key := bucketKey{queue: queue, action: action}
	v, ok := b.buckets.LoadAndDelete(key)
```
Mark bucket evicted before deleting queue entry
invalidateQueue removes the bucket from sync.Map via LoadAndDelete before it acquires bucket.mu and sets evicted=true. If a concurrent charge() already loaded that pointer and wins the mutex race, it can consume tokens while evicted is still false on an orphaned bucket; the next request then creates a fresh full-capacity bucket, effectively discarding those deductions and allowing a burst above configured limits right after SetQueueAttributes/DeleteQueue/CreateQueue invalidation events.
Round 6's invalidateQueue used LoadAndDelete-then-lock — a charger
that loaded the pointer pre-LoadAndDelete could win the mu race and
charge the just-removed bucket while evicted was still false (Codex
P2 on round 6). The fix mirrors sweep's ordering:
1. Load the pointer from the map (without deleting).
2. Acquire bucket.mu.
3. CompareAndDelete with the loaded v (rejects a replacement
bucket inserted by a concurrent reconciliation).
4. Set evicted=true under mu.
5. Unlock.
A charger blocked on mu now sees evicted=true on entry and retries
against the live entry, instead of charging the orphan.
Replaced the over-strict TestBucketStore_InvalidateRace... with
TestBucketStore_InvalidateUnderConcurrencyIsRaceFree: invalidate's
reset-to-full-capacity is the *intended* semantics of an invalidation
event (DeleteQueue / SetQueueAttributes / CreateQueue all want the
new policy applied to a fresh bucket), so the post-invalidate fresh
bucket can legitimately absorb up to capacity additional tokens —
that 2x window is structural, not a bug. Asserting successful-charge
count <= capacity would also pass on the buggy code path under most
schedulings, so the assertion would not differentiate old vs new.
The deterministic TestBucketStore_InvalidateMarksOrphanEvicted
already pins the actual mechanism (evicted=true under mu on dropped
bucket); the new stress test exists to surface any -race finding the
new lock ordering might introduce.
go test -race -count=20 of every TestBucketStore_* test passes;
golangci-lint clean.
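A minimal sketch of the new invalidateQueue ordering described above, reusing the simplified tokenBucket / bucketStore shapes from the earlier charge() sketch; the throttleAllActions value and the action names here are assumptions, not the actual constants in sqs_throttle.go:

```go
// Assumed action set: Send and Recv buckets plus the shared Default bucket
// keyed as "*" per the design; the real constant may differ.
var throttleAllActions = []string{"send", "recv", "*"}

// invalidateQueue drops every bucket belonging to queue using the
// Load → lock → CompareAndDelete → evicted=true ordering, mirroring sweep.
func (b *bucketStore) invalidateQueue(queue string) {
	for _, action := range throttleAllActions {
		key := bucketKey{queue: queue, action: action}

		// 1. Load without deleting, so the bucket stays reachable while we take its lock.
		v, ok := b.buckets.Load(key)
		if !ok {
			continue
		}
		tb := v.(*tokenBucket)

		// 2. Hold the bucket lock across the removal: any charger that already
		//    holds this pointer blocks here until we are done.
		tb.mu.Lock()

		// 3. Only delete if the map still holds this exact pointer; a replacement
		//    inserted by a concurrent loadOrInit reconciliation is left alone.
		if b.buckets.CompareAndDelete(key, v) {
			// 4. Mark the orphan under mu so a blocked charger sees evicted=true
			//    on entry and retries against the live entry.
			tb.evicted = true
		}
		tb.mu.Unlock()
	}
}
```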
Round 6.1: as pointed out, the round-6 invalidateQueue fix has been aligned to the same pattern as sweep:
A charger that was blocked acquiring mu now sees evicted=true immediately after unblocking and retries against the live entry. Added a new race stress test. The other findings re-posted in the round-6 review (Codex P2 #1 CreateQueue idempotency / #3 stale meta, Gemini high chargeQueue / maybeSweep, Gemini medium retryAfter cap) were all addressed in prior rounds, as detailed in the previous comment.
@claude review
Claude finished @bootjp's task in 4m 10s
| Step | Old (LoadAndDelete-then-lock) | New (Load-then-lock-then-CompareAndDelete) |
|---|---|---|
| 1 | LoadAndDelete(key) — evicts from map immediately | Load(key) — pointer obtained, bucket still in map |
| 2 | bucket.mu.Lock() — race window: charger that loaded pre-LoadAndDelete can win mu and see evicted=false | bucket.mu.Lock() — blocks any concurrent charger until we're done |
| 3 | bucket.evicted = true | CompareAndDelete(key, v) — only evicts if map still holds this specific pointer |
| 4 | — | bucket.evicted = true under mu |
The race the old code allowed: goroutine C loads v1 pre-LoadAndDelete → LoadAndDelete removes v1 → C wins bucket.mu → sees evicted=false → charges the orphan → invalidateQueue acquires mu → sets evicted=true → next request gets a fresh full-capacity bucket → 2× burst. The new ordering eliminates this: C is now blocked on mu until invalidateQueue releases with evicted=true set, and C retries into the live entry.
LoadAndDelete is fully removed from live code. Confirmed: the only occurrences of LoadAndDelete in sqs_throttle.go are in the invalidateQueue comment block (lines 345-348) explaining why the old approach was wrong.
All three eviction sites now use identical ordering — loadOrInit reconciliation (line 308), invalidateQueue (line 362), and sweep (line 427) all hold bucket.mu across CompareAndDelete before setting evicted=true. The code is now internally consistent.
AB-BA deadlock analysis confirmed safe:
- invalidateQueue path: sync.Map.Load (lock-free, no internal lock held) → bucket.mu.Lock() → CompareAndDelete (acquires sync.Map's internal mu)
- charge() path: sync.Map.Load (lock-free) → bucket.mu.Lock()
- runSweepLoop / sweep(): Range callback is invoked outside sync.Map's read lock in Go's implementation → bucket.mu.Lock() → CompareAndDelete
No path holds sync.Map's internal mu while waiting for bucket.mu. No cycle.
CompareAndDelete(key, v) on a concurrent-reconciliation replacement bucket (the case where loadOrInit reconciliation replaced v1 with fresh between invalidateQueue's Load and its CompareAndDelete): CompareAndDelete correctly returns false (map holds fresh, not v1) — invalidateQueue leaves fresh alone. fresh was built by reconciliation from the same current-config parameters, so this is correct: the effectively-current-config bucket does not get wiped. Meanwhile, loadOrInit's own reconciliation already set v1.evicted=true at line 308-311.
New Test Verified ✅
TestBucketStore_InvalidateUnderConcurrencyIsRaceFree (lines 472-497)
The reasoning for withdrawing the count assertion is sound and correctly stated in the comment:
- invalidateQueue's reset-to-full-capacity is intended semantics — a post-invalidate fresh bucket should legitimately absorb up to capacity tokens under new policy. Asserting successes ≤ capacity would pass on the old buggy code too (under most schedulings), so it wouldn't differentiate.
- What the fix guarantees is not a charge-count bound, but that no orphaned bucket is charged while evicted is still false — which is what TestBucketStore_InvalidateMarksOrphanEvicted pins deterministically.
- The stress test's value is -race detection for the new lock ordering, not a semantics assertion.
This split (deterministic pin + stress -race sweep) is the right coverage strategy.
No New Issues Found
The fix is minimal and surgical — one call site changed in invalidateQueue, one stress test added, one over-strict test replaced. No other code paths were touched. The comment block accurately documents both what changed and why the old ordering was wrong.
Complete History
All findings from rounds 1–6.1 are resolved:
| Round | Issue | Status |
|---|---|---|
| 1–5 | (all prior items) | ✅ |
| 6 | Codex P2 — orphan-bucket 2× burst window (evicted flag + retry loop) | ✅ 7d7e620 |
| 6 | Codex P2 — CreateQueue post-commit invalidation | ✅ 7d7e620 |
| 6 | Claude Low — vacuously-true stillThere test assertion | ✅ 7d7e620 |
| 6.1 | Codex P2 — invalidateQueue LoadAndDelete-then-lock race | ✅ e9c20de |
Remaining acknowledged-and-deferred items (all in PR "Out of scope" section): token-before-validation ordering in validateSend, Prometheus metrics (§4.1), §6.5 cross-protocol parity test (blocked on PR #662), §6.6 failover behaviour test (needs Jepsen scaffolding).
The implementation is ready.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e9c20de48e
    // htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields.
    func htfifoAttributesEqual(a, b *sqsQueueMeta) bool {
        return a.PartitionCount == b.PartitionCount &&
Treat PartitionCount 0 and 1 as equal in idempotency checks
validatePartitionConfig documents PartitionCount == 0 as the unset value equivalent to single-partition routing, but htfifoAttributesEqual compares PartitionCount with strict numeric equality. That means a queue created without PartitionCount (stored as 0) is later considered "different attributes" from the same queue created with explicit PartitionCount=1, so CreateQueue can incorrectly return QueueNameExists for a semantically identical request. This can break idempotent provisioning/retry flows that normalize defaults differently.
… round 6.2) validatePartitionConfig documents PartitionCount=0 (unset) and =1 (explicit single partition) as semantically identical legacy / single-partition routing, but htfifoAttributesEqual was using strict numeric equality. So a queue created without PartitionCount (stored as 0) followed by a CreateQueue retry that explicitly passes PartitionCount=1 was rejected as "different attributes" — broken idempotency for any provisioning flow that normalises defaults differently between attempts. (Codex P2 on PR #679 round 6.1.)
Fix: normalisePartitionCount maps 0 -> 1 before comparison; values >1 pass through unchanged so HT-FIFO partition counts still require exact match. The on-disk schema is unchanged.
Tests:
- TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent — pins (0, 1) equality, symmetry, and that real divergence (2 vs 0, 2 vs 1) still rejects.
- TestNormalisePartitionCount — pins the helper directly.
go test -race ./adapter/... pass; golangci-lint clean.
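A minimal sketch of the normalisation described above; sqsQueueMeta is stubbed down to the one field that matters here, and the remaining HT-FIFO fields compared by the real htfifoAttributesEqual are elided:

```go
// Stub of the real struct — only the field relevant to this fix is shown.
type sqsQueueMeta struct {
	PartitionCount int
	// ... other catalog fields elided
}

// normalisePartitionCount maps the unset value (0) to the explicit
// single-partition value (1); values above 1 pass through unchanged, so a
// real partition-count divergence still fails the equality check.
func normalisePartitionCount(n int) int {
	if n == 0 {
		return 1
	}
	return n
}

// htfifoAttributesEqual (sketch): only the PartitionCount comparison changes;
// the other HT-FIFO fields keep strict equality and are elided here.
func htfifoAttributesEqual(a, b *sqsQueueMeta) bool {
	return normalisePartitionCount(a.PartitionCount) == normalisePartitionCount(b.PartitionCount)
}
```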
Round 6.2: as pointed out. Fix: normalisePartitionCount before comparison. New tests: TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent and TestNormalisePartitionCount.
Note that the other Codex/Gemini findings from round 6.1 (CreateQueue idempotency Throttle / sweep orphan / stale meta / chargeQueue / maybeSweep / retryAfter cap) are all re-posts of items already addressed in prior rounds. Claude's round-6 / 6.1 verdict was "The implementation is ready" (#679 (comment) / 4327071094).
@claude review
Claude finished @bootjp's task in 2m 24s
| Round | Issue | Status |
|---|---|---|
| 1–6.1 | (all prior items) | ✅ |
| 6.2 | Codex P2 — PartitionCount 0 ≡ 1 in CreateQueue idempotency | ✅ dbd7a66 |
| 6.2 | validatePartitionImmutability has same 0 ≡ 1 gap for SetQueueAttributes | |
#679 round 6.3) Round 6.2 normalised the CreateQueue idempotency path (htfifoAttributesEqual) but left the SetQueueAttributes immutability path (validatePartitionImmutability) using strict equality. Same 0 == 1 gap (Claude Low on round 6.2):
1. CreateQueue with no PartitionCount -> stored as 0.
2. SetQueueAttributes(PartitionCount=1) -> requested.PartitionCount = 1.
3. validatePartitionImmutability: 0 != 1 -> rejects "PartitionCount is immutable", even though the partition layout is unchanged.
Fix: validatePartitionImmutability now normalises both sides through normalisePartitionCount before comparison, mirroring the round-6.2 fix. The narrow gap surfaced under the §11 PR 2 dormancy gate (SetQueueAttributes does not call the gate, so this path is reachable on existing queues), so a fix is correct now rather than deferring to PR 5.
Test:
- TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent — table-driven over all (stored, requested) ∈ {0,1,2,4,8} pairs; no-op pairs accept, real-change pairs reject. Reverting either call to strict equality flips the relevant rows.
go test -race ./adapter/... pass; golangci-lint clean.
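The round-6.3 counterpart, again as a hedged sketch reusing normalisePartitionCount and the sqsQueueMeta stub from the earlier sketch; the real validatePartitionImmutability signature and error wording may differ, and the errors import is assumed:

```go
// validatePartitionImmutability (sketch): both the stored and the requested
// value go through normalisePartitionCount, so 0 vs 1 is a no-op while any
// real change (e.g. 2 vs 1, 2 vs 0) is still rejected as immutable.
func validatePartitionImmutability(stored, requested *sqsQueueMeta) error {
	if normalisePartitionCount(stored.PartitionCount) != normalisePartitionCount(requested.PartitionCount) {
		return errors.New("PartitionCount is immutable")
	}
	return nil
}
```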
Round 6.3: as pointed out, round 6.2 normalised only the CreateQueue idempotency path. Concrete failure path: as in the commit message above (stored 0, requested 1, strict inequality rejects the no-op).
Fix: validatePartitionImmutability now normalises both sides. As Claude noted, the scope is narrow (the Phase 3.D §11 PR 2 dormancy gate does not run on the SetQueueAttributes path, so an explicit PartitionCount=1 no-op on an existing queue is reachable). New test: TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent.
@claude review
Claude finished @bootjp's task in 2m 22s
| Site | Uses normalise? | Correct? |
|---|---|---|
| htfifoAttributesEqual (CreateQueue idempotency) | ✅ Round 6.2 | ✓ |
| validatePartitionImmutability (SetQueueAttributes immutability) | ✅ Round 6.3 | ✓ |
| Routing logic (PartitionCount <= 1, > 1) | N/A — literal comparisons for routing decisions, not equality checks | ✓ |
| validatePartitionConfig (power-of-two, FIFO-only gate, etc.) | N/A — validation semantics intentionally distinguish 0 vs 1 | ✓ |
No other equality comparison site was missed.
New Test Verified ✅
TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent (sqs_partitioning_test.go:275)
The 10-case table is complete and non-vacuous:
| Case | Expected | Catches regression? |
|---|---|---|
| (0, 1) no-op | no error | ✅ flips with strict equality |
| (1, 0) no-op | no error | ✅ flips with strict equality (symmetric) |
| (0, 0) no-op | no error | passes either way |
| (1, 1) no-op | no error | passes either way |
| (0, 2) real change | error | passes either way |
| (1, 2) real change | error | passes either way |
| (2, 1) real change | error | ✅ flips with strict equality (reverse direction) |
| (2, 0) real change | error | ✅ flips with strict equality (reverse direction) |
| (4, 8) real change | error | passes either way |
| (8, 8) no-op | no error | passes either way |
The (2,1) and (2,0) cases are particularly useful: they verify that normalisation only collapses 0/1 and not 2/1 or 2/0 — a correct but non-obvious property of normalisePartitionCount. Reverting to strict equality flips exactly 4 cases, confirming the test is not vacuously true.
TestNormalisePartitionCount (sqs_partitioning_test.go:348) from Round 6.2 pins the helper itself and is unchanged — no regression there.
One Very Low Observation
Integration test gap: SetQueueAttributes(PartitionCount=1) on a stored-as-0 queue
TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects creates a queue via mustCreateFIFOWithThroughputLimit, which does not set PartitionCount (stored as 0). The test exercises FifoThroughputLimit immutability but not the 0→1 PartitionCount no-op path. An integration-level test for:
1. CreateQueue (no PartitionCount → stored as 0)
2. SetQueueAttributes(PartitionCount=1) → expect 200
would complete the wire-level coverage for this fix. Since validatePartitionImmutability is a pure function exercised directly by the unit test and the wire path through trySetQueueAttributesOnce is already covered by the existing FifoThroughputLimit scenario, this is low priority — but it would be the natural follow-up to pin the actual HTTP response.
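One way such a follow-up could look, as a hedged sketch: it assumes the server speaks the standard SQS wire protocol via the AWS Go SDK v2 and that PartitionCount is accepted as a string attribute; newTestClient, the endpoint, and the credentials are hypothetical stand-ins for the suite's real createNode plumbing.

```go
package adapter

import (
	"context"
	"testing"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

// newTestClient is a hypothetical helper; the real suite would point an SQS
// client at a node started via createNode instead of a hard-coded endpoint.
func newTestClient(t *testing.T) *sqs.Client {
	t.Helper()
	return sqs.New(sqs.Options{
		BaseEndpoint: aws.String("http://127.0.0.1:9324"), // placeholder address
		Region:       "us-east-1",
		Credentials:  credentials.NewStaticCredentialsProvider("test", "test", ""),
	})
}

func TestSQSServer_HTFIFO_PartitionCountZeroToOneNoOpAccepted(t *testing.T) {
	ctx := context.Background()
	client := newTestClient(t)

	// CreateQueue without PartitionCount → stored as 0 (unset).
	created, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName:  aws.String("partition-noop.fifo"),
		Attributes: map[string]string{"FifoQueue": "true"},
	})
	if err != nil {
		t.Fatalf("CreateQueue: %v", err)
	}

	// Explicit PartitionCount=1 is a semantic no-op on a stored-as-0 queue and
	// must succeed rather than be rejected as an immutable-attribute change.
	if _, err := client.SetQueueAttributes(ctx, &sqs.SetQueueAttributesInput{
		QueueUrl:   created.QueueUrl,
		Attributes: map[string]string{"PartitionCount": "1"},
	}); err != nil {
		t.Fatalf("SetQueueAttributes(PartitionCount=1): %v", err)
	}
}
```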
Overall
The fix is a one-line change that correctly extends the Round 6.2 normalisation pattern to the SetQueueAttributes immutability path. Test coverage is thorough at the unit level and confirms the regression-detection property. The minor integration test gap is the only remaining observation.
Complete history — all rounds resolved:
| Round | Issue | Status |
|---|---|---|
| 1–6.1 | (all prior items) | ✅ |
| 6.2 | Codex P2 — PartitionCount 0 ≡ 1 in CreateQueue idempotency (htfifoAttributesEqual) | ✅ dbd7a66 |
| 6.3 | Claude Low — validatePartitionImmutability same 0 ≡ 1 gap for SetQueueAttributes | ✅ dcc5574 |
| 6.3 | Integration test gap for 0→1 no-op wire path | ⬜ Very low, not a correctness issue |
The implementation is ready.
…md (PR #664 round 14) Claude low on round 13.1: ~25+ comments across the SQS Go files reference review attributions ('Codex P1 on PR #679', 'CodeRabbit Major on PR #664 round 11', etc.). CLAUDE.md is explicit: 'Don't reference the current task, fix, or callers ... since those belong in the PR description and rot as the codebase evolves.'
Sweep across:
- adapter/sqs_throttle.go
- adapter/sqs_catalog.go
- adapter/sqs_partitioning.go
- adapter/sqs_messages.go
- adapter/sqs_throttle_test.go
- adapter/sqs_partitioning_test.go
- adapter/sqs_throttle_integration_test.go
- adapter/sqs_partitioning_integration_test.go
- adapter/sqs_catalog_test.go
- adapter/sqs_query_protocol.go
- adapter/sqs_query_protocol_test.go
Each comment now retains the substantive WHY (why the design choice was made) and drops the parenthetical attribution to a specific reviewer / PR number / round. Design docs (.md files) are exempt — those are archival records where review attribution is acceptable practice.
go test -race ./adapter/... pass; golangci-lint ./... clean. No behaviour change.

Summary
Phase 3.C implementation: per-queue rate limiting via token-bucket throttling. Implements the design from
docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md (currently on PR #664).
- Bucket store on *SQSServer: sync.Map[bucketKey]*tokenBucket, per-bucket sync.Mutex so cross-queue traffic never serialises on a process-wide lock. Lazy idle-evict (1h) bounds memory.
- sqsQueueMeta.Throttle *sqsQueueThrottle field with six float64 sub-fields (Send/Recv/Default × Capacity/RefillPerSecond), wired through the existing sqsAttributeAppliers dispatch as ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
- Validation (validateThrottleConfig) enforces the §3.2 cross-attribute rules: each (capacity, refill) pair both-zero or both-positive; capacity ≥ refill; Send/Recv capacities ≥ 10 (the SendMessageBatch / DeleteMessageBatch max charge); per-field hard ceiling 100k.
- Rejections return 400 Throttling with x-amzn-ErrorType + AWS-shaped JSON envelope + Retry-After header computed from the §3.4 formula (numerator is requested count, not 1, so a batch verb does not get told to retry in 1s when it really needs 10s); a sketch of this computation follows the list.
- Cache invalidation on SetQueueAttributes and DeleteQueue (§3.1): drops every bucket belonging to the queue after the Raft commit so new limits take effect on the very next request, not after the 1h idle-evict sweep.
- GetQueueAttributes("All") round-trips Throttle* fields so SDKs can confirm the config landed.
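A hedged sketch of what the §3.4 Retry-After computation could look like — computeRetryAfter is a real helper name from the test plan below, but this body is an assumption (the floor value and the exact numerator handling are not confirmed by the PR text):

```go
package adapter

import "time"

// computeRetryAfter (sketch): the numerator is the requested token count, not 1,
// so a 10-entry SendMessageBatch at refill 1/s is told to retry in 10s rather
// than 1s. The 1h cap is applied before the multiply so the Duration conversion
// cannot overflow; the 1-second floor here is an assumption.
func computeRetryAfter(requestedCount, refillPerSecond float64) time.Duration {
	const (
		retryAfterCap   = time.Hour   // throttleRetryAfterCap per the round-3 discussion
		retryAfterFloor = time.Second // assumed floor so clients never see Retry-After: 0
	)
	if refillPerSecond <= 0 {
		return retryAfterCap
	}
	seconds := requestedCount / refillPerSecond
	if seconds > retryAfterCap.Seconds() {
		return retryAfterCap
	}
	d := time.Duration(seconds * float64(time.Second))
	if d < retryAfterFloor {
		return retryAfterFloor
	}
	return d
}
```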
Test plan
- adapter/sqs_throttle_test.go — 18 unit tests: invalidateQueue drops all action keys; -race clean, exactly-capacity successes; parseThrottleFloat range checks; computeRetryAfter floor.
- adapter/sqs_throttle_integration_test.go — 8 end-to-end tests against a real createNode cluster: Retry-After; SetQueueAttributes invalidation (raise-and-immediate-success); DeleteQueue + CreateQueue lifecycle invalidation; GetQueueAttributes round-trip.
- golangci-lint run clean.
- go test -race -run "TestBucketStore|TestValidateThrottleConfig|TestParseThrottleFloat|TestComputeRetryAfter|TestSQSServer_Throttle" ./adapter/... clean.
Self-review (5 lenses)
- Throttle config lives on sqsQueueMeta, persisted via the existing setQueueAttributesWithRetry OCC commit. Bucket state is per-process and never written to storage, by design (§3.1: replicating bucket state would cost a Raft commit per token decrement, defeating the bucket). On leader failover the new leader rebuilds at full capacity — same as the AWS region-failover semantic. No write path can lose throttle config.
- Per-bucket sync.Mutex, never held across sync.Map ops. LoadOrStore race on first insert is safe (both racers compute identical config from the same meta snapshot). Concurrent SetQueueAttributes + in-flight charge: the charge may briefly use the old bucket; the next request rebuilds from the fresh meta. The -race test pins the count invariant: exactly capacity successes out of N concurrent goroutines, never more.
- Unconfigured queues pay only a nil check (Throttle == nil) → return. On configured queues: one sync.Map.Load (lock-free), one mutex acquire on the bucket. No global lock. sync.Map's read-mostly optimisation matches the access pattern. The lazy idle-evict sweep runs at most once per minute from the hot path so a many-queue cluster does not pay an O(N) cost per request.
- The throttle check sits outside the OCC retry loop, so sendMessageWithRetry cannot busy-loop on a permanent rate-limit failure. Cache invalidation happens after the Raft commit (not before) so the rebuilt bucket reads the freshly committed config — there is no window where a request can read a stale bucket built from the old config.
- Everything runs under -race. Validator has a dedicated table-driven test per rule.
Out of scope (deferred to follow-ups)
- Prometheus metrics per the design's §4.1: counter sqs_throttled_requests_total{queue,action} + gauge sqs_throttle_tokens_remaining{queue,action} (plus the monitoring/registry.go entry). The bucket store's chargeOutcome already exposes tokensAfter so the gauge wiring is one site; the counter is a one-line increment after writeSQSThrottlingError. Punted to keep this PR focused on data-plane behaviour.
- §6.5 cross-protocol parity test (writeSQSError handles both), so the test is a follow-up after feat(sqs): Phase 3.B — XML query-protocol support (3-verb proof) #662 lands.
Branch base
Branched off docs/sqs-phase3-proposals (PR #664) so the design doc is part of the reviewable surface. Will rebase against main after #664 merges.