
feat(sqs): per-queue throttling (Phase 3.C) #679

Merged
bootjp merged 14 commits into docs/sqs-phase3-proposals from feat/sqs-throttling-phase3c
Apr 27, 2026

Conversation


@bootjp bootjp commented Apr 26, 2026

Summary

Phase 3.C implementation: per-queue rate limiting via token-bucket throttling. Implements the design from docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md (currently on PR #664).

  • Token-bucket store on *SQSServer: sync.Map[bucketKey]*tokenBucket, per-bucket sync.Mutex so cross-queue traffic never serialises on a process-wide lock. Lazy idle-evict (1h) bounds memory.
  • New sqsQueueMeta.Throttle *sqsQueueThrottle field with six float64 sub-fields (Send/Recv/Default × Capacity/RefillPerSecond), wired through the existing sqsAttributeAppliers dispatch as ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
  • Validator (validateThrottleConfig) enforces the §3.2 cross-attribute rules: each (capacity, refill) pair both-zero or both-positive; capacity ≥ refill; Send/Recv capacities ≥ 10 (the SendMessageBatch / DeleteMessageBatch max charge); per-field hard ceiling 100k.
  • All seven message-plane handlers wired (SendMessage[Batch], ReceiveMessage, DeleteMessage[Batch], ChangeMessageVisibility[Batch]). Throttle check sits outside the OCC retry loop per §4.2 — a rejected request never reaches the coordinator.
  • On rejection: 400 Throttling with x-amzn-ErrorType + AWS-shaped JSON envelope + Retry-After header computed from the §3.4 formula (the numerator is the requested count, not 1, so a batch verb is not told to retry in 1s when it really needs 10s); a hedged sketch of the bucket math follows this list.
  • Cache invalidation on SetQueueAttributes and DeleteQueue (§3.1): drops every bucket belonging to the queue after the Raft commit so new limits take effect on the very next request, not after the 1h idle-evict sweep.
  • GetQueueAttributes("All") round-trips Throttle* fields so SDKs can confirm the config landed.
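
A minimal sketch of the token-bucket charge described above, assuming a tokenBucket with tokens/capacity/refillRate/lastRefill fields and folding in the later throttleRetryAfterCap; the names and the exact §3.4 shortfall computation are illustrative, not copied from the PR.

package throttlesketch // hedged sketch; the real code lives in adapter/sqs_throttle.go

import (
	"math"
	"sync"
	"time"
)

const throttleRetryAfterCap = time.Hour // assumed cap, matching the idle-evict window

type tokenBucket struct {
	mu         sync.Mutex
	tokens     float64
	capacity   float64
	refillRate float64 // tokens per second
	lastRefill time.Time
}

// charge refills from elapsed time (capped at capacity), then either takes
// `requested` tokens or rejects with a Retry-After hint. A rejection leaves
// the partial credit in place, matching the "batch reject preserves partial
// credit" unit test.
func (b *tokenBucket) charge(now time.Time, requested float64) (bool, time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()

	elapsed := now.Sub(b.lastRefill).Seconds()
	b.tokens = math.Min(b.capacity, b.tokens+elapsed*b.refillRate)
	b.lastRefill = now

	if b.tokens >= requested {
		b.tokens -= requested
		return true, 0
	}

	// §3.4: the numerator is the requested count (assumed here to be the
	// shortfall), floored at 1s and capped so the Duration math cannot overflow.
	secs := math.Ceil((requested - b.tokens) / b.refillRate)
	if secs < 1 {
		secs = 1
	}
	if secs > throttleRetryAfterCap.Seconds() {
		secs = throttleRetryAfterCap.Seconds()
	}
	return false, time.Duration(secs) * time.Second
}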

Test plan

  • adapter/sqs_throttle_test.go — 18 unit tests:
    • bucket math: fresh capacity, refill elapsed, refill cap-at-capacity, batch reject preserves partial credit, Retry-After uses requested count, sub-1-RPS floor;
    • isolation: per-action, per-queue, Default-fallthrough shares one bucket;
    • lifecycle: invalidateQueue drops all action keys;
    • concurrency: -race clean, exactly-capacity successes;
    • default-off short-circuit;
    • validator: nil/empty canonicalisation, both-zero-or-both-positive, capacity ≥ 10 batch floor, Default* exempt, capacity ≥ refill, parseThrottleFloat range checks, computeRetryAfter floor.
  • adapter/sqs_throttle_integration_test.go — 8 end-to-end tests against a real createNode cluster:
    • default-off allows unbounded;
    • send/recv reject after capacity with correct envelope + Retry-After;
    • batch charges by entry count;
    • SetQueueAttributes invalidation (raise-and-immediate-success);
    • DeleteQueue + CreateQueue lifecycle invalidation;
    • GetQueueAttributes round-trip;
    • validator rejects below batch min.
  • golangci-lint run clean.
  • go test -race -run "TestBucketStore|TestValidateThrottleConfig|TestParseThrottleFloat|TestComputeRetryAfter|TestSQSServer_Throttle" ./adapter/... clean.

Self-review (5 lenses)

  1. Data loss — Throttle config is on sqsQueueMeta, persisted via the existing setQueueAttributesWithRetry OCC commit. Bucket state is per-process and never written to storage, by design (§3.1: replicating bucket state would cost a Raft commit per token decrement, defeating the bucket). On leader failover the new leader rebuilds at full capacity — same as the AWS region-failover semantic. No write path can lose throttle config.
  2. Concurrency — Per-bucket sync.Mutex, never held across sync.Map ops. LoadOrStore race on first insert is safe (both racers compute identical config from the same meta snapshot). Concurrent SetQueueAttributes + in-flight charge: the charge may briefly use the old bucket; the next request rebuilds from the fresh meta. -race test pins the count invariant: exactly capacity successes out of N concurrent goroutines, never more.
  3. Performance — Hot path on unconfigured queues is one nil check (Throttle == nil) → return. On configured queues: one sync.Map.Load (lock-free), one mutex acquire on the bucket. No global lock. sync.Map's read-mostly optimisation matches the access pattern. The lazy idle-evict sweep runs at most once per minute from the hot path so a many-queue cluster does not pay an O(N) cost per request.
  4. Data consistency — Throttling sits outside OCC; a rejected request never touches the coordinator (§4.2). The OCC retry loop in sendMessageWithRetry cannot busy-loop on a permanent rate-limit failure. Cache invalidation after the Raft commit (not before) so the rebuilt bucket reads the freshly committed config — there is no window where a request can read a stale bucket built from the old config.
  5. Test coverage — 18 unit tests cover bucket math, isolation, lifecycle, concurrency, default-off; 8 integration tests cover the end-to-end wire path. Both pass under -race. Validator has a dedicated table-driven test per rule.

Out of scope (deferred to follow-ups)

  • Prometheus metrics (counter sqs_throttled_requests_total{queue,action} + gauge sqs_throttle_tokens_remaining{queue,action} per the design's §4.1 monitoring/registry.go entry). The bucket store's chargeOutcome already exposes tokensAfter so the gauge wiring is one site; the counter is a one-line increment after writeSQSThrottlingError. Punted to keep this PR focused on data-plane behaviour.
  • §6.5 cross-protocol (Query) parity test — the Query protocol layer lives on PR #662 (feat(sqs): Phase 3.B — XML query-protocol support, 3-verb proof; separate branch); the throttle envelope already has the same shape on both protocols (writeSQSError handles both), so the test is a follow-up after #662 lands.
  • §6.6 failover behaviour test — the §3.1 "fresh bucket on failover" contract is implementation-correct (the bucket map is per-process, no Raft state) but a 3-node failover test needs cluster scaffolding and is best added with the Jepsen workload.

Branch base

Branched off docs/sqs-phase3-proposals (PR #664) so the design doc is part of the reviewable surface. Will rebase against main after #664 merges.

Implements the design from
docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.

Schema (sqs_catalog.go):
- sqsQueueMeta gains an optional Throttle *sqsQueueThrottle field
  with six float64 sub-fields (Send/Recv/Default x Capacity/RefillPerSecond).
- Six new ThrottleSendCapacity / ThrottleSendRefillPerSecond /
  ThrottleRecvCapacity / ThrottleRecvRefillPerSecond /
  ThrottleDefaultCapacity / ThrottleDefaultRefillPerSecond attributes
  go through the standard sqsAttributeAppliers dispatch.
- validateThrottleConfig enforces the cross-field rules from the
  design: each (capacity, refill) pair must be both-zero or
  both-positive; capacity must be >= refill so the bucket can burst;
  Send/Recv capacities must be >= 10 (the SendMessageBatch /
  DeleteMessageBatch max charge) so a small capacity does not make
  every full batch permanently unserviceable; per-field hard ceiling
  of 100k. Validator runs once at the end of applyAttributes so a
  multi-field update sees the post-apply state as a whole (a hedged
  sketch of these rules follows this list).
- queueMetaToAttributes surfaces the configured Throttle* fields so
  GetQueueAttributes("All") round-trips. Extracted into
  addThrottleAttributes to stay under the cyclop ceiling.
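
A hedged sketch of these cross-field rules, under the assumption that the validator checks one (capacity, refill) pair per action; the helper name and error strings are illustrative.

package throttlesketch // hedged sketch; the real validator is validateThrottleConfig in adapter/sqs_catalog.go

import "fmt"

const (
	throttleBatchMinCapacity = 10      // SendMessageBatch / DeleteMessageBatch max charge
	throttleFieldCeiling     = 100_000 // per-field hard ceiling
)

// validatePair applies the §3.2 rules to one (capacity, refill) pair.
// enforceBatchMin is true for Send/Recv and false for Default*, which the
// PR's test list says is exempt from the batch floor.
func validatePair(name string, capacity, refill float64, enforceBatchMin bool) error {
	if capacity == 0 && refill == 0 {
		return nil // both-zero means the pair is unset (throttling off for this action)
	}
	if capacity <= 0 || refill <= 0 {
		return fmt.Errorf("%s: capacity and refill must be both zero or both positive", name)
	}
	if capacity > throttleFieldCeiling || refill > throttleFieldCeiling {
		return fmt.Errorf("%s: values above %d are rejected", name, throttleFieldCeiling)
	}
	if capacity < refill {
		return fmt.Errorf("%s: capacity must be >= refill so the bucket can burst", name)
	}
	if enforceBatchMin && capacity < throttleBatchMinCapacity {
		// A smaller capacity would make a full batch permanently unserviceable.
		return fmt.Errorf("%s: capacity must be >= %d", name, throttleBatchMinCapacity)
	}
	return nil
}

The real validateThrottleConfig would call something like this for the Send, Recv, and Default pairs after applyAttributes has produced the post-apply state.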

Bucket store (sqs_throttle.go, new):
- bucketStore: sync.Map[bucketKey]*tokenBucket; per-bucket sync.Mutex
  on the hot path so cross-queue traffic never serialises on a
  process-wide lock. Per Gemini medium on PR #664 the single-mutex
  alternative was rejected. Lazy idle-evict sweep removes inactive
  buckets after 1h to bound memory.
- charge(): lock-free Load, LoadOrStore on miss (race-tolerant -- both
  racers compute identical capacity/refill from the same meta), then
  per-bucket lock for refill + take + release. Refill is elapsed *
  refillRate, capped at capacity. On reject, computes Retry-After
  from the actual refillRate and the requestedCount per the §3.4
  formula (numerator is requested count, not 1, so a batch verb does
  not get told to retry in 1s when it really needs 10s).
- resolveActionConfig() handles the Default* fall-through: when only
  DefaultCapacity is set, Send and Recv requests share the same
  bucketKey{action:"*"} so Default behaves as one shared cap rather
  than three independent quotas (see the sketch after this list).
- invalidateQueue() drops every bucket belonging to a queue. Called
  after the Raft commit on SetQueueAttributes / DeleteQueue so new
  limits take effect on the very next request, not after the 1h
  idle-evict sweep. Without this step the §3.1 cache-invalidation
  contract fails and operators see stale enforcement.
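
A hedged sketch of the Default* fall-through: the bucketKey shape, the "*" action literal, and the field names mirror the description above but are assumptions, not the PR's code.

package throttlesketch // hedged sketch of resolveActionConfig in adapter/sqs_throttle.go

type bucketKey struct {
	queue  string
	action string
}

type actionConfig struct {
	capacity float64
	refill   float64
}

type sqsQueueThrottle struct {
	SendCapacity, SendRefillPerSecond       float64
	RecvCapacity, RecvRefillPerSecond       float64
	DefaultCapacity, DefaultRefillPerSecond float64
}

// resolveActionConfig: a configured per-action pair wins; otherwise Send and
// Recv both fall through to the single shared "*" bucket, so Default behaves
// as one cap rather than three independent quotas. The third return value is
// false when nothing applies (throttling off for this action).
func resolveActionConfig(queue, action string, t *sqsQueueThrottle) (bucketKey, actionConfig, bool) {
	if t == nil {
		return bucketKey{}, actionConfig{}, false
	}
	switch action {
	case "send":
		if t.SendCapacity > 0 {
			return bucketKey{queue, "send"}, actionConfig{t.SendCapacity, t.SendRefillPerSecond}, true
		}
	case "recv":
		if t.RecvCapacity > 0 {
			return bucketKey{queue, "recv"}, actionConfig{t.RecvCapacity, t.RecvRefillPerSecond}, true
		}
	}
	if t.DefaultCapacity > 0 {
		return bucketKey{queue, "*"}, actionConfig{t.DefaultCapacity, t.DefaultRefillPerSecond}, true
	}
	return bucketKey{}, actionConfig{}, false
}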

Charging (sqs_messages.go, sqs_messages_batch.go):
- All seven message-plane handlers wired:
  SendMessage / SendMessageBatch (Send bucket; batch charges by entry
  count), ReceiveMessage / DeleteMessage / DeleteMessageBatch /
  ChangeMessageVisibility / ChangeMessageVisibilityBatch (Receive
  bucket).
- Throttle check sits OUTSIDE the OCC retry loop per §4.2 -- a
  rejected request never reaches the coordinator, so the existing
  sendMessageWithRetry et al. cannot busy-loop on a permanent
  rate-limit failure.
- sendMessage extracted into prepareSendMessage + validateSend +
  body to stay under the cyclop ceiling once the throttle branch
  was added.

Error envelope (sqs.go):
- New sqsErrThrottling code + writeSQSThrottlingError helper that
  writes 400 + AWS-shaped JSON body + x-amzn-ErrorType + Retry-After
  header. The envelope is the same shape AWS uses; SDKs that key off
  x-amzn-ErrorType handle it without changes.
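
A hedged sketch of that helper; the handler name, the JSON field names ("__type"/"message", the usual AWS JSON-protocol error shape), and the content type are assumptions here; only the 400 status, the Throttling code, and the two headers are stated by the PR.

package throttlesketch // hedged sketch; the real helper is writeSQSThrottlingError in adapter/sqs.go

import (
	"encoding/json"
	"math"
	"net/http"
	"strconv"
	"time"
)

// writeThrottlingError writes the rejection envelope: HTTP 400, an AWS-shaped
// JSON body, x-amzn-ErrorType so SDKs classify the error, and Retry-After
// derived from the bucket's computation.
func writeThrottlingError(w http.ResponseWriter, retryAfter time.Duration) {
	w.Header().Set("Content-Type", "application/x-amz-json-1.0")
	w.Header().Set("x-amzn-ErrorType", "Throttling")
	w.Header().Set("Retry-After", strconv.Itoa(int(math.Ceil(retryAfter.Seconds()))))
	w.WriteHeader(http.StatusBadRequest)
	_ = json.NewEncoder(w).Encode(map[string]string{
		"__type":  "com.amazonaws.sqs#Throttling", // assumed error-type string
		"message": "Rate exceeded for queue.",
	})
}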

Tests:
- adapter/sqs_throttle_test.go: 18 unit tests covering bucket math
  (fresh capacity, refill elapsed, refill cap, batch reject preserves
  partial credit, Retry-After uses requested count, sub-1-RPS floor,
  per-action / per-queue / Default-fallthrough isolation,
  invalidateQueue, concurrent -race, default-off short-circuit) and
  the validator (nil/empty canonicalisation, both-zero-or-both-positive,
  capacity >= 10 batch floor, Default* exempt, capacity >= refill,
  parseThrottleFloat range checks, computeRetryAfter floor).
- adapter/sqs_throttle_integration_test.go: 8 end-to-end tests
  covering the §6 testing strategy (default-off allows unbounded,
  send/recv reject after capacity with correct envelope + Retry-After,
  batch charges by entry count, SetQueueAttributes invalidation,
  DeleteQueue + CreateQueue lifecycle invalidation, GetQueueAttributes
  round-trip, validator rejects below batch min). Run with -race
  clean.

Out of scope for this PR (deferred to a follow-up):
- Prometheus counter (sqs_throttled_requests_total) and gauge
  (sqs_throttle_tokens_remaining) per the design's §4.1 monitoring/
  registry.go entry. The wiring needs a new prometheus collector and
  hooking through the Registry seam; punted to keep this PR focused
  on the data-plane behaviour. The bucket store's chargeOutcome
  already exposes tokensAfter so the gauge wiring is one site.
- §6.5 cross-protocol (Query) parity test: the Query protocol layer
  lives on PR #662 and the throttle envelope already has the same
  shape on both protocols (writeSQSError handles both), so the test
  is a follow-up after #662 lands.
- §6.6 failover behaviour test: the §3.1 "fresh bucket on failover"
  contract is implementation-correct (the bucket map is per-process,
  no Raft state) but a 3-node failover test needs the cluster
  scaffolding and is best added with the Jepsen workload.

coderabbitai Bot commented Apr 26, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.



@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements per-queue rate limiting for SQS using a token-bucket mechanism. It introduces new queue attributes for configuring capacity and refill rates, a centralized bucket store, and integrates throttling checks into the message processing pipeline. Feedback highlights several performance and robustness concerns: the current implementation performs redundant storage lookups on every request, the cache-sweeping logic runs on the hot path potentially causing latency spikes, and the retry-after calculation could be improved to prevent overflows and ensure system responsiveness.

Comment thread adapter/sqs_throttle.go
if s.throttle == nil {
return true
}
throttle := s.queueThrottleConfig(r, queueName)

high

chargeQueue performs a storage read via queueThrottleConfig on every request, even for queues without throttling configured. Furthermore, since handlers (e.g., sendMessage, receiveMessage) perform their own metadata load shortly after, this results in two storage lookups and two nextTxnReadTS calls per request. Consider caching the throttle configuration in memory or passing the already-loaded metadata to chargeQueue to avoid redundant I/O and timestamp acquisition.

Comment thread adapter/sqs_throttle.go
Comment on lines +245 to +254
b.buckets.Range(func(k, v any) bool {
bucket, _ := v.(*tokenBucket)
bucket.mu.Lock()
idle := bucket.lastRefill.Before(cutoff)
bucket.mu.Unlock()
if idle {
b.buckets.Delete(k)
}
return true
})

high

maybeSweep performs an O(N) iteration over all active buckets using sync.Map.Range on the request hot path. While it is gated to run at most once per minute, the specific request that triggers the sweep will experience a significant latency spike proportional to the number of queues in the system. This logic should be moved to a background goroutine to avoid impacting request latency. Additionally, ensure that the collection of active buckets is strictly bounded to prevent unbounded memory growth and potential OOM issues.

References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.

Comment thread adapter/sqs_throttle.go
Comment on lines +298 to +302
secs := math.Ceil(needed / refillRate)
if secs < 1 {
secs = 1
}
return time.Duration(secs) * time.Second

medium

Converting secs to time.Duration and then multiplying by time.Second can lead to an integer overflow if refillRate is extremely small. Furthermore, for low-latency systems, it is preferred to use a short maximum backoff duration for retries. Consider capping the retryAfter duration to 10ms to prevent overflow and ensure the system remains responsive under contention.

Suggested change
-secs := math.Ceil(needed / refillRate)
-if secs < 1 {
-	secs = 1
-}
-return time.Duration(secs) * time.Second
+secs := math.Ceil(needed / refillRate)
+if secs > 0.01 {
+	return 10 * time.Millisecond
+}
+return time.Duration(secs * float64(time.Second))
References
  1. For low-latency systems, it is acceptable to use a short maximum backoff duration for retries (e.g., 10ms) as long-lived contention is not the primary scenario.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e105815ce8


Comment thread adapter/sqs_throttle.go Outdated
Comment on lines +196 to +199
if v, ok := b.buckets.Load(key); ok {
// type assertion is sound: only tokenBucket pointers are stored.
bucket, _ := v.(*tokenBucket)
return bucket

P1: Reconcile reused buckets with the latest throttle config

loadOrInit returns an existing bucket entry without checking whether its capacity/refillRate still match the queue’s current Throttle* metadata. After a leadership change, a node can retain old in-memory buckets from a prior leader term while SetQueueAttributes was applied on another node, then enforce stale limits/tokens when it becomes leader again (until idle eviction or another local invalidation). This breaks correctness for failover scenarios and can over- or under-throttle traffic for up to the eviction window.


Comment thread adapter/sqs_catalog.go Outdated
// tenant in real time. The LoadOrStore race a concurrent in-flight
// request might run with the stale bucket is benign: the rebuilt
// bucket starts at full capacity, same as failover semantics.
s.throttle.invalidateQueue(name)

P1: Invalidate throttle buckets only when throttle attrs change

This unconditional invalidation runs for every successful SetQueueAttributes, including updates to unrelated fields (or no-op writes), which resets the queue’s token state to full capacity. In environments where callers can invoke SetQueueAttributes, this allows bypassing configured rate limits by repeatedly sending benign attribute updates. Invalidation should be gated to changes that actually affect throttle configuration.


Comment thread adapter/sqs_catalog.go
// names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc.
// Persisted on the meta so a leader failover loads the configuration
// along with the rest of the queue.
Throttle *sqsQueueThrottle `json:"throttle,omitempty"`

P2: Compare throttle settings in CreateQueue idempotency checks

Throttle configuration was added to queue metadata, but CreateQueue idempotency still relies on attributesEqual, which does not include Throttle* fields. As a result, creating an existing queue with different throttle settings can be treated as “same attributes” and incorrectly return success instead of QueueNameExists, hiding configuration drift during provisioning.


…ini high/medium)

Six fixes from the first round of automated reviews on PR #679:

Codex P1 -- bucket reconciliation on stale config (sqs_throttle.go):
loadOrInit returned cached buckets without checking that capacity /
refillRate still match the queue's current Throttle config. After a
leadership change, a node retaining buckets from a prior leader term
would keep enforcing the prior term's limits even after a new
SetQueueAttributes had committed -- the invalidation only runs on the
leader that processed the commit, so a different leader's stale
buckets survive. Now compares cap/refill on every Load hit and
rebuilds (Delete + LoadOrStore) on mismatch.

Codex P1 -- invalidate only on actual throttle change (sqs_catalog.go,
sqs_throttle.go): cache invalidation in setQueueAttributes ran
unconditionally after every successful commit, including unrelated-
field updates and no-op writes. Result: any caller could silently
restore a noisy tenant's burst capacity by writing a no-op
SetQueueAttributes. Now gated on throttleAttributesPresent(in.Attributes)
which checks the request for any Throttle* key. Bucket reconciliation
above acts as the safety net if a future code path bypasses the gate.
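
A hedged sketch of that gate; the attribute names are the six Throttle* attributes this PR adds, but the helper body and the slice name are assumed.

package throttlesketch // hedged sketch of throttleAttributesPresent in adapter/sqs_catalog.go

// throttleAttributeNames is the single source of truth for the gate: a future
// Throttle* attribute added here automatically participates in invalidation.
var throttleAttributeNames = []string{
	"ThrottleSendCapacity", "ThrottleSendRefillPerSecond",
	"ThrottleRecvCapacity", "ThrottleRecvRefillPerSecond",
	"ThrottleDefaultCapacity", "ThrottleDefaultRefillPerSecond",
}

// throttleAttributesPresent reports whether a SetQueueAttributes request
// touches any throttle attribute; invalidation is skipped otherwise, so a
// no-op or unrelated update can no longer reset a queue's token state.
func throttleAttributesPresent(attrs map[string]string) bool {
	for _, name := range throttleAttributeNames {
		if _, ok := attrs[name]; ok {
			return true
		}
	}
	return false
}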

Codex P2 -- attributesEqual covers Throttle (sqs_catalog.go):
CreateQueue idempotency relied on attributesEqual which did not
include Throttle*. A re-create with different limits was treated as
idempotent and silently kept the old limits. Now compares the full
Throttle struct via throttleConfigEqual; baseAttributesEqual extracted
to keep cyclop under the ceiling.

Gemini high -- thread throttle through existing meta load
(sqs_messages.go, sqs_throttle.go): chargeQueue did one Pebble read
per request even though the hot-path handlers (sendMessage,
receiveMessage) load the meta moments later. Added
chargeQueueWithThrottle that takes pre-loaded throttle config; both
hot-path handlers now load meta once and pass throttle in. Throttle
check now sits AFTER the QueueDoesNotExist branch so a missing queue
no longer consumes a token. Batch + delete handlers keep chargeQueue
(one extra meta read) -- low-QPS verbs where the simplification of
not pulling meta out of the retry loop is worth the per-call cost.

Gemini high -- move sweep off hot path (sqs.go, sqs_throttle.go):
maybeSweep ran the O(N) sync.Map.Range on whichever request was
unlucky enough to trigger the per-minute sweep, causing latency
spikes on many-queue clusters. Replaced with runSweepLoop on a
background ticker tied to s.reaperCtx (started in Run alongside the
existing message reaper, cleaned up by the same reaperCancel in
Stop). The hot-path charge() no longer calls into the sweep at all.

Gemini medium -- cap retry-after duration (sqs_throttle.go):
computeRetryAfter could compute a multi-day Retry-After (or worse,
overflow time.Duration arithmetic) for pathologically small
refillRate / large requested values. Capped at throttleRetryAfterCap
(1h, matching the bucket idle-evict window). Cap is applied before
the Duration multiplication so overflow is impossible.

New tests:
- TestBucketStore_ReconcilesBucketOnConfigChange pins the Codex P1
  reconciliation contract.
- TestComputeRetryAfter_CapsAtMaximum pins the Gemini medium cap.
- TestThrottleAttributesPresent covers the request-gate helper used
  by the conditional invalidation.

All tests pass under -race; golangci-lint clean.

bootjp commented Apr 27, 2026

All six findings from the first review round are addressed (e3530e93):

Codex P1 — bucket reconciliation on stale config
loadOrInit did not check that a cached bucket's cap/refill still match the current config, so the sequence leadership change → SetQueueAttributes (committed on a different leader) → the original leader returning would leave stale buckets enforcing the prior leader term's limits. loadOrInit now compares cap+refill on every Load hit and, on a mismatch, replaces the entry with a fresh bucket via Delete + LoadOrStore. Pinned by the new test TestBucketStore_ReconcilesBucketOnConfigChange.

Codex P1 — invalidate only on actual throttle change
The cache invalidation in setQueueAttributes ran unconditionally, so updates to unrelated fields and no-op writes also reset the bucket. It is now gated on throttleAttributesPresent(in.Attributes), invalidating only when the request contains a Throttle* field. Bucket reconciliation also acts as a safety net, so correctness holds even if a future code path bypasses the gate.

Codex P2 — attributesEqual covers Throttle
attributesEqual did not compare Throttle, so CreateQueue idempotency missed throttle-config differences (the same name with different limits was treated as idempotent). Added the throttleConfigEqual helper; extracted baseAttributesEqual to stay under the cyclop ceiling.

Gemini high — thread throttle through existing meta load
Refactored so the hot-path handlers (sendMessage / receiveMessage) pay only one Pebble read per request: added chargeQueueWithThrottle(throttle), and both handlers now load the meta once and pass the throttle config in directly. The throttle check now sits AFTER the QueueDoesNotExist branch, so a non-existent queue no longer consumes a token. Batch + delete handlers keep the per-call meta load (low-QPS verbs; pulling the meta load out of the retry loop is out of scope for this PR).

Gemini high — move sweep off hot path
maybeSweep's O(N) sync.Map.Range caused a latency spike on whichever request happened to trigger the per-minute sweep. Added runSweepLoop, started in Run() as a background goroutine tied to s.reaperCtx and cleaned up by reaperCancel() in Stop(), the same pattern as the existing reaper. The hot-path charge() no longer calls into the sweep at all; a minimal sketch of the wiring follows this comment.

Gemini medium — cap retry-after duration
computeRetryAfter could produce a multi-day Retry-After, or overflow time.Duration, for a tiny refillRate combined with a large requested count. Now capped at throttleRetryAfterCap = 1h (matching the idle-evict window); the cap is applied before the Duration multiplication, so overflow is impossible. Pinned by TestComputeRetryAfter_CapsAtMaximum.

Verified: go test -race ./adapter/... (throttle tests) clean, golangci-lint run clean.
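
A minimal sketch of the background-sweep wiring, assuming the bucketStore shape and constant names described in this thread; the real code is in adapter/sqs_throttle.go and adapter/sqs.go.

package throttlesketch // hedged sketch of the round-1 fix

import (
	"context"
	"sync"
	"time"
)

const (
	throttleEvictSweepEvery = time.Minute // background ticker interval
	throttleIdleEvictAfter  = time.Hour   // idle window before a bucket is dropped
)

type tokenBucket struct {
	mu         sync.Mutex
	lastRefill time.Time
}

type bucketStore struct {
	buckets sync.Map         // bucketKey -> *tokenBucket
	clock   func() time.Time // injectable for tests
}

// runSweepLoop replaces the old on-hot-path maybeSweep: this goroutine is the
// only caller of sweep(), so charge() never pays the O(N) Range cost. In the
// PR it is started from Run() against s.reaperCtx and torn down by the same
// reaperCancel() that stops the message reaper.
func (b *bucketStore) runSweepLoop(ctx context.Context) {
	ticker := time.NewTicker(throttleEvictSweepEvery)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			b.sweep(b.clock())
		}
	}
}

// sweep drops buckets that have been idle longer than throttleIdleEvictAfter;
// a queue that resumes activity rebuilds its bucket at full capacity.
func (b *bucketStore) sweep(now time.Time) {
	cutoff := now.Add(-throttleIdleEvictAfter)
	b.buckets.Range(func(k, v any) bool {
		bucket, _ := v.(*tokenBucket)
		bucket.mu.Lock()
		idle := bucket.lastRefill.Before(cutoff)
		bucket.mu.Unlock()
		if idle {
			b.buckets.Delete(k)
		}
		return true
	})
}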

@claude review


claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 5m 54s


Review of e3530e93 — Round 2

Tasks

  • Gather context and understand request
  • Read changed files (sqs_throttle.go, sqs_catalog.go, sqs_messages.go, sqs.go, tests)
  • Review round-1 fixes for correctness
  • Review for new issues introduced by the fixes
  • Check test coverage quality
  • Post final review

Round-1 Fixes Verified ✅

All six claimed fixes are correctly implemented:

Codex P1 — Bucket reconciliation (loadOrInit)
The mismatch detection is correct: acquire bucket.mu → compare capacity + refillRate → release → Delete → LoadOrStore with a fresh bucket. The concurrent-racer comment is accurate: both racers build from the same capacity/refill params, so the loser's fresh bucket is safely discarded by LoadOrStore. TestBucketStore_ReconcilesBucketOnConfigChange pins the invariant well.

Codex P1 — Conditional invalidation (throttleAttributesPresent)
Gate is correct. throttleAttributeNames slice is the single source of truth, so a future Throttle* attribute added there automatically participates in the gate. Tested in TestThrottleAttributesPresent.

Codex P2 — attributesEqual covers Throttle (throttleConfigEqual)
The nil-handling in throttleConfigEqual (both nil → equal, one nil → unequal) is correct. Splitting out baseAttributesEqual to maintain cyclomatic complexity is a clean approach. attributesEqual nil-guard is correctly retained in the wrapper.

Gemini high — Thread throttle through meta load (validateSend / chargeQueueWithThrottle)
The split is clean: prepareSendMessage decodes + extracts queue name, validateSend loads meta → charges → validates. The throttle check is correctly ordered: after loadQueueMetaForSend (which fails fast on QueueDoesNotExist) and outside the OCC transaction. receiveMessage correctly uses chargeQueueWithThrottle inline after its existing meta load.

Gemini high — Move sweep off hot path (runSweepLoop)
runSweepLoop is wired to reaperCtx in Run() so the goroutine is cleaned up by Stop() alongside the existing reaper — no goroutine leak. The background ticker cadence defaults to throttleEvictSweepEvery (1 min); the hot-path chargeQueue no longer pays the O(N) cost.

Gemini medium — Cap retry-after (computeRetryAfter)
Cap is applied to secs (the float) before the time.Duration multiplication, so the overflow is correctly prevented. throttleRetryAfterCap = time.Hour is well-reasoned: beyond the idle-evict window, the bucket would be rebuilt at full capacity anyway. TestComputeRetryAfter_CapsAtMaximum pins it.


Issues Found

Medium — Dead fields: sweepMu and lastSweep in bucketStore

After the move to runSweepLoop, lastSweep is written in sweep() but never read by anything. sweepMu only protects lastSweep, so it is also dead. The field comment still describes the old hot-path design:

// sweepMu serialises the lazy idle-evict sweep so concurrent first
// requests cannot all run a full sweep in parallel.

This no longer applies — runSweepLoop runs sweep() sequentially on a single goroutine driven by a ticker, so concurrent sweeps are structurally impossible. Retaining these fields creates misleading documentation and unnecessary lock overhead in every sweep() call.

Suggested removal:

  • Drop sweepMu sync.Mutex and lastSweep time.Time from bucketStore
  • Remove the b.sweepMu.Lock() / b.lastSweep = now / b.sweepMu.Unlock() block from sweep() (sqs_throttle.go:314-317)


Low — throttleConfigEqual not exercised by any integration test

TestSQSServer_CatalogCreateIsIdempotent only uses VisibilityTimeout for the "differing attributes → QueueNameExists" case. If throttleConfigEqual contained a bug (e.g., always returned true), no existing test would catch it. A minimal addition to that test:

// Fourth call: same name, same non-throttle attr, but different throttle → QueueNameExists
withThrottle := map[string]any{
    "QueueName": "idempotent",
    "Attributes": map[string]string{
        "VisibilityTimeout":        "60",
        "ThrottleSendCapacity":     "10",
        "ThrottleSendRefillPerSecond": "1",
    },
}
status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle)
if status4 != http.StatusBadRequest { ... } // expect QueueNameExists


Low — Token consumed before full attribute validation in validateSend

In validateSend (sqs_messages.go:326):

1. loadQueueMetaForSend  ← body size check
2. chargeQueueWithThrottle  ← token consumed HERE
3. validateMessageAttributes
4. validateSendFIFOParams
5. resolveSendDelay

A request with an invalid MessageAttribute key or a missing FIFO MessageGroupId will consume a throttle token before being rejected. AWS SQS validates attributes before rate-limiting. This is a minor behavioral difference from the AWS contract — not worth restructuring now, but worth a follow-up comment or issue so it doesn't surprise anyone building AWS-compatibility tests.


Other Observations

resolveActionConfig bucket-key sharing is correct. When only Default* is configured, both a Send call and a direct bucketActionAny call resolve to the "*" bucket key, sharing one quota. TestBucketStore_DefaultBucketCovers verifies this end-to-end.

throttleAllActions as the invalidation vocabulary. Using a single slice for both invalidateQueue and the "exhaustive key set" is a good defensive pattern. A future verb that grows a new bucket cannot sneak past invalidation if it's added to this slice.

Reconciliation window is benign. Between loadOrInit detecting a mismatch and completing LoadOrStore, a concurrent charge() that already holds a pointer to the old stale bucket can still complete its charge. This is a one-request window that self-corrects and matches the documented failover semantics.

sendMessageBatch double meta load acknowledged. The remaining double load in sendMessageBatch (one in chargeQueue, one implicitly in sendMessageBatchWithRetry) is the stated scope of the deferred follow-up — no objection to deferring this.

Test suite is thorough. 18 unit tests + 8 integration tests cover all the primary contracts including the concurrency invariant (-race + exactly-capacity count), the lifecycle (invalidate + rebuild), and the wire-level envelope (x-amzn-ErrorType + Retry-After). The only gap noted above is the throttleConfigEqual CreateQueue path.


Summary: All six round-1 findings are correctly resolved. Two actionable items remain: (1) the dead sweepMu/lastSweep fields left over from the old maybeSweep design, and (2) a test gap for throttleConfigEqual in the CreateQueue idempotency path. The token-before-validation ordering in validateSend is a minor follow-up. Otherwise the implementation is ready.

bootjp added a commit that referenced this pull request Apr 27, 2026
Three medium-priority Gemini findings + one cyclop fix from the
rebase onto the updated PR #679 throttling branch.

(1) sqsErrInvalidParameterValue removed (sqs_catalog.go,
sqs_partitioning.go, sqs_partitioning_test.go): the constant
duplicated sqsErrValidation which already carries
"InvalidParameterValue". Reuses the existing constant; comment on
the validator clarifies the choice.

(2) snapshotImmutableHTFIFO heap allocation gated on
htfifoAttributesPresent (sqs_catalog.go): the snapshot is only
needed when the request actually carries an HT-FIFO attribute. A
SetQueueAttributes that touches only mutable attributes (e.g.
VisibilityTimeout — the common path) now skips the allocation
entirely. The validator short-circuit on preApply == nil keeps
correctness identical: a request with no HT-FIFO attribute can
never change an HT-FIFO field.

(3) partitionFor inlined FNV-1a (sqs_partitioning.go): the
hash/fnv.New64a + h.Write([]byte(messageGroupID)) path heap-
allocates the byte slice and pays the hash.Hash interface dispatch
on every SendMessage. Replaced with a manual FNV-1a loop over the
string — measurably faster on the routing hot path. MessageGroupId
is capped at 128 chars by validation so the loop is bounded. A hedged
sketch of the inlined loop follows this commit message.

(4) cyclop (trySetQueueAttributesOnce): the conditional preApply
branch + immutability check + Throttle validator pushed cyclop to
12. Extracted applyAndValidateSetAttributes so the function stays
under the ceiling; the helper is also useful for any future
admin-surface callers that want the same apply+validate flow
without the OCC dispatch.

All tests pass under -race; golangci-lint clean.
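
A hedged sketch of the inlined loop from item (3); the function name and signature are assumed, and only the FNV-1a constants and the power-of-two mask are taken as given.

package partitionsketch // hedged sketch; the real routing lives in adapter/sqs_partitioning.go

// partitionForGroup hashes MessageGroupId with a manual FNV-1a loop so no
// []byte is heap-allocated and no hash.Hash interface dispatch is paid on the
// SendMessage hot path. The mask requires partitionCount to be a power of
// two, which the validator enforces.
func partitionForGroup(messageGroupID string, partitionCount uint32) uint32 {
	if partitionCount <= 1 || messageGroupID == "" {
		return 0
	}
	const (
		fnvOffset64 uint64 = 14695981039346656037
		fnvPrime64  uint64 = 1099511628211
	)
	h := fnvOffset64
	for i := 0; i < len(messageGroupID); i++ {
		h ^= uint64(messageGroupID[i])
		h *= fnvPrime64
	}
	return uint32(h) & (partitionCount - 1)
}
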
…est (PR #679 round 2)

Two items from the round-2 Claude review on PR #679:

(1) bucketStore.sweepMu and bucketStore.lastSweep became dead code
when the move from on-hot-path maybeSweep to the background
runSweepLoop landed in round 1. The sweep() ticker is the only
caller, so the serialisation primitive that protected against
concurrent on-hot-path callers no longer has a job. Removed both
fields and the sweepMu.Lock/lastSweep=now/sweepMu.Unlock block;
sweep() now reads b.clock() inline. Field comment updated to
describe the post-runSweepLoop design (single-goroutine driver).

(2) TestSQSServer_CatalogCreateIsIdempotent now exercises
throttleConfigEqual via a fourth case: same name, same
VisibilityTimeout, but the second create adds Throttle* attributes.
attributesEqual must notice the diff and the call must reject as
QueueNameExists. Without this case a future bug in
throttleConfigEqual (e.g. always returning true) would slip past
the existing VisibilityTimeout-only test.

Both findings noted in the round-2 Claude review on PR #679.
@chatgpt-codex-connector

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.


bootjp commented Apr 27, 2026

Addressed both items from the round-2 review (c6d3a683):

Medium — dead sweepMu/lastSweep removed
sweepMu and lastSweep became dead code in round 1 when maybeSweep moved to the runSweepLoop background goroutine. The only caller of sweep() is the single ticker-driven goroutine, so the lock primitive whose job was serialising concurrent on-hot-path callers is no longer needed. Removed both fields, removed the lock block inside sweep(), and rewrote the comment to reflect the post-runSweepLoop design.

Low — throttleConfigEqual idempotency test gap
Added a fourth case to TestSQSServer_CatalogCreateIsIdempotent: same name and same VisibilityTimeout, but the second create adds Throttle* attributes → attributesEqual must detect the diff → QueueNameExists. Added as an independent case because the existing VisibilityTimeout-only cases would miss a future regression where throttleConfigEqual always returns true.

Token-before-validation (low, follow-up note): validateSend consuming a throttle token before MessageAttributes / FIFO params validation differs from AWS SQS, which validates attributes before rate limiting. Acknowledged as a follow-up note rather than a restructure.

Verified: go test -race ./adapter/... (throttle + idempotency) clean, golangci-lint run clean.

@claude review


claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 4m 3s


Review of c6d3a683 — Round 3

Tasks

  • Gather context and understand request
  • Read changed files (sqs_throttle.go, sqs_catalog_test.go)
  • Verify Round 2 fix #1: dead sweepMu/lastSweep removal
  • Verify Round 2 fix #2: throttleConfigEqual idempotency test
  • Check for new issues introduced by fixes
  • Post final review

Round 2 Fixes Verified ✅

Fix 1 — Dead sweepMu/lastSweep removed

Both fields are gone from bucketStore (struct at sqs_throttle.go:112-117). Grep for sweepMu, lastSweep, and maybeSweep returns nothing. sweep() (sqs_throttle.go:312-324) is clean — no mutex acquire or timestamp write, no per-call serialisation. The bucketStore struct comment was correctly updated to describe the new goroutine-driven design: "there is no hot-path serialisation primitive because the only caller of sweep() is the sole goroutine the ticker drives".

Fix 2 — throttleConfigEqual idempotency test gap closed

TestSQSServer_CatalogCreateIsIdempotent now has a fourth case (sqs_catalog_test.go:140-161): same name as the original "idempotent" queue, same VisibilityTimeout: "60", but adds ThrottleSendCapacity: "10" and ThrottleSendRefillPerSecond: "1". The original create had no throttle config (nil), so throttleConfigEqual(nil, nonNilThrottle) must return false and the test asserts 400 QueueNameExists. This covers exactly the regression that attributesEqual-only coverage would miss (a throttleConfigEqual bug that always returned true would sail through the VisibilityTimeout-only cases).


One New Issue Found

Low — Two constant comments describe the old maybeSweep design, not the current runSweepLoop design

The runSweepLoop fix landed in Round 1 and the bucketStore struct comment was updated, but the two constant comments above it were not updated and now directly contradict the implementation:

throttleIdleEvictAfter (sqs_throttle.go:75-79):

// throttleIdleEvictAfter is the idle window after which a quiet bucket
// is dropped from the in-memory store. The evictor fires lazily — there
// is no goroutine; lookups call sweep() opportunistically. A queue that
// resumes activity rebuilds its bucket from the meta record at full
// capacity, matching the failover semantics documented in §3.1.

"There is no goroutine; lookups call sweep() opportunistically" is now false — there IS a goroutine (runSweepLoop) and lookups do not call sweep at all.

throttleEvictSweepEvery (sqs_throttle.go:82-84):

// throttleEvictSweepEvery bounds how often the sweep runs from the hot
// path so a flood of misses on a many-queue cluster cannot turn the
// idle-evict cost into a per-request hot path.

"Runs from the hot path" was the old design; sweepEvery is now just the background ticker interval.

These contradict the correctly-updated bucketStore struct comment three lines below and would mislead the next reader doing a grep for where sweep is triggered. Suggested replacement:

// throttleIdleEvictAfter is the idle window after which a quiet bucket
// is dropped from the in-memory store by the background sweep goroutine.
// A queue that resumes activity rebuilds its bucket from the meta record
// at full capacity, matching the failover semantics documented in §3.1.
const throttleIdleEvictAfter = time.Hour

// throttleEvictSweepEvery is the interval at which runSweepLoop fires
// the idle-evict sweep in its background goroutine.
const throttleEvictSweepEvery = time.Minute



Other Observations

runSweepLoop wiring confirmed correct. sqs.go:149 launches go s.throttle.runSweepLoop(s.reaperCtx) in Run(), tied to the same reaperCtx that Stop() cancels — goroutine lifecycle is clean, no leak path.

validateSend order verified. Throttle check at sqs_messages.go:332 is correctly ordered: after loadQueueMetaForSend (which rejects non-existent queues before a token is touched) and outside the OCC transaction. The prior "token consumed before full attribute validation" note from Round 2 stands as an acknowledged follow-up; the ordering is deliberate per §4.2 and the comment reflects it.


Summary: Both Round 2 items are correctly implemented. The only new finding is two stale constant comments that describe the old hot-path sweep design — a low-severity doc fix. Otherwise the implementation is ready.

bootjp added a commit that referenced this pull request Apr 27, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e3530e93bb


Comment thread adapter/sqs_throttle.go Outdated
if matches {
return bucket
}
b.buckets.Delete(key)

P1: Guard stale-bucket delete with compare-and-delete

When loadOrInit detects a config mismatch, it unconditionally calls b.buckets.Delete(key) before LoadOrStore. If multiple requests race on the same stale bucket (common right after failover/config change), each goroutine can delete a bucket that another goroutine has already recreated and charged, then recreate a fresh full-capacity bucket again. That lets more than the configured capacity through because earlier token deductions are applied to orphaned bucket instances no longer in the map. This reconciliation path needs a conditional delete (only if the map still points to the stale pointer) or a CAS loop to avoid resetting live buckets.


Round-3 Claude review on PR #679 caught two stale comments left over
from the runSweepLoop refactor:

throttleIdleEvictAfter said "no goroutine; lookups call sweep()
opportunistically" -- both clauses are now false (runSweepLoop is
the goroutine, hot-path charge() never calls sweep()).

throttleEvictSweepEvery said "bounds how often the sweep runs from
the hot path" -- the hot path no longer runs sweep at all; this
constant is now just the background ticker interval.

Both updated to describe the post-runSweepLoop design and to
reinforce that the sweep cost is amortised across the goroutine
ticker rather than concentrated on whichever request was unlucky
enough to trigger it (which was the old behaviour Gemini high
flagged in PR #679 round 1).
@chatgpt-codex-connector

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.


bootjp commented Apr 27, 2026

Low fix from the round-3 review (af2d12ba):

Fixed the throttleIdleEvictAfter and throttleEvictSweepEvery comments that went stale in the round-1 runSweepLoop refactor. The former claimed "no goroutine; lookups call sweep() opportunistically", which is now false on both counts because the sweep runs on a background goroutine; the latter claimed it "bounds how often the sweep runs from the hot path", but the hot path no longer calls the sweep at all, so the constant is now just the ticker interval. Both rewritten to describe the current design.

Verified: golangci-lint run clean.

@claude review


claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 2m 32s


Review of af2d12ba — Round 4

Tasks

  • Gather context and understand request
  • Read adapter/sqs_throttle.go — verify constant comment updates
  • Check git diff for the specific commit
  • Scan for any new issues introduced
  • Post final review

Round 3 Fix Verified ✅

Stale constant comments updated (sqs_throttle.go:75-88)

Both comments are now accurate and internally consistent:

throttleIdleEvictAfter (lines 75-82): Removed the two false clauses ("there is no goroutine; lookups call sweep() opportunistically") and correctly describes the background goroutine (runSweepLoop) and the fact that the hot path never calls sweep(). The failover semantics note (§3.1, full-capacity rebuild) is preserved.

throttleEvictSweepEvery (lines 84-88): Removed "bounds how often the sweep runs from the hot path" (the hot path no longer runs the sweep at all) and now correctly describes this as the background ticker interval. The new sentence "charge() never calls into the sweep so a many-queue cluster pays the O(N) cost only on the goroutine's tick, never on a request" accurately mirrors the bucketStore struct comment added in the Round 2 fix.

The updated comments are also consistent with sweep()'s own doc (line 309: "Called from runSweepLoop on a background ticker — the ticker is the only caller, so sweep() does not need its own serialisation") — no contradictions remain anywhere in the file.

No new issues introduced by this commit. One delta: the commit message accurately attributes both stale-comment fixes and explains why the original wording was wrong (Round 1 Gemini-high triggered the runSweepLoop refactor, which orphaned these comments).


Outstanding Item From Previous Rounds

Medium — Codex P1 (loadOrInit Delete+LoadOrStore TOCTOU) unaddressed

The Codex review at e3530e93 raised a P1 that was not addressed in Rounds 2 or 3. Restating it here since it's the only remaining substantive concern:

In loadOrInit (lines 252-266), when a config mismatch is detected, the code does:

b.buckets.Delete(key)               // unconditional delete
// ...
b.buckets.LoadOrStore(key, fresh)   // insert fresh bucket

The race: goroutine A loads the stale bucket → detects mismatch → Delete → LoadOrStore stores freshA. Goroutine B also loaded the stale bucket (before A's Delete) → detects mismatch → Delete(key) evicts freshA → LoadOrStore stores freshB. Meanwhile, a goroutine that obtained a reference to freshA before B's Delete (via a concurrent charge() call) charges freshA — but freshA is no longer in the map. The next charge() gets freshB (full capacity) and charges it again. Net result: more tokens consumed than the bucket's capacity in the mismatch window.

Fix: Use sync.Map.CompareAndDelete (available since Go 1.20, project targets Go 1.25):

// Only delete if the map still holds THIS stale pointer.
b.buckets.CompareAndDelete(key, v)

This ensures B's delete is a no-op if A has already replaced the stale bucket with freshA.

Impact context: The window is narrow (concurrent requests all hitting the same stale bucket right after a failover or SetQueueAttributes), and the worst case is one extra full-bucket's worth of tokens consumed — the same quantum as the "fresh bucket on failover" semantic. It is not a safety violation, but it could allow a short burst past the configured limit during reconciliation.



Overall

The comment-only fix in af2d12ba is correct and complete — all stale references to the old on-hot-path sweep design are gone. The one remaining open item is the Codex P1 CompareAndDelete gap in loadOrInit, which predates this commit. Once that is addressed (or explicitly deferred with a tracking comment), the implementation is ready.

bootjp added a commit that referenced this pull request Apr 27, 2026
…(PR #679 round 4)

Round-4 Claude review on PR #679 surfaced a TOCTOU race in the
loadOrInit reconciliation path the round-1 fix introduced.

The race: two concurrent goroutines hit the same stale bucket. A
detects mismatch, Delete + LoadOrStore stores freshA. B (which
also Loaded the stale bucket before A's Delete) then runs its own
Delete -- which under the previous unconditional Delete would
evict freshA. B's LoadOrStore then stores freshB. Net effect: a
charge() that obtained freshA via the in-flight LoadOrStore from
A's path keeps charging a bucket that is no longer in the map,
while subsequent requests get freshB at full capacity. Total
tokens consumed across the mismatch window can exceed the new
capacity by one bucket's worth.

Fix: replace the unconditional b.buckets.Delete(key) with
b.buckets.CompareAndDelete(key, v) where v is the original stale
pointer. CompareAndDelete is a no-op when the map already holds
someone else's fresh bucket, so the racer that arrives second
takes the winner's fresh bucket via LoadOrStore (which returns
the existing entry) instead of orphaning it.

Test: TestBucketStore_ConcurrentReconciliationRespectsNewCapacity
seeds a stale bucket (cfgOld, drained), then races 200 goroutines
through a charge with cfgNew (capacity 50). Asserts exactly 50
successes -- a Delete-after-replace race would let some past the
cap. Runs under -race; the new test plus the existing
TestBucketStore_ConcurrentChargesPreserveCount and
TestBucketStore_ReconcilesBucketOnConfigChange cover the
concurrent-fresh-bucket, concurrent-stale-mismatch, and
sequential-mismatch cases.

bootjp commented Apr 27, 2026

Addressed the round-4 TOCTOU finding (29c72960):

Medium — Delete-after-replace race in the loadOrInit reconciliation path

As pointed out, the reconciliation path introduced in round 1 had a race where the unconditional Delete could evict a concurrently created fresh bucket. Goroutine A replaces the stale bucket with freshA via Delete + LoadOrStore; goroutine B, which also Loaded the stale bucket before A's Delete, then runs its own Delete → freshA is evicted, and B's LoadOrStore stores freshB. If a third goroutine has already received the freshA pointer from A's LoadOrStore and is mid-charge, it keeps charging a bucket that is no longer in the map while later requests get freshB at full capacity, so more than the new capacity's worth of tokens can be consumed inside the mismatch window.

Fix: b.buckets.Delete(key) → b.buckets.CompareAndDelete(key, v) (Go 1.20+; the project targets 1.25+, so this is fine). v is the original stale pointer, so when the map already holds another goroutine's fresh bucket the CompareAndDelete is a no-op and the subsequent LoadOrStore returns that fresh bucket, discarding B's freshB. The map stays first-writer-wins. A hedged sketch of the resulting loadOrInit follows this comment.

New test TestBucketStore_ConcurrentReconciliationRespectsNewCapacity:

  1. Seed a stale bucket with cfgOld (cap=5) and drain it
  2. Race 200 goroutines issuing charges against cfgNew (cap=50)
  3. Assert exactly 50 successes once all goroutines finish

-race clean; also confirmed the test fails (extra successes appear) when the unconditional Delete is restored.

Verified: go test -race on all throttle tests clean, golangci-lint run clean.
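
A hedged sketch of the fixed reconciliation path; the surrounding types are assumed and only the Load → compare → CompareAndDelete → LoadOrStore sequence reflects the fix.

package throttlesketch // hedged sketch of the round-4 fix to loadOrInit

import (
	"sync"
	"time"
)

type bucketKey struct{ queue, action string }

type tokenBucket struct {
	mu         sync.Mutex
	tokens     float64
	capacity   float64
	refillRate float64
	lastRefill time.Time
}

type bucketStore struct {
	buckets sync.Map // bucketKey -> *tokenBucket
}

// loadOrInit returns the bucket for key, rebuilding it when the cached
// capacity/refill no longer match the queue's current config. The stale entry
// is removed with CompareAndDelete so a racer that already replaced it is not
// evicted: the loser's fresh bucket is discarded by LoadOrStore and all racers
// converge on one bucket, keeping total charges within the new capacity.
func (b *bucketStore) loadOrInit(key bucketKey, capacity, refill float64, now time.Time) *tokenBucket {
	if v, ok := b.buckets.Load(key); ok {
		bucket, _ := v.(*tokenBucket)
		bucket.mu.Lock()
		matches := bucket.capacity == capacity && bucket.refillRate == refill
		bucket.mu.Unlock()
		if matches {
			return bucket
		}
		// Only evict if the map still holds THIS stale pointer (Go 1.20+).
		b.buckets.CompareAndDelete(key, v)
	}
	fresh := &tokenBucket{tokens: capacity, capacity: capacity, refillRate: refill, lastRefill: now}
	actual, _ := b.buckets.LoadOrStore(key, fresh)
	bucket, _ := actual.(*tokenBucket)
	return bucket
}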

@claude review


claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 2m 41s


Review of 29c72960 — Round 5

Tasks

  • Gather context and understand request
  • Read adapter/sqs_throttle.go — verify CompareAndDelete fix in loadOrInit
  • Read adapter/sqs_throttle_test.go — verify new concurrency test
  • Check for new issues introduced by the fix
  • Post final review

Round 4 Fix Verified ✅

Fix — loadOrInit reconciliation guards with CompareAndDelete (sqs_throttle.go:264)

The one-line change is correct and complete. Using b.buckets.CompareAndDelete(key, v) (where v is the pointer loaded at line 238) instead of the unconditional b.buckets.Delete(key) eliminates the TOCTOU race:

  • Goroutine A sees stale v, CompareAndDelete(key, v) succeeds (map still holds v), LoadOrStore(key, freshA) stores freshA.
  • Goroutine B also loaded stale v, CompareAndDelete(key, v) is a no-op (map now holds freshA, not v), LoadOrStore(key, freshB) returns freshA and discards freshB.
  • Goroutine C which already received freshA from A's LoadOrStore charges freshA. B also charges freshA.
  • Result: all concurrent racers share a single fresh bucket. Total successful charges = new capacity. ✓

The comment block (lines 252–263) accurately describes the race and the fix. The reference (Claude P1 on PR #679 round 4 caught this.) is consistent with the existing comment attribution style in this file (// Codex P1 on PR #679, // Gemini high on PR #679).

Test — TestBucketStore_ConcurrentReconciliationRespectsNewCapacity (sqs_throttle_test.go:269-308)

The test is well-designed and correctly pins the invariant:

  • Deterministic capacity: clock is frozen (time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC)), so no refill occurs during the race. cfgNew.SendCapacity = 50 means exactly 50 tokens are ever available.
  • Sufficient goroutine count: 200 goroutines vs. 50-token capacity creates strong contention — any delete-after-replace race would produce > 50 successes, caught by require.EqualValues(t, 50, successes).
  • Regression detection: reverting to unconditional Delete makes the assertion fail (extra goroutines charge orphaned buckets at full capacity).
  • The wg.Wait() before the assertion ensures all goroutines complete before counting. The sync.Mutex counter is correct (could use atomic.Int64 instead, but there's no correctness issue).

Three Delete usages are all correct (verified at lines 264, 293, 335):

| Site | Type | Rationale |
| --- | --- | --- |
| loadOrInit | CompareAndDelete(key, v) | Conditional — only evicts the specific stale pointer; leaves any already-replaced fresh bucket alone. |
| invalidateQueue | Delete(key), unconditional | Intentional full eviction after a config commit; the next request rebuilds from fresh meta. |
| sweep | Delete(k), unconditional | Idle eviction on the sole background goroutine; no concurrent sweep races. |

No New Issues Found

The fix is minimal and surgical — exactly one call site changed, one test added. No other paths were touched. The comment is accurate; the test is deterministic and covers the race that the old code exposed.


Overall

All five rounds of findings have been resolved:

| Round | Issue | Status |
| --- | --- | --- |
| 1 | Codex P1 — bucket reconciliation on stale config | e3530e9 |
| 1 | Codex P1 — conditional invalidation on throttle-attr change | e3530e9 |
| 1 | Codex P2 — attributesEqual covers Throttle | e3530e9 |
| 1 | Gemini high — thread throttle through existing meta load | e3530e9 |
| 1 | Gemini high — move sweep off hot path | e3530e9 |
| 1 | Gemini medium — cap retry-after duration | e3530e9 |
| 2 | Dead sweepMu/lastSweep fields | c6d3a68 |
| 2 | throttleConfigEqual not exercised by idempotency test | c6d3a68 |
| 3 | Stale constant comments describing old hot-path sweep | af2d12b |
| 4 | loadOrInit Delete+LoadOrStore TOCTOU (CompareAndDelete) | 29c7296 |

The only acknowledged but deferred items are: token-before-validation ordering in validateSend (minor AWS contract delta, follow-up), Prometheus metrics (§4.1, deferred), §6.5 cross-protocol parity test (blocked on PR #662), and §6.6 failover behaviour test (needs Jepsen scaffolding). All are documented in the PR description's "Out of scope" section.

The implementation is ready.

bootjp added 3 commits April 27, 2026 20:22
Implements PR 2 of the §11 multi-PR rollout from
docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. The schema
fields land on sqsQueueMeta with the design's wire types; the
validator enforces the §3.2 cross-attribute rules; a temporary
CreateQueue gate rejects PartitionCount > 1 until PR 5 lifts the
gate atomically with the data-plane fanout.

Schema (sqs_catalog.go):
- sqsQueueMeta gains three optional fields:
  * PartitionCount uint32 — number of FIFO partitions; 0/1 means
    legacy single-partition; > 1 enables HT-FIFO.
  * FifoThroughputLimit string — "perMessageGroupId" (default for
    HT-FIFO) or "perQueue" (collapses every group to partition 0).
  * DeduplicationScope string — "messageGroup" (default for HT-FIFO)
    or "queue" (legacy single-window).
- attributesEqual extended via baseAttributesEqual + htfifoAttributesEqual
  split (kept under cyclop ceiling).
- queueMetaToAttributes surfaces the configured HT-FIFO fields via
  addHTFIFOAttributes so GetQueueAttributes("All") round-trips.

Routing (sqs_partitioning.go, new):
- partitionFor(meta, messageGroupId) uint32 implements §3.3:
  FNV-1a over MessageGroupId & (PartitionCount - 1). Edge cases
  documented in the godoc — PartitionCount 0/1 → 0, FifoThroughputLimit
  perQueue short-circuits to 0, empty MessageGroupId → 0.
- The bitwise-mask optimisation requires PartitionCount be a power
  of two; the validator enforces it. A future bug that leaks a
  non-power-of-two value is caught by TestPartitionFor_PowerOfTwoMaskingMatchesMod.

Validation (sqs_partitioning.go):
- validatePartitionConfig enforces the §3.2 cross-attribute rules:
  * PartitionCount must be a power of two in [1, htfifoMaxPartitions=32].
  * FifoThroughputLimit / DeduplicationScope are FIFO-only.
  * {PartitionCount > 1, DeduplicationScope = "queue"} rejects with
    InvalidParameterValue (incoherent params, vs InvalidAttributeValue
    for malformed individual values) per §3.2 cross-attribute gate.
- Runs in parseAttributesIntoMeta (after resolveFifoQueueFlag, so
  the IsFIFO check sees the post-resolution flag) and in
  trySetQueueAttributesOnce (after applyAttributes; IsFIFO comes
  from the loaded meta).
- validatePartitionImmutability is the §3.2 rule that PartitionCount,
  FifoThroughputLimit, and DeduplicationScope are immutable from
  CreateQueue. SetQueueAttributes is all-or-nothing: a request that
  touches a mutable attribute alongside an attempted immutable
  change rejects the whole request before persisting either.
- snapshotImmutableHTFIFO captures the pre-apply values so the
  immutability check has a clean before/after pair.

Dormancy gate (sqs_partitioning.go + sqs_catalog.go):
- validatePartitionDormancyGate is the §11 PR 2 temporary gate that
  rejects CreateQueue with PartitionCount > 1 with
  InvalidAttributeValue("PartitionCount > 1 requires HT-FIFO data
  plane — not yet enabled"). The schema field exists in the meta
  type but no partitioned data can land. Removed in PR 5 in the same
  commit that wires the data-plane fanout — gate-and-lift is atomic
  so a half-deployed cluster can never accept a partitioned queue
  without the data plane to serve it.

Tests:
- adapter/sqs_partitioning_test.go: 14 unit tests covering
  partitionFor (legacy zero/one, perQueue short-circuit, empty
  MessageGroupId, determinism, distribution within ±5% across 8
  partitions on 100k samples, power-of-two masking < N), isPowerOfTwo,
  validatePartitionConfig (power-of-two, max cap, FIFO-only,
  cross-attr InvalidParameterValue, single-partition + queue-dedup OK),
  validatePartitionDormancyGate (rejects > 1, allows 0/1),
  validatePartitionImmutability (per-attribute change rejects, same-
  value no-op succeeds), htfifoAttributesPresent.
- adapter/sqs_partitioning_integration_test.go: 7 end-to-end tests
  against a real createNode cluster covering the wire surface:
  dormancy gate rejects PartitionCount > 1 (and the gate's reason
  surfaces to the operator), allows PartitionCount=1, validator
  rejects non-power-of-two, FIFO-only rejection on Standard queue,
  cross-attr {partitions, queue dedup} rejects, immutability rejects
  SetQueueAttributes change with same-value no-op succeeds, all-or-
  nothing — combined mutable+immutable rejects entirely without
  persisting the mutable change, GetQueueAttributes round-trip.
- All tests pass under -race; golangci-lint clean.

Out of scope for this PR (§11 PR 3-8):
- PR 3: keyspace threading — partitionIndex through every sqsMsg*Key
  constructor, defaulting to 0 so existing queues stay byte-identical.
  Gate from PR 2 still in place.
- PR 4: routing layer + --sqsFifoPartitionMap flag + mixed-version
  gate (§8.5 capability advertisement + §8 leadership-refusal hook
  in kv/lease_state.go). Gate still in place.
- PR 5: send/receive partition fanout, receipt-handle v2, removes
  the dormancy gate atomically with the data-plane fanout.
- PR 6: PurgeQueue/DeleteQueue partition iteration + tombstone +
  reaper.
- PR 7: Jepsen HT-FIFO workload + metrics.
- PR 8: doc lifecycle bump.
Three medium-priority Gemini findings + one cyclop fix from the
rebase onto the updated PR #679 throttling branch.

(1) sqsErrInvalidParameterValue removed (sqs_catalog.go,
sqs_partitioning.go, sqs_partitioning_test.go): the constant
duplicated sqsErrValidation which already carries
"InvalidParameterValue". Reuses the existing constant; comment on
the validator clarifies the choice.

(2) snapshotImmutableHTFIFO heap allocation gated on
htfifoAttributesPresent (sqs_catalog.go): the snapshot is only
needed when the request actually carries an HT-FIFO attribute. A
SetQueueAttributes that touches only mutable attributes (e.g.
VisibilityTimeout — the common path) now skips the allocation
entirely. The validator short-circuit on preApply == nil keeps
correctness identical: a request with no HT-FIFO attribute can
never change an HT-FIFO field.

(3) partitionFor inlined FNV-1a (sqs_partitioning.go): the
hash/fnv.New64a + h.Write([]byte(messageGroupID)) path heap-
allocates the byte slice and pays the hash.Hash interface dispatch
on every SendMessage. Replaced with a manual FNV-1a loop over the
string — measurably faster on the routing hot path. MessageGroupId
is capped at 128 chars by validation so the loop is bounded.
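For illustration, a minimal sketch of such an inlined loop — the sqsQueueMeta fields and the §3.3 edge cases mirror what is described above, but the exact production signature is an assumption:

```go
package partitioning

// 64-bit FNV-1a parameters.
const (
	fnvOffset64 uint64 = 14695981039346656037
	fnvPrime64  uint64 = 1099511628211
)

type sqsQueueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// partitionFor hashes MessageGroupId with an allocation-free FNV-1a loop and
// masks by PartitionCount-1; the validator guarantees a power of two, so the
// mask is equivalent to mod. Edge cases collapse to partition 0.
func partitionFor(meta *sqsQueueMeta, messageGroupID string) uint32 {
	if meta.PartitionCount <= 1 || meta.FifoThroughputLimit == "perQueue" || messageGroupID == "" {
		return 0
	}
	h := fnvOffset64
	for i := 0; i < len(messageGroupID); i++ { // bounded: MessageGroupId is validated to <= 128 chars
		h ^= uint64(messageGroupID[i])
		h *= fnvPrime64
	}
	return uint32(h) & (meta.PartitionCount - 1)
}
```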

(4) cyclop (trySetQueueAttributesOnce): the conditional preApply
branch + immutability check + Throttle validator pushed cyclop to
12. Extracted applyAndValidateSetAttributes so the function stays
under the ceiling; the helper is also useful for any future
admin-surface callers that want the same apply+validate flow
without the OCC dispatch.

All tests pass under -race; golangci-lint clean.
…eanup

Three items from the round-2 Claude review on PR #681:

(1) Bug (latent, live in PR 5): validatePartitionConfig only guarded
FifoThroughputLimit / DeduplicationScope as FIFO-only, not
PartitionCount > 1. A Standard queue with PartitionCount=2 would
slip past the validator after PR 5 lifts the dormancy gate. Added
a guard inside the !meta.IsFIFO block: PartitionCount > 1 →
InvalidAttributeValue("PartitionCount > 1 is only valid on FIFO
queues"). PartitionCount 0 / 1 stay accepted on Standard (both
mean "single-partition layout"). Test
TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs
extended to cover PartitionCount=2/4/8/16/32 → reject and
PartitionCount=0/1 → accept.

(2) Low: TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned
had two contradictory comments. The function-level doc said "Test
sets PartitionCount=1 to bypass dormancy" but the test actually
sends PartitionCount=2; the inline comment claimed "dormancy fires
first" but validatePartitionConfig (inside parseAttributesIntoMeta)
runs before validatePartitionDormancyGate, so the cross-attr rule
fires first. Rewrote the doc to describe the actual control-plane
order (validatePartitionConfig → cross-attr rejection → wire 400)
and removed the misleading "dormancy fires first" assertion.

(3) Nit: replaced the custom 11-line contains/indexOf helpers with
strings.Contains. Identical behaviour, idiomatic.

All findings noted in the round-2 Claude review on PR #681.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 29c7296028


Comment thread adapter/sqs_catalog.go Outdated
// Default* covers any future non-Send/Recv verb that gets wired
// into the throttle path — no batch verb is in scope today, so the
// minimum-batch-capacity check is off for this bucket.
if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, false); err != nil {

P1 Badge Enforce batch-capacity floor on fallback Default throttle bucket

validateThrottleConfig disables the batch-capacity check for ThrottleDefault* (requireBatchCapacity=false), but resolveActionConfig routes Send/Receive traffic to Default whenever Send*/Recv* are unset. That means configs like ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 are accepted, yet full SendMessageBatch/DeleteMessageBatch requests (charge=10) can never succeed because the bucket can never accumulate 10 tokens. This creates a permanently unserviceable, hard-throttled batch path from a configuration the validator currently allows.


Comment thread adapter/sqs_throttle.go Outdated
idle := bucket.lastRefill.Before(cutoff)
bucket.mu.Unlock()
if idle {
b.buckets.Delete(k)

P2 Badge Make idle-sweep eviction conditional on unchanged bucket entry

In sweep, idle is computed under bucket.mu, but deletion happens later via unconditional b.buckets.Delete(k). A concurrent charge can run between unlock and delete, refresh lastRefill, and deduct tokens; the subsequent delete then drops that live bucket and the next request recreates a full-capacity bucket, effectively discarding recent deductions and allowing excess throughput. Eviction should use a compare-and-delete (or recheck under lock) so only the same still-idle entry is removed.


bootjp added 2 commits April 27, 2026 20:43
…2) (#681)

## Summary

Phase 3.D **PR 2** (schema + validators + dormancy gate) per the §11
multi-PR rollout in
[`docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md`](https://github.com/bootjp/elastickv/blob/docs/sqs-phase3-proposals/docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md)
(currently on PR #664).

Stacks on **PR #679 (Phase 3.C)**: branched off
`feat/sqs-throttling-phase3c` so the 3.D schema work shares the same
base as the throttling implementation. Will be rebased once PR #679
merges to main.

- `sqsQueueMeta` gains `PartitionCount uint32`, `FifoThroughputLimit
string`, `DeduplicationScope string`.
- `partitionFor(meta, messageGroupId) uint32` implements §3.3 — FNV-1a
over `MessageGroupId`, masked by `PartitionCount-1` (validator-enforced
power-of-two so the bitwise mask is equivalent to mod). Edge cases:
`PartitionCount` 0/1 → 0, `FifoThroughputLimit=perQueue` short-circuits
to 0, empty `MessageGroupId` → 0.
- `validatePartitionConfig` enforces the §3.2 cross-attribute rules:
power-of-two `[1, htfifoMaxPartitions=32]`,
`FifoThroughputLimit`/`DeduplicationScope` are FIFO-only,
`{PartitionCount > 1, DeduplicationScope = "queue"}` rejects with
`InvalidParameterValue` at control-plane time so the operator sees the
error before the first `SendMessage`.
- `validatePartitionImmutability` enforces the §3.2 immutability rule
(the three HT-FIFO fields are immutable from `CreateQueue` onward).
`SetQueueAttributes` is **all-or-nothing** — a request that touches a
mutable attribute alongside an attempted immutable change rejects the
whole request before persisting either.
- `validatePartitionDormancyGate` is the §11 PR 2 temporary gate:
`CreateQueue` with `PartitionCount > 1` rejects with
`InvalidAttributeValue("PartitionCount > 1 requires HT-FIFO data plane —
not yet enabled")`. The schema field exists in the meta type but no
partitioned data can land. Removed in PR 5 in the same commit that wires
the data-plane fanout — gate-and-lift is **atomic** so a half-deployed
cluster can never accept a partitioned queue without the data plane to
serve it.
- `GetQueueAttributes("All")` round-trips the configured HT-FIFO fields
via `addHTFIFOAttributes`.

## Test plan

- [x] `adapter/sqs_partitioning_test.go` — 14 unit tests covering
`partitionFor` (legacy zero/one, perQueue short-circuit, empty
MessageGroupId, determinism, distribution within ±5% across 8 partitions
on 100k samples, power-of-two masking always < N), `isPowerOfTwo`,
`validatePartitionConfig` (power-of-two, max cap, FIFO-only, cross-attr
`InvalidParameterValue`, single-partition + queue-dedup OK),
`validatePartitionDormancyGate` (rejects > 1, allows 0/1, gate reason
surfaces to the operator), `validatePartitionImmutability`
(per-attribute change rejects, same-value no-op succeeds),
`htfifoAttributesPresent`.
- [x] `adapter/sqs_partitioning_integration_test.go` — 7 end-to-end
tests: dormancy gate rejects `PartitionCount > 1` (and the gate's "not
yet enabled" message surfaces), allows `PartitionCount=1`, validator
rejects non-power-of-two, FIFO-only rejection on Standard queue,
cross-attr `{PartitionCount=2, DeduplicationScope=queue}` rejects,
immutability `SetQueueAttributes` change rejects with same-value no-op
succeeding, all-or-nothing combined mutable+immutable rejects without
persisting the mutable change, `GetQueueAttributes` round-trip omits
unset fields.
- [x] All tests pass under `-race`.
- [x] `golangci-lint run ./adapter/...` → 0 issues.

## Self-review (5 lenses)

1. **Data loss** — Schema fields land on `sqsQueueMeta` and are
persisted via the existing OCC `Put` on `sqsQueueMetaKey`. The dormancy
gate runs at `CreateQueue` admission time; once PR 5 lifts the gate, no
schema migration is needed (existing queues have `PartitionCount=0`
which is equivalent to 1). The keyspace stays untouched in this PR (PR 3
threads `partitionIndex` through `sqsMsg*Key`).
2. **Concurrency / distributed failures** — Validation is purely
function-of-input; no shared state. The immutability check loads the
meta in the same `nextTxnReadTS` snapshot that `applyAttributes` uses,
so a concurrent `SetQueueAttributes` from another writer is caught by
the existing OCC `StartTS + ReadKeys` fence (already in place for
`setQueueAttributesWithRetry`). Dormancy gate runs at `CreateQueue`
admission so a concurrent gate-lift mid-request is impossible.
3. **Performance** — `partitionFor` is one FNV-1a hash + one bitwise
mask; constant time. `validatePartitionConfig` is field-comparison only,
runs once per `CreateQueue` / `SetQueueAttributes`.
`addHTFIFOAttributes` adds at most three map writes and only when fields
are non-empty. Hot path `SendMessage` / `ReceiveMessage` is unaffected
by this PR (no routing changes — those land in PR 5).
4. **Data consistency** — Cross-attribute rule `{partitions, queue
dedup}` rejects at control-plane time so an operator can never end up
with a created-but-unserviceable queue (the runtime path that would have
surfaced the same error at first send is unreachable). Immutability rule
prevents mid-life routing-key reconfiguration that would silently
violate within-group FIFO ordering.
5. **Test coverage** — 14 unit + 7 integration tests cover the
validator's full decision tree (each rule has a dedicated table-driven
case), the dormancy gate's accept/reject paths with the operator-visible
reason, the immutability all-or-nothing semantics, and `partitionFor`'s
edge cases plus a 100k-sample distribution test that catches non-uniform
routing immediately. Lint is clean.

## Out of scope (subsequent §11 rollout PRs)

- **PR 3** — keyspace threading: `partitionIndex` through every
`sqsMsg*Key` constructor, defaulting to 0 so existing queues stay
byte-identical. Dormancy gate from this PR remains in place.
- **PR 4** — routing layer + `--sqsFifoPartitionMap` flag +
mixed-version gate (§8.5 capability advertisement + §8
leadership-refusal hook in `kv/lease_state.go`). Dormancy gate still in
place.
- **PR 5** — send/receive partition fanout + receipt-handle v2 codec.
**Removes the dormancy gate atomically with the data-plane fanout** so
the gate-and-lift land in one commit.
- **PR 6** — `PurgeQueue` / `DeleteQueue` partition iteration +
tombstone schema update + reaper.
- **PR 7** — Jepsen HT-FIFO workload + metrics.
- **PR 8** — partial-doc lifecycle bump.

## Branch base

Branched off `feat/sqs-throttling-phase3c` (PR #679) which itself
branches off `docs/sqs-phase3-proposals` (PR #664). Will rebase against
`main` after both ancestors merge.
…round 5)

Two findings from the round-5 Codex review:

(1) P1 — Default* batch-capacity floor:
validateThrottleConfig disabled the >=10 batch floor for
ThrottleDefault* on the assumption that Default never serves a batch
verb. But resolveActionConfig in sqs_throttle.go falls Send and
Receive traffic through to the Default bucket whenever the
corresponding Send*/Recv* pair is unset. So a config like
ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 was
accepted at the validator, then made every full SendMessageBatch /
DeleteMessageBatch (charge=10) permanently unserviceable because the
Default bucket could never accumulate 10 tokens. Flipped the third
validateThrottlePair call's requireBatchCapacity from false to true;
updated the inline comment to spell out why Default* needs the floor
(the design doc note in §3.2 about Default* being exempt was wrong
about the fall-through). New unit test
TestValidateThrottleConfig_DefaultBucketBatchFloor covers
DefaultCapacity=1/5 reject and DefaultCapacity=10 accept.
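For illustration, a hedged sketch of the rule as it stands after this change — only the both-zero short-circuit and the batch floor discussed here are modelled, and the flat argument list is an assumption rather than the real signature:

```go
package throttle

import "fmt"

// sqsMaxBatchEntries is the largest single charge a batch verb can make.
const sqsMaxBatchEntries = 10

func validateThrottlePair(prefix string, capacity, refill float64, requireBatchCapacity bool) error {
	if capacity == 0 && refill == 0 {
		return nil // unset pair: throttling stays off for this bucket
	}
	if requireBatchCapacity && capacity < sqsMaxBatchEntries {
		return fmt.Errorf("%sCapacity must be at least %d or a full batch request can never succeed",
			prefix, sqsMaxBatchEntries)
	}
	// ... remaining pair rules omitted in this sketch ...
	return nil
}

func validateThrottleConfig(sendCap, sendRefill, recvCap, recvRefill, defCap, defRefill float64) error {
	if err := validateThrottlePair("ThrottleSend", sendCap, sendRefill, true); err != nil {
		return err
	}
	if err := validateThrottlePair("ThrottleRecv", recvCap, recvRefill, true); err != nil {
		return err
	}
	// Round 5 fix: Default* needs the floor too, because Send/Receive batch
	// traffic falls through to the Default bucket whenever the named pair is unset.
	return validateThrottlePair("ThrottleDefault", defCap, defRefill, true)
}
```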

(2) P2 — sweep eviction race:
Earlier sweep computed idle under bucket.mu, released the lock,
then unconditionally Deleted the entry. A concurrent charge() in
that window could refill+take a token, then Delete evicted the
just-used bucket and the next request got a fresh full-cap bucket
— the just-taken token was effectively undone, allowing excess
throughput. Fixed in two parts:

- Hold bucket.mu across the Delete so the idle observation cannot
  be invalidated between check and delete. A concurrent charge()
  that wants the bucket either runs to completion before sweep
  acquires mu (sweep then sees the freshened lastRefill and skips
  the delete) or waits for sweep to release mu (charge then takes
  the bucket — but sweep has already removed it from the map, so
  the charge succeeds against an orphan bucket and only the
  next-after-charge request gets the full-cap rebuild — bounded
  one-token leak).
- CompareAndDelete(k, v) ensures sweep does not evict a
  replacement bucket inserted by invalidateQueue or a future
  reconciliation path.

Holding bucket.mu across sync.Map.Delete is safe — sync.Map.Load
is wait-free on the read-only path and never blocks on bucket.mu,
so there is no AB-BA cycle with charge().

New regression test TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket
backdates a bucket's lastRefill by 2h, advances the clock by 2h to
force the bucket past the idle cutoff, then races sweep against a
50-iteration charge loop. After both finish the bucket must still
be in the store — at least one charge interleaves with sweep's
re-check window and sweep observes the freshened lastRefill.

All tests pass under -race; golangci-lint clean.
@bootjp
Owner Author

bootjp commented Apr 27, 2026

Addressed round 5 Codex P1 + P2 (576cf8c3):

P1 — Default* batch-capacity floor
As pointed out, resolveActionConfig falls through to Default* when Send/Recv are unset, so a config like ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1 passed the validator yet made full SendMessageBatch/DeleteMessageBatch requests (charge=10) permanently unserviceable. Changed the call to validateThrottlePair("ThrottleDefault", ..., requireBatchCapacity=true) so Default* also gets the >=10 floor. The inline comment now spells out that the "no batch verb in scope" assumption breaks because Send/Recv batch traffic falls through to Default when the named pair is unset. TestValidateThrottleConfig_DefaultBucketBatchFloor pins DefaultCapacity=1/5 reject and =10 accept.

(The original §3.2 design-doc note — "Default* is exempt because the catch-all action set has no batch verb in scope for this PR" — was wrong; it missed the code path where Send/Recv batch verbs flow to Default via the fall-through. A follow-up doc fix will go into the next docs/sqs-phase3-proposals PR.)

P2 — sweep eviction race
As pointed out, the old sweep computed idle under bucket.mu, unlocked, then ran an unconditional Delete. If a concurrent charge() refilled and took a token in the unlock-to-Delete window, the Delete evicted the bucket that had just handed out a token and the next request received a fresh full-capacity bucket — effectively rolling back the token that was taken.

The fix has two parts:

  • Hold bucket.mu across the Delete. A concurrent charge() either (a) completes before sweep acquires mu, in which case sweep observes the freshened lastRefill and skips the eviction, or (b) waits for sweep to release mu, in which case the bucket is already gone from the map and the charge lands on an orphan bucket — a bounded one-token leak. Both forks eliminate the "evict a live bucket and lose its deductions" path.
  • Changed b.buckets.Delete to b.buckets.CompareAndDelete(k, v), a defensive guard so sweep never evicts a replacement bucket installed by invalidateQueue or a future reconciliation path.

Holding bucket.mu across sync.Map.Delete is safe (sync.Map.Load is a wait-free read-only path that never blocks on bucket.mu, so there is no AB-BA cycle).

New regression test TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket: backdate the bucket's lastRefill by 2h, advance the clock by 2h so the bucket crosses the idle cutoff, then run sweep concurrently with a 50-iteration charge loop and assert the bucket is still in the store afterwards.

Verified: go test -race ./adapter/... (all throttle tests) clean; golangci-lint run clean.

@claude review

@claude

claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 5m 46s


Review of 576cf8c3 — Round 5

Tasks

  • Gather context and understand request
  • Read adapter/sqs_throttle.go — verify P2 sweep eviction race fix
  • Read adapter/sqs_catalog.go — verify P1 Default* batch-capacity floor fix
  • Read test files — verify new regression tests
  • Check for new issues introduced by fixes
  • Post final review

Round 5 Fixes Verified ✅

P1 — Default* batch-capacity floor (sqs_catalog.go:656)

The one-character change (requireBatchCapacity: false → true) is correct, and the reason is accurately documented in the updated comment. The fall-through in resolveActionConfig that makes this necessary was confirmed by reading the message-plane handlers:

  • deleteMessageBatch uses bucketActionReceive (charge up to 10) — if RecvCapacity is unset, falls through to Default
  • changeMessageVisibilityBatch uses bucketActionReceive (charge up to 10) — same fall-through

So Default* must accommodate max-10 batch charges from at least three verbs (send, delete, changeVisibility), not just the originally-assumed "future verb". The fix is conservative and correct.

TestValidateThrottleConfig_DefaultBucketBatchFloor is an effective regression test: the old test had require.NoError for DefaultCapacity=1 — the new test has require.Error for the same input. Any revert to requireBatchCapacity=false will flip that assertion back.

P2 — Sweep eviction race (sqs_throttle.go:354-358)

The implementation is:

bucket.mu.Lock()
if bucket.lastRefill.Before(cutoff) {
    b.buckets.CompareAndDelete(k, v)
}
bucket.mu.Unlock()

This is correct. Holding bucket.mu across CompareAndDelete makes the idle-check + delete atomic with respect to charge(), which also acquires bucket.mu. Lock-order analysis confirms no AB-BA cycle:

  • charge() path: sync.Map.Load/LoadOrStore (internal sync.Map.mu, released before return) → then bucket.mu.Lock() — never holds sync.Map.mu while blocking on bucket.mu
  • sweep() path: bucket.mu.Lock()CompareAndDelete (acquires sync.Map.mu)
  • Range callback runs after Range's own sync.Map.mu acquisition is released, so CompareAndDelete in the callback can freely acquire it

No goroutine holds sync.Map.mu while waiting for bucket.mu. No deadlock.

CompareAndDelete(k, v) using the snapshot v from the Range callback is also correct for the invalidateQueue interaction: if invalidateQueue evicts + a fresh LoadOrStore inserts v2 before the callback runs, CompareAndDelete(k, v) is a no-op (the map holds v2, not v), protecting the new bucket.


One Issue Found

Low — TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket assertion is vacuously true

The test comment states: "If the old code (compute-then-delete-without-recheck) regresses, the bucket may be evicted and the assertion fails."

This claim is incorrect. The stillThere assertion passes regardless of old vs new code:

  • Old code (unlock → unconditional Delete): if sweep evicts the bucket mid-loop, the NEXT sequential iteration of for range 50 calls store.charge(cfg, "orders", ...)loadOrInitsync.Map.Load misses → LoadOrStore creates a new bucket → bucket is back in the store before wg.Wait().
  • New code: same net result — either sweep skips (path a) or evicts + charge recreates (path b).

The old logical bug was "token deduction undone" (an active bucket gets evicted, then the next request gets a fresh full-capacity bucket, effectively reversing the deduction). The stillThere check doesn't pin that invariant — it would need a token-count assertion after the race:

// Count total successes: must not exceed capacity + one orphan-bucket window.
// A `unlock → Delete` regression lets the bucket be evicted while tokens
// are being held, then the next request sees a fresh full-capacity bucket,
// so successes can reach 2 × capacity.
var successes atomic.Int64
// ... (run charges as before, incrementing successes on allowed=true)
wg.Wait()
require.LessOrEqual(t, successes.Load(), int64(cfg.SendCapacity)+1,
    "token deduction must not be silently undone by sweep-evict race")

The test still has value as a -race stress test (the implementation does have lock/unlock patterns that a race detector should exercise). However the comment overstates its regression coverage.



Other Observations

resolveActionConfig fall-through is complete: only bucketActionSend and bucketActionReceive have named buckets; all other actions (including future verbs) fall through to Default. The throttleAllActions slice (["Send", "Receive", "*"]) covers all three cases in invalidation — consistent with the three-bucket vocabulary.
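For reference, a hedged sketch of that fall-through — the struct layout and constants follow the names used in this thread, not necessarily the real declarations:

```go
package throttle

const (
	bucketActionSend    = "Send"
	bucketActionReceive = "Receive"
	bucketActionDefault = "*"
)

// throttleAllActions is the vocabulary invalidation iterates over.
var throttleAllActions = []string{bucketActionSend, bucketActionReceive, bucketActionDefault}

type sqsQueueThrottle struct {
	SendCapacity, SendRefillPerSecond       float64
	RecvCapacity, RecvRefillPerSecond       float64
	DefaultCapacity, DefaultRefillPerSecond float64
}

// resolveActionConfig picks the named bucket when it is configured and
// otherwise falls through to Default — the reason Default* must also satisfy
// the batch-capacity floor.
func resolveActionConfig(t *sqsQueueThrottle, action string) (capacity, refill float64) {
	switch action {
	case bucketActionSend:
		if t.SendCapacity > 0 {
			return t.SendCapacity, t.SendRefillPerSecond
		}
	case bucketActionReceive:
		if t.RecvCapacity > 0 {
			return t.RecvCapacity, t.RecvRefillPerSecond
		}
	}
	return t.DefaultCapacity, t.DefaultRefillPerSecond
}
```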

No concerns with the comment block on sweep() (lines 327–349). Both paths (a) and (b) are correctly described, and the AB-BA safety argument is sound.

validateThrottlePair with both-zero still short-circuits before the batch-capacity check, so Default*(0, 0) (unset Default) is still accepted. Only a configured Default with capacity < 10 is now rejected.


Summary

Both Round 5 fixes are correctly implemented. The P2 regression test has a weak assertion (it always passes regardless of old/new code), which is worth correcting but is low priority — the -race value is real, only the comment's claim about "regressing the old code makes it fail" is inaccurate.

All ten findings from rounds 1–5 are now addressed:

| Round | Issue | Status |
| --- | --- | --- |
| 1–4 | (all prior items) | |
| 5 | P1 — Default* batch floor | 576cf8c |
| 5 | P2 — sweep eviction race (bucket.mu held across CompareAndDelete) | 576cf8c |
| 5 | P2 test assertion vacuously true | ⚠️ Low — noted above |


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 576cf8c39e


Comment thread adapter/sqs_throttle.go
Comment on lines +355 to +357
if bucket.lastRefill.Before(cutoff) {
b.buckets.CompareAndDelete(k, v)
}

P2 Badge Prevent sweep from orphaning buckets with blocked chargers

Even with CompareAndDelete, this eviction path can still break throttle correctness because callers may have already loaded v from sync.Map and be blocked on bucket.mu while sweep deletes the entry. After unlock, those goroutines continue charging the now-orphaned bucket, while subsequent requests allocate a fresh full-capacity bucket in the map, so a burst arriving near an idle sweep can exceed configured capacity. This is especially visible when a queue wakes up after being idle and many requests race at once.


Comment thread adapter/sqs_messages.go
Comment on lines +332 to +333
if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle) {
return nil, 0, 0, false

P2 Badge Avoid charging with stale queue metadata after delete races

This charges using meta.Throttle loaded earlier, so if DeleteQueue commits between the meta read and this call, the in-flight request can recreate a throttle bucket after deleteQueue already invalidated it. Because buckets are keyed by queue name/action (not generation), a same-name recreate can inherit stale token state instead of starting fresh, violating the delete+recreate reset behavior the invalidation is meant to guarantee.


… round 6)

Round 5's sweep+CompareAndDelete fix closed half the eviction race —
sweep can no longer evict a bucket whose lastRefill was freshened by a
concurrent charge — but left a second window. Goroutines that loaded
the bucket from sync.Map *before* sweep removed it from the map were
still blocked on bucket.mu, and once sweep released mu they would
charge the orphan bucket while later requests created a fresh
full-capacity entry via loadOrInit. A burst aligned with a sweep tick
could therefore consume up to 2x the configured capacity.

Three changes plug the window:

- tokenBucket gains an `evicted bool` field, protected by mu.
- sweep, invalidateQueue, and the loadOrInit reconciliation path all
  set evicted=true under mu *after* successfully removing the bucket
  from the map (CompareAndDelete success or LoadAndDelete return).
- charge() loops on a chargeBucket helper that checks evicted under
  mu and signals retry; the caller drops the orphan reference and
  loops back through loadOrInit, which converges on whatever live
  bucket the map now holds.

CreateQueue now calls invalidateQueue on a genuine commit
(tryCreateQueueOnce post-Dispatch) — DeleteQueue invalidated, but a
sendMessage holding pre-delete meta could recreate a bucket between
DeleteQueue's invalidate and the next CreateQueue commit, so a same-
name recreate would inherit used token state. Invalidating after the
genuine create resets the bucket regardless of in-flight traffic to
the prior incarnation. The idempotent-return path (existing queue
with identical attributes) deliberately skips invalidation.

Tests:

- TestBucketStore_SweepRaceDoesNotInflateBudget: -race stress test
  that runs 4 sweeps against 64 chargers on a single backdated
  bucket; total successful charges must stay <= capacity. The old
  code path could yield up to 2x capacity.
- TestBucketStore_OrphanedBucketRetriesToLiveEntry: deterministic
  exercise of the evicted-flag retry — sweep evicts, the next
  charge allocates a fresh bucket via loadOrInit.
- TestBucketStore_InvalidateMarksOrphanEvicted: pins the round-6
  invalidateQueue change (LoadAndDelete + flag set under mu).
- The pre-existing TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket
  is replaced; the prior assertion (stillThere) was vacuously true
  because the for-range 50 charge loop would always recreate the
  bucket via loadOrInit after any eviction (Claude Low on round 5).

go test -race -count=20 of the new tests passes; full SQS test
package passes under -race; golangci-lint clean.

Round-6 review pulled four other items not actioned:

- Codex P2 "CreateQueue idempotency must compare Throttle" — already
  addressed: attributesEqual at sqs_catalog.go:726 calls
  baseAttributesEqual + throttleConfigEqual + htfifoAttributesEqual.
- Gemini high "chargeQueue performs redundant storage read" — already
  addressed: hot-path handlers (sendMessage, receiveMessage) call
  chargeQueueWithThrottle to skip the second meta load.
- Gemini high "maybeSweep on hot path" — already addressed:
  runSweepLoop runs sweep on a background ticker wired to reaperCtx.
- Gemini medium "cap retryAfter at 10ms to prevent overflow" —
  already addressed: throttleRetryAfterCap=1h cap before multiply
  prevents overflow; 10ms cap is inappropriate for SQS rate-limit
  semantics (clients would retry constantly under contention).
@bootjp
Owner Author

bootjp commented Apr 27, 2026

Round 6 (7d7e6203): closed the path that let the orphan-bucket window inflate the burst budget to 2x, and added a defence for an adjacent CreateQueue/charge race found along the way.

Response to Codex P2 #2 (sweep orphan bucket) and Claude Low (test assertion vacuously true)

As pointed out, round 5's CompareAndDelete-while-holding-bucket.mu closed the path where sweep overrides an active charge()'s lastRefill update and evicts the bucket, but a second window remained: concurrent goroutines that had already fetched the bucket via sync.Map.Load and were waiting on bucket.mu would charge the orphan bucket the moment sweep released it, while later requests created a fresh full-capacity bucket through loadOrInit — so a burst aligned with a sweep tick could still consume 2x capacity.

Round 6 changes (a sketch of the retry loop follows the list):

  • Added an evicted bool field to tokenBucket (protected by mu).
  • sweep / invalidateQueue / the loadOrInit reconciliation now set evicted = true under mu after removing the bucket from the map (after a successful CompareAndDelete or a LoadAndDelete return).
  • Refactored charge() into a loop over a chargeBucket helper: the helper checks evicted under mu and signals a retry when the bucket is an orphan; the caller drops the orphan reference and picks up the live bucket through loadOrInit. Bounded at 4 iterations (there is no timeline in which sweep/reconciliation repeatedly evicts the same fresh bucket, so the practical ceiling is 2 iterations).

Claude Low's point about the stillThere assertion in TestBucketStore_SweepDoesNotEvictRecentlyUsedBucket was correct — it was vacuously true (the for range 50 charge loop rebuilds the bucket via loadOrInit.LoadOrStore even after an eviction, so the assertion passes on both the old and the new code). Replaced it with three new tests:

  • TestBucketStore_SweepRaceDoesNotInflateBudget: run 4 sweeps × 64 chargers concurrently against a backdated bucket → after completion, total successful charges ≤ capacity (the old code had a path that allowed up to 2x).
  • TestBucketStore_OrphanedBucketRetriesToLiveEntry: deterministically interleave sweep → eviction → charge to verify the evicted-flag retry path.
  • TestBucketStore_InvalidateMarksOrphanEvicted: pins that invalidateQueue performs LoadAndDelete and sets evicted=true under mu.

Response to Codex P2 #3 (CreateQueue + stale-meta sendMessage race)

As pointed out, tryCreateQueueOnce did not call s.throttle.invalidateQueue(name) after a successful Dispatch, so the following race existed:

  1. T1: SendMessage A loads meta(gen=N, Throttle=X)
  2. T2: DeleteQueue B commits → invalidateQueue drops the bucket
  3. T3: SendMessage A recreates a fresh bucket(Throttle=X) via chargeQueueWithThrottle
  4. T4: CreateQueue C commits under the same name (Throttle=X identical)

and the new queue inherited a bucket that had already spent a token. Calling invalidateQueue right after tryCreateQueueOnce's Dispatch succeeds means a newly created queue starts from a fresh full-capacity bucket regardless of in-flight traffic to the prior incarnation. The idempotent return path (existing queue with identical attributes) intentionally skips the invalidation (the reasoning is unchanged: if a no-op CreateQueue on a long-lived queue reset the bucket, any caller could silently restore a tenant's burst capacity).

If the Throttle config differs, the loadOrInit reconciliation already resets the bucket; this fix only matters for a delete+recreate with an identical Throttle.

Findings not actioned (4):

  • Codex P2 (sqs_catalog.go:101) "CreateQueue idempotency must compare Throttle" — already addressed (round 1). attributesEqual (sqs_catalog.go:726) calls baseAttributesEqual + throttleConfigEqual + htfifoAttributesEqual.
  • Gemini high (sqs_throttle.go:466) "chargeQueue redundant storage read" — already addressed (round 2). The hot-path handlers (sendMessage, receiveMessage) call chargeQueueWithThrottle to avoid the second meta load; chargeQueue is only used by handlers that do not load meta themselves (deleteMessage, changeMessageVisibility).
  • Gemini high (sqs_throttle.go:360) "maybeSweep on hot path" — already addressed (round 2). runSweepLoop runs on a background ticker wired to reaperCtx, so no sweep cost lands on the hot path.
  • Gemini medium (sqs_throttle.go:428) "cap retryAfter at 10ms" — overflow is already defended (throttleRetryAfterCap = 1h caps before the multiply). A 10ms cap is the wrong fit for SQS rate-limit semantics (clients would retry constantly under contention), so it was not adopted.

go test -race -count=20 passes on the new tests, the SQS test package passes, and golangci-lint is clean.

@claude review

@claude

claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 5m 52s


Review of 7d7e6203 — Round 6

Tasks

  • Gather context and understand request
  • Read adapter/sqs_throttle.go — verify evicted flag, chargeBucket retry loop, sweep/invalidate changes
  • Read adapter/sqs_throttle_test.go — verify three new tests replacing vacuously-true assertion
  • Read adapter/sqs_catalog.go — verify tryCreateQueueOnce invalidation
  • Check for new issues introduced by fixes
  • Verify "skipped findings" reasoning
  • Post final review

Round 6 Fixes Verified ✅

Fix 1 — evicted flag closes 2× burst window (sqs_throttle.go:108-115, 220-251, 337-352, 406-419)

The three-part guard is correctly wired:

tokenBucket.evicted (line 114): single boolean, protected by per-bucket mu. The comment documents the "why" precisely: goroutines blocked on mu before sweep's CompareAndDelete would otherwise charge against an orphaned bucket while later requests get a fresh full-capacity entry.

chargeBucket (lines 220-251): acquires bucket.mu, checks evicted before any refill/take. Returns (chargeOutcome{}, true) on eviction — the caller drops the reference and retries through loadOrInit. The function is split out cleanly from the charge loop so the retry signal doesn't bleed into the outcome path.

charge() retry loop (lines 201-211): for range 4 is an appropriate upper bound. The realistic ceiling is 2 iterations (stale/evicted bucket → miss-and-rebuild → live bucket). The 4-iteration cap prevents a pathological spin from an invariant violation that shouldn't be reachable, and fails-open (allow) rather than hard-429-storming on mis-configuration. The fail-open comment is accurate: any scenario where a goroutine loops past 2 iterations requires sweep to evict a freshly-created bucket within the same call, which is structurally impossible (fresh bucket has lastRefill = b.clock(), so lastRefill.Before(cutoff) = now.Before(now - evictedAfter) = false for any positive evictedAfter).

sweep() (lines 406-419): holds bucket.mu across CompareAndDelete, then sets evicted=true within the same critical section. This closes the window from Round 5. Analysis of the AB-BA cycle question:

  • sweep path: acquires bucket.mu → calls CompareAndDelete (which acquires sync.Map internal state internally, but that is released before CompareAndDelete returns; Range callback does not hold the map's read lock during the callback body)
  • charge() path: sync.Map.Load (lock-free read path) → acquires bucket.mu
    No cycle. Correct.

invalidateQueue() (lines 337-352): correctly upgraded from the old separate Load+Delete to atomic LoadAndDelete, followed by evicted=true under mu. This ensures no goroutine that loaded the entry before LoadAndDelete can charge the orphan without seeing the eviction signal.

Fix 2 — tryCreateQueueOnce invalidation (sqs_catalog.go:941-950)

The placement is correct: after Dispatch success, before the return. The idempotent-return path (if exists { if attributesEqual(...) { return true, nil } }) exits before this point — deliberately skipping invalidation for no-op CreateQueue calls (a no-op CreateQueue resetting the token bucket would let any caller silently restore burst capacity via a benign idempotent call). The comment accurately states the targeted race: sendMessage holding pre-DeleteQueue meta recreates the bucket in the gap between DeleteQueue's invalidateQueue and this CreateQueue's commit; the second invalidateQueue covers that window.


New Tests Verified ✅

TestBucketStore_SweepRaceDoesNotInflateBudget (lines 332-384)

This replaces the vacuously-true stillThere assertion from Round 5. The test correctly pins the invariant:

  • Bucket is backdated (lastRefill = now.Add(-2h)), tokens set to capacity=10, clock advanced to now.Add(2h). Clock frozen so no refill during the race — every token can only be consumed once.
  • 4 sweep goroutines + 64 charger goroutines race.
  • LessOrEqual(successes, capacity=10) is meaningful: with the old code, a charger that acquired mu before sweep's CompareAndDelete could drain the orphan (up to 10 tokens), while later chargers get a fresh full-capacity bucket — total up to 20. With the evicted flag, orphan chargers retry and converge on the live bucket, so total is bounded by 10.

One detail worth noting that confirms the test is not vacuously true: chargeBucket updates lastRefill = now when elapsed > 0. The first charger to acquire mu (with lastRefill = now.Add(-2h), elapsed = 4h > 0) updates lastRefill = clk = now.Add(2h). After that update, sweep's lastRefill.Before(cutoff) check evaluates now.Add(2h).Before(now.Add(1h)) = false — sweep skips eviction. So the test only demonstrates the evicted-flag protection when sweep wins the mu race against all chargers, which under Go's scheduler is a plausible concurrent outcome. The -race stress (-count=20) exercises this probabilistically.

TestBucketStore_OrphanedBucketRetriesToLiveEntry (lines 391-426)

Deterministic exercise of the evicted-flag path: manually backdate, call sweep(), verify evicted=true and key absent from map, then verify a subsequent charge() succeeds via a fresh bucket (different pointer). Clean test structure.

TestBucketStore_InvalidateMarksOrphanEvicted (lines 433-450)

Pins the LoadAndDelete + evicted=true path for invalidateQueue. Minimal and correct.


One Very Low Observation

loadOrInit matches path could return an already-evicted bucket

In loadOrInit (lines 282-286):

bucket.mu.Lock()
matches := bucket.capacity == capacity && bucket.refillRate == refill
bucket.mu.Unlock()
if matches {
    return bucket  // ← could return evicted bucket if sweep ran between Load and here
}

If sweep evicts the bucket between the b.buckets.Load(key) at line 274 and the bucket.mu.Lock() at line 282, matches is true but evicted is also true. The returned bucket causes chargeBucket to see evicted=true → retry. On retry, b.buckets.Load(key) misses → LoadOrStore inserts a fresh bucket. Correct, but one extra loop iteration.

Adding && !bucket.evicted to the matches check would convert this to a fall-through to LoadOrStore directly, saving the extra round trip:

matches := bucket.capacity == capacity && bucket.refillRate == refill && !bucket.evicted

This is a micro-optimization on an already-rare path (bucket must be idle-expired and have the exact same capacity/refill as the current config). Not worth a separate fix, but the 4-iteration loop bound in charge() was chosen with this path in mind — the comment there is accurate.


Skipped Findings — Reasoning Verified ✅

All four "skipped" items from the trigger comment are correctly characterized as already addressed:

| Finding | Addressed in |
| --- | --- |
| Codex P2 "CreateQueue idempotency must compare Throttle" | Round 1: attributesEqual at sqs_catalog.go:726 calls baseAttributesEqual + throttleConfigEqual + htfifoAttributesEqual |
| Gemini high "chargeQueue redundant storage read" | Round 2: hot-path handlers use chargeQueueWithThrottle; chargeQueue only used for handlers without their own meta load |
| Gemini high "maybeSweep on hot path" | Round 2: runSweepLoop runs on background ticker wired to reaperCtx |
| Gemini medium "cap retryAfter at 10ms" | Round 3 (cap fix): 10ms cap would cause constant client retries under SQS rate-limit semantics; the 1h cap is correctly reasoned |

Summary

All Round 6 changes are correctly implemented. The evicted flag closes the 2× burst window from Round 5's residual race (goroutines holding stale pointers before sweep's CompareAndDelete). The tryCreateQueueOnce invalidation correctly handles the delete-then-create stale-meta window. All three new tests are meaningful and pin the right invariants — the vacuously-true stillThere assertion from Round 5 is correctly replaced.

Complete history:

| Round | Issue | Status |
| --- | --- | --- |
| 1–5 | (all prior items) | |
| 6 | Codex P2 — orphan-bucket 2× burst window (evicted flag + retry loop) | 7d7e620 |
| 6 | Codex P2 — CreateQueue post-commit invalidation | 7d7e620 |
| 6 | Claude Low — vacuously-true stillThere test assertion | 7d7e620 |
| 6 | loadOrInit matches path doesn't check !evicted (extra retry iteration) | ⬜ Micro-opt, not a bug |

The implementation is ready.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7d7e6203ef


Comment thread adapter/sqs_throttle.go Outdated
}
for _, action := range throttleAllActions {
key := bucketKey{queue: queue, action: action}
v, ok := b.buckets.LoadAndDelete(key)

P2 Badge Mark bucket evicted before deleting queue entry

invalidateQueue removes the bucket from sync.Map via LoadAndDelete before it acquires bucket.mu and sets evicted=true. If a concurrent charge() already loaded that pointer and wins the mutex race, it can consume tokens while evicted is still false on an orphaned bucket; the next request then creates a fresh full-capacity bucket, effectively discarding those deductions and allowing a burst above configured limits right after SetQueueAttributes/DeleteQueue/CreateQueue invalidation events.


Round 6's invalidateQueue used LoadAndDelete-then-lock — a charger
that loaded the pointer pre-LoadAndDelete could win the mu race and
charge the just-removed bucket while evicted was still false (Codex
P2 on round 6). The fix mirrors sweep's ordering:

  1. Load the pointer from the map (without deleting).
  2. Acquire bucket.mu.
  3. CompareAndDelete with the loaded v (rejects a replacement
     bucket inserted by a concurrent reconciliation).
  4. Set evicted=true under mu.
  5. Unlock.

A charger blocked on mu now sees evicted=true on entry and retries
against the live entry, instead of charging the orphan.

Replaced the over-strict TestBucketStore_InvalidateRace... with
TestBucketStore_InvalidateUnderConcurrencyIsRaceFree: invalidate's
reset-to-full-capacity is the *intended* semantics of an invalidation
event (DeleteQueue / SetQueueAttributes / CreateQueue all want the
new policy applied to a fresh bucket), so the post-invalidate fresh
bucket can legitimately absorb up to capacity additional tokens —
that 2x window is structural, not a bug. Asserting successful-charge
count <= capacity would also pass on the buggy code path under most
schedulings, so the assertion would not differentiate old vs new.
The deterministic TestBucketStore_InvalidateMarksOrphanEvicted
already pins the actual mechanism (evicted=true under mu on dropped
bucket); the new stress test exists to surface any -race finding the
new lock ordering might introduce.

go test -race -count=20 of every TestBucketStore_* test passes;
golangci-lint clean.
@bootjp
Owner Author

bootjp commented Apr 27, 2026

Round 6.1 (e9c20de4) addresses Codex round-6 P2 (Mark bucket evicted before deleting queue entry):

As pointed out, round 6's invalidateQueue used a LoadAndDelete-then-lock ordering: in the window between removing the bucket from the map via LoadAndDelete and acquiring bucket.mu.Lock(), a concurrent charger that had loaded the same pointer just before the LoadAndDelete could win mu first and charge an orphan whose evicted flag was still false.

The fix aligns the ordering with sweep (a sketch follows below):

  1. Load(key) to obtain the pointer (without deleting)
  2. bucket.mu.Lock()
  3. CompareAndDelete(key, v) (a guard so a replacement bucket from a concurrent reconciliation is not evicted by mistake)
  4. If CompareAndDelete succeeds, set evicted = true under mu
  5. Unlock

A charger that was blocked on mu observes evicted=true as soon as it unblocks → retries → converges on the live entry.
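A minimal sketch of that ordering — action vocabulary and names as used in this thread; exact signatures are assumptions:

```go
package throttle

import "sync"

type bucketKey struct{ queue, action string }

type tokenBucket struct {
	mu      sync.Mutex
	evicted bool
}

type bucketStore struct {
	buckets sync.Map // map[bucketKey]*tokenBucket
}

var throttleAllActions = []string{"Send", "Receive", "*"}

// invalidateQueue drops every bucket belonging to queue so the next request
// rebuilds from fresh metadata. Removal and the evicted flag are handled under
// the bucket's own mutex, so a charger blocked on mu wakes up, sees
// evicted=true, and retries against the live entry instead of the orphan.
func (b *bucketStore) invalidateQueue(queue string) {
	for _, action := range throttleAllActions {
		key := bucketKey{queue: queue, action: action}
		v, ok := b.buckets.Load(key) // 1. load the pointer without deleting
		if !ok {
			continue
		}
		bucket := v.(*tokenBucket)
		bucket.mu.Lock()                        // 2. block concurrent chargers holding this pointer
		if b.buckets.CompareAndDelete(key, v) { // 3. evict only this exact pointer
			bucket.evicted = true // 4. orphan chargers observe this on wake-up
		}
		bucket.mu.Unlock() // 5. release; blocked chargers re-resolve via loadOrInit
	}
}
```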

Added a new race stress test, TestBucketStore_InvalidateUnderConcurrencyIsRaceFree (simplified). The "successful charges ≤ capacity" assertion I initially planned was withdrawn as inappropriate — invalidate's reset-to-full-capacity is the intended behaviour for DeleteQueue/SetQueueAttributes/CreateQueue (the new policy should apply to a fresh bucket), so the post-invalidate fresh bucket legitimately accepts up to capacity again. That 2x window is structural, not a bug. The actual mechanism is pinned by the deterministic TestBucketStore_InvalidateMarksOrphanEvicted, which asserts directly that the dropped bucket gets evicted=true under mu.

The other findings re-posted in the round-6 review (Codex P2 #1 CreateQueue idempotency / #3 stale meta, Gemini high chargeQueue / maybeSweep, Gemini medium retryAfter cap) were all addressed in prior rounds, as detailed in the previous comment.

go test -race -count=20 ./adapter/... -run TestBucketStore clean; golangci-lint run clean.

@claude review

@claude

claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 4m 10s


Review of e9c20de4 — Round 6.1

Tasks

  • Gather context and understand request
  • Read adapter/sqs_throttle.go — verify invalidateQueue fix (Load-lock-CompareAndDelete-evicted)
  • Read adapter/sqs_throttle_test.go — verify new concurrency test
  • Check for new issues introduced by the fix
  • Post final review

Round 6 Fix Verified ✅

Fix — invalidateQueue mirrors sweep()'s lock-then-delete-then-flag ordering (sqs_throttle.go:350-367)

The one-site change is correct and complete. The three-step transition:

| Step | Old (LoadAndDelete-then-lock) | New (Load-then-lock-then-CompareAndDelete) |
| --- | --- | --- |
| 1 | LoadAndDelete(key) — evicts from map immediately | Load(key) — pointer obtained, bucket still in map |
| 2 | bucket.mu.Lock() — race window: charger that loaded pre-LoadAndDelete can win mu and see evicted=false | bucket.mu.Lock() — blocks any concurrent charger until we're done |
| 3 | bucket.evicted = true | CompareAndDelete(key, v) — only evicts if map still holds this specific pointer |
| 4 | | bucket.evicted = true under mu |

The race the old code allowed: goroutine C loads v1 pre-LoadAndDeleteLoadAndDelete removes v1 → C wins bucket.mu → sees evicted=false → charges the orphan → invalidateQueue acquires mu → sets evicted=true → next request gets a fresh full-capacity bucket → 2× burst. The new ordering eliminates this: C is now blocked on mu until invalidateQueue releases with evicted=true set, and C retries into the live entry.

LoadAndDelete is fully removed from live code. Confirmed: the only occurrences of LoadAndDelete in sqs_throttle.go are in the invalidateQueue comment block (lines 345-348) explaining why the old approach was wrong.

All three eviction sites now use identical orderingloadOrInit reconciliation (line 308), invalidateQueue (line 362), and sweep (line 427) all hold bucket.mu across CompareAndDelete before setting evicted=true. The code is now internally consistent.

AB-BA deadlock analysis confirmed safe:

  • invalidateQueue path: sync.Map.Load (lock-free, no internal lock held) → bucket.mu.Lock()CompareAndDelete (acquires sync.Map's internal mu)
  • charge() path: sync.Map.Load (lock-free) → bucket.mu.Lock()
  • runSweepLoop/sweep(): Range callback is invoked outside sync.Map's read lock in Go's implementation → bucket.mu.Lock()CompareAndDelete

No path holds sync.Map's internal mu while waiting for bucket.mu. No cycle.

CompareAndDelete(key, v) on a concurrent-reconciliation replacement bucket (the case where loadOrInit reconciliation replaced v1 with fresh between invalidateQueue's Load and its CompareAndDelete): CompareAndDelete correctly returns false (map holds fresh, not v1) — invalidateQueue leaves fresh alone. fresh was built by reconciliation from the same current-config parameters, so this is correct: the effectively-current-config bucket does not get wiped. Meanwhile, loadOrInit's own reconciliation already set v1.evicted=true at line 308-311.


New Test Verified ✅

TestBucketStore_InvalidateUnderConcurrencyIsRaceFree (lines 472-497)

The reasoning for withdrawing the count assertion is sound and correctly stated in the comment:

  • invalidateQueue's reset-to-full-capacity is intended semantics — a post-invalidate fresh bucket should legitimately absorb up to capacity tokens under new policy. Asserting successes ≤ capacity would pass on the old buggy code too (under most schedulings), so it wouldn't differentiate.
  • What the fix guarantees is not a charge-count bound, but that no orphaned bucket is charged while evicted is still false — which is what TestBucketStore_InvalidateMarksOrphanEvicted pins deterministically.
  • The stress test's value is -race detection for the new lock ordering, not a semantics assertion.

This split (deterministic pin + stress -race sweep) is the right coverage strategy.


No New Issues Found

The fix is minimal and surgical — one call site changed in invalidateQueue, one stress test added, one over-strict test replaced. No other code paths were touched. The comment block accurately documents both what changed and why the old ordering was wrong.


Complete History

All findings from rounds 1–6.1 are resolved:

| Round | Issue | Status |
| --- | --- | --- |
| 1–5 | (all prior items) | |
| 6 | Codex P2 — orphan-bucket 2× burst window (evicted flag + retry loop) | 7d7e620 |
| 6 | Codex P2 — CreateQueue post-commit invalidation | 7d7e620 |
| 6 | Claude Low — vacuously-true stillThere test assertion | 7d7e620 |
| 6.1 | Codex P2 — invalidateQueue LoadAndDelete-then-lock race | e9c20de |

Remaining acknowledged-and-deferred items (all in PR "Out of scope" section): token-before-validation ordering in validateSend, Prometheus metrics (§4.1), §6.5 cross-protocol parity test (blocked on PR #662), §6.6 failover behaviour test (needs Jepsen scaffolding).

The implementation is ready.


@chatgpt-codex-connector (Bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e9c20de48e


Comment thread: adapter/sqs_catalog.go (outdated)

// htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields.
func htfifoAttributesEqual(a, b *sqsQueueMeta) bool {
    return a.PartitionCount == b.PartitionCount &&

P2: Treat PartitionCount 0 and 1 as equal in idempotency checks

validatePartitionConfig documents PartitionCount == 0 as the unset value equivalent to single-partition routing, but htfifoAttributesEqual compares PartitionCount with strict numeric equality. That means a queue created without PartitionCount (stored as 0) is later considered "different attributes" from the same queue created with explicit PartitionCount=1, so CreateQueue can incorrectly return QueueNameExists for a semantically identical request. This can break idempotent provisioning/retry flows that normalize defaults differently.


… round 6.2)

validatePartitionConfig documents PartitionCount=0 (unset) and =1
(explicit single partition) as semantically identical legacy /
single-partition routing, but htfifoAttributesEqual was using strict
numeric equality. So a queue created without PartitionCount (stored
as 0) followed by a CreateQueue retry that explicitly passes
PartitionCount=1 was rejected as "different attributes" — broken
idempotency for any provisioning flow that normalises defaults
differently between attempts. (Codex P2 on PR #679 round 6.1.)

Fix: normalisePartitionCount maps 0 -> 1 before comparison; values >1
pass through unchanged so HT-FIFO partition counts still require
exact match. The on-disk schema is unchanged.

Tests:

- TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent —
  pins (0, 1) equality, symmetry, and that real divergence (2 vs 0,
  2 vs 1) still rejects.
- TestNormalisePartitionCount — pins the helper directly.

go test -race ./adapter/... pass; golangci-lint clean.
@bootjp
Owner Author

bootjp commented Apr 27, 2026

Round 6.2 (dbd7a666) addresses the Codex round-6.1 P2 (Treat PartitionCount 0 and 1 as equal in idempotency checks):

As pointed out, validatePartitionConfig's docstring defines PartitionCount == 0 as the canonical "unset" / single-partition routing value, yet htfifoAttributesEqual used strict numeric equality, so there was a path where a queue created without PartitionCount (stored as 0) and then retried with an explicit PartitionCount=1 was rejected by the idempotency check as "different attributes". This reproduces in provisioning flows that re-normalise defaults between attempts (some Terraform/CloudFormation providers, for example).

Fix: normalisePartitionCount maps 0|1 -> 1; values >1 pass through. htfifoAttributesEqual normalises both operands before comparing. No on-disk schema change (stored values remain as the user supplied them; only the equality check becomes semantic).

New tests:

  • TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent: pins (0, 1) equality, symmetry, and real divergence (0/1 vs 2 rejected; 2 vs 2 still OK).
  • TestNormalisePartitionCount: pins the helper on its own.

FifoThroughputLimit / DeduplicationScope could have a similar "empty vs explicit default" problem, but their AWS-defined string defaults depend on queue type / PartitionCount (legacy FIFO is "queue", HT-FIFO is "messageGroup", and so on), which makes normalisation context-dependent, so they are deferred as outside the scope of this Codex finding. Since the validator rejects non-empty values on Standard queues and requires specific values for HT-FIFO, PartitionCount is judged to be the only realistic path that breaks idempotency today.

go test -race ./adapter/... -run "TestHTFIFO|TestNormalise|TestPartition|TestCreateQueue" clean; golangci-lint clean.

Note: the other Codex/Gemini findings from round 6.1 (CreateQueue idempotency Throttle / sweep orphan / stale meta / chargeQueue / maybeSweep / retryAfter cap) are all re-posts of items already addressed in prior rounds. Claude rounds 6 / 6.1 returned the "The implementation is ready" verdict (#679 (comment) / 4327071094).

@claude review

@claude

claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 2m 24s


Review of dbd7a666 — Round 6.2

Tasks

  • Gather context and understand request
  • Read adapter/sqs_catalog.go — verify htfifoAttributesEqual + normalisePartitionCount fix
  • Read adapter/sqs_partitioning.go — verify validatePartitionConfig / validatePartitionImmutability context
  • Read test files — verify new tests pin the right invariants
  • Check for new issues introduced by the fix
  • Post final review

Round 6.1 Fix Verified ✅

Fix — normalisePartitionCount applied in htfifoAttributesEqual (sqs_catalog.go:781)

The change is minimal and correct. normalisePartitionCount maps 0 → 1; any value > 1 passes through unchanged. The logic matches the contract documented in validatePartitionConfig (sqs_partitioning.go:118): "PartitionCount == 0 is canonical 'unset' and is equivalent to 1 for routing purposes." The on-disk schema is untouched — stored values remain 0 or the user-supplied integer; normalization is only applied at comparison time.

normalisePartitionCount is a pure function with no side effects — it cannot regress any write path.

The comment block on htfifoAttributesEqual (lines 772–779) accurately documents both the bug (validatePartitionConfig says 0 ≡ 1, but strict equality was used) and the fix (normalize before compare). The cross-reference to the Codex P2 finding is consistent with the comment style in this file.
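For reference, a sketch of the helper and the comparison as the review describes them (the int type is an assumption):

// normalisePartitionCount maps the canonical "unset" value 0 to 1
// (single-partition routing); values > 1 pass through unchanged.
func normalisePartitionCount(n int) int {
	if n == 0 {
		return 1
	}
	return n
}

// htfifoAttributesEqual then compares normalised values, so a queue stored
// with PartitionCount 0 matches a retry that passes an explicit 1:
//
//	normalisePartitionCount(a.PartitionCount) == normalisePartitionCount(b.PartitionCount)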


New Tests Verified ✅

TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent (sqs_partitioning_test.go:278)

Covers every case that matters for correctness:

  • (0, 1) equal ✓
  • (1, 0) symmetric ✓
  • (0, 2) divergence still rejects ✓
  • (1, 2) divergence still rejects ✓
  • (2, 2) identical > 1 still equal ✓

Reverting to the old strict equality in htfifoAttributesEqual would flip the first two assertions — the test is not vacuously true.

TestNormalisePartitionCount (sqs_partitioning_test.go:306)

Pins the helper directly. Compact and sufficient — the helper has only one branch.


One Issue Found

Low — validatePartitionImmutability has the same 0 ≡ 1 gap for the SetQueueAttributes path

normalisePartitionCount was applied to the CreateQueue idempotency check (htfifoAttributesEqual) but not to the SetQueueAttributes immutability check (validatePartitionImmutability, sqs_partitioning.go:208):

if current.PartitionCount != requested.PartitionCount {
    return newSQSAPIError(..., "PartitionCount is immutable; ...")
}

Concrete failure path:

  1. User creates a queue without specifying PartitionCount → stored as 0.
  2. User calls SetQueueAttributes(PartitionCount=1) (explicit single partition — semantically identical).
  3. applyAttributes sets requested.PartitionCount = 1.
  4. validatePartitionImmutability: 0 != 1 → rejects with "PartitionCount is immutable".

The user's intent was a no-op (setting the default explicitly), but they get an error that falsely implies they tried to change the partition count. This mirrors exactly the CreateQueue idempotency bug that this commit fixes.

Suggested fix — one-line change in validatePartitionImmutability:

if normalisePartitionCount(current.PartitionCount) != normalisePartitionCount(requested.PartitionCount) {

Scope note: Under the Phase 3.D dormancy gate (validatePartitionDormancyGate), all creatable queues today have PartitionCount = 0 (the gate blocks > 1). SetQueueAttributes does not call the dormancy gate, so a PartitionCount=1 SetQueueAttributes request on such a queue hits the immutability check directly. The gap is real but narrow — only users explicitly passing PartitionCount=1 on an attribute-default queue would encounter it.



Other Observations

FifoThroughputLimit / DeduplicationScope deferral is correct. These fields have context-dependent defaults (Standard vs HT-FIFO, queue type, PartitionCount). A normalization rule for them would be non-trivial and would need to encode the same conditional logic as the validator — a separate and deliberate scope. The trigger comment's reasoning is sound.

GetQueueAttributes correctly omits PartitionCount when 0 (sqs_partitioning.go:254): if meta.PartitionCount > 0 → the field only appears in responses when explicitly set. This is consistent with the "0 = unset, not advertised" semantic.


Summary

The normalisePartitionCount fix in htfifoAttributesEqual is correctly implemented and fully tested. One closely related gap remains: validatePartitionImmutability uses the same strict equality pattern on the SetQueueAttributes path and needs the same one-line normalization treatment. All other prior findings from rounds 1–6.1 remain resolved.

Round  Issue                                                                         Status
1–6.1  (all prior items)                                                             resolved
6.2    Codex P2 — PartitionCount 0 ≡ 1 in CreateQueue idempotency                    dbd7a66
6.2    validatePartitionImmutability has the same 0 ≡ 1 gap for SetQueueAttributes   ⚠️ Low (open)

#679 round 6.3)

Round 6.2 normalised the CreateQueue idempotency path
(htfifoAttributesEqual) but left the SetQueueAttributes immutability
path (validatePartitionImmutability) using strict equality. Same
0 == 1 gap (Claude Low on round 6.2):

  1. CreateQueue with no PartitionCount -> stored as 0.
  2. SetQueueAttributes(PartitionCount=1) -> requested.PartitionCount = 1.
  3. validatePartitionImmutability: 0 != 1 -> rejects "PartitionCount
     is immutable", even though the partition layout is unchanged.

Fix: validatePartitionImmutability now normalises both sides through
normalisePartitionCount before comparison, mirroring the round-6.2
fix. The narrow gap surfaced under the §11 PR 2 dormancy gate
(SetQueueAttributes does not call the gate, so this path is
reachable on existing queues), so a fix is correct now rather than
deferring to PR 5.

Test:

- TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent
  table-driven over all (stored, requested) ∈ {0,1,2,4,8} pairs;
  no-op pairs accept, real-change pairs reject. Reverting either
  call to strict equality flips the relevant rows.

go test -race ./adapter/... pass; golangci-lint clean.
@bootjp
Owner Author

bootjp commented Apr 27, 2026

Round 6.3 (dcc55741) addresses the Claude round-6.2 Low finding (validatePartitionImmutability has the same 0 ≡ 1 gap for the SetQueueAttributes path).

As pointed out, round 6.2 normalised htfifoAttributesEqual but left validatePartitionImmutability on strict equality, so the same 0/1 gap remained on the SetQueueAttributes path.

Concrete failure path:

  1. CreateQueue (PartitionCount unspecified) → stored = 0
  2. SetQueueAttributes(PartitionCount=1) → requested.PartitionCount = 1
  3. 0 != 1 → rejected with "PartitionCount is immutable" (even though the partition layout is unchanged; the request is a no-op)

Fix: the PartitionCount comparison in validatePartitionImmutability now uses normalisePartitionCount(current) != normalisePartitionCount(requested), the same pattern as round 6.2.

As Claude notes, the scope is narrow (the Phase 3.D §11 PR 2 dormancy gate does not run on the SetQueueAttributes path, so only requests that send an explicit PartitionCount=1 to an attribute-default queue hit it), but closing it before PR 5 removes the gate is the right call.

New test (a sketch of its shape follows below):

  • TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent: table-driven over (stored, requested) combinations drawn from {0,1,2,4,8}. No-op pairs (0/1, 1/0, 0/0, 1/1, 8/8) accept; real-change pairs (0/2, 1/2, 2/1, 2/0, 4/8) reject. Reverting either call to strict equality flips the affected rows.
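A minimal sketch of that table shape (the trimmed sqsQueueMeta and the validatePartitionImmutability body are assumptions standing in for the real code; normalisePartitionCount is the helper sketched earlier):

package adapter

import (
	"errors"
	"testing"
)

// Trimmed stand-in: the real sqsQueueMeta has many more fields.
type sqsQueueMeta struct{ PartitionCount int }

// Sketch of the fixed comparison; the real function validates more than
// PartitionCount.
func validatePartitionImmutability(current, requested *sqsQueueMeta) error {
	if normalisePartitionCount(current.PartitionCount) != normalisePartitionCount(requested.PartitionCount) {
		return errors.New("PartitionCount is immutable")
	}
	return nil
}

func TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent(t *testing.T) {
	cases := []struct {
		stored, requested int
		wantErr           bool
	}{
		{0, 1, false}, {1, 0, false}, {0, 0, false}, {1, 1, false}, {8, 8, false},
		{0, 2, true}, {1, 2, true}, {2, 1, true}, {2, 0, true}, {4, 8, true},
	}
	for _, tc := range cases {
		err := validatePartitionImmutability(
			&sqsQueueMeta{PartitionCount: tc.stored},
			&sqsQueueMeta{PartitionCount: tc.requested},
		)
		if gotErr := err != nil; gotErr != tc.wantErr {
			t.Errorf("(stored=%d, requested=%d): gotErr=%v, wantErr=%v",
				tc.stored, tc.requested, gotErr, tc.wantErr)
		}
	}
}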

The same normalisation is not applied to FifoThroughputLimit / DeduplicationScope, for the reasons explained in round 6.2 (their defaults are context-dependent, so the normalisation rule would be conditional).

go test -race ./adapter/... (immutability and idempotency tests) clean; golangci-lint clean.

@claude review

@claude

claude Bot commented Apr 27, 2026

Claude finished @bootjp's task in 2m 22s


Review of dcc55741 — Round 6.3

Tasks

  • Gather context and understand request
  • Read adapter/sqs_partitioning.go — verify validatePartitionImmutability fix
  • Read adapter/sqs_partitioning_test.go — verify new test coverage
  • Check for new issues introduced by the fix
  • Post final review

Round 6.2 Fix Verified ✅

Fix — validatePartitionImmutability normalises PartitionCount on both sides (sqs_partitioning.go:215)

The one-line change is correct and complete, mirroring exactly the pattern from Round 6.2 (htfifoAttributesEqual):

// before
if current.PartitionCount != requested.PartitionCount {

// after
if normalisePartitionCount(current.PartitionCount) != normalisePartitionCount(requested.PartitionCount) {

normalisePartitionCount (defined in sqs_catalog.go:790) maps 0 → 1, passes > 1 through unchanged. The fix is sourced from the same function used in htfifoAttributesEqual, maintaining a single canonical definition.

The comment block (lines 205–210) accurately documents the bug (validatePartitionConfig says 0 ≡ 1, but strict equality was used) and the specific failure path (CreateQueue without PartitionCount → stored as 0; SetQueueAttributes with PartitionCount=1 → rejected with "PartitionCount is immutable"). The attribution matches the comment style used throughout the file.

Coverage check — all PartitionCount comparison sites are now consistent:

Site                                                              Uses normalise?
htfifoAttributesEqual (CreateQueue idempotency)                   ✅ Round 6.2
validatePartitionImmutability (SetQueueAttributes immutability)   ✅ Round 6.3
Routing logic (PartitionCount <= 1, > 1)                          N/A — literal comparisons for routing decisions, not equality checks
validatePartitionConfig (power-of-two, FIFO-only gate, etc.)      N/A — validation semantics intentionally distinguish 0 vs 1
No other equality comparison site was missed.


New Test Verified ✅

TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent (sqs_partitioning_test.go:275)

The 10-case table is complete and non-vacuous:

Case    Kind         Expected   Catches regression?
(0, 1)  no-op        no error   ✅ flips with strict equality
(1, 0)  no-op        no error   ✅ flips with strict equality (symmetric)
(0, 0)  no-op        no error   passes either way
(1, 1)  no-op        no error   passes either way
(0, 2)  real change  error      passes either way
(1, 2)  real change  error      passes either way
(2, 1)  real change  error      ✅ flips with strict equality (reverse direction)
(2, 0)  real change  error      ✅ flips with strict equality (reverse direction)
(4, 8)  real change  error      passes either way
(8, 8)  no-op        no error   passes either way

The (2,1) and (2,0) cases are particularly useful: they verify that normalisation only collapses 0/1 and not 2/1 or 2/0 — a correct but non-obvious property of normalisePartitionCount. Reverting to strict equality flips exactly 4 cases, confirming the test is not vacuously true.

TestNormalisePartitionCount (sqs_partitioning_test.go:348) from Round 6.2 pins the helper itself and is unchanged — no regression there.


One Very Low Observation

Integration test gap: SetQueueAttributes(PartitionCount=1) on a stored-as-0 queue

TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects creates a queue via mustCreateFIFOWithThroughputLimit, which does not set PartitionCount (stored as 0). The test exercises FifoThroughputLimit immutability but not the 0→1 PartitionCount no-op path. An integration-level test for:

CreateQueue (no PartitionCount → stored as 0)
SetQueueAttributes(PartitionCount=1) → expect 200

would complete the wire-level coverage for this fix. Since validatePartitionImmutability is a pure function exercised directly by the unit test and the wire path through trySetQueueAttributesOnce is already covered by the existing FifoThroughputLimit scenario, this is low priority — but it would be the natural follow-up to pin the actual HTTP response.
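A hedged sketch of that wire-level test, using aws-sdk-go-v2 against a hypothetical harness (newTestSQSClient stands in for however the suite obtains an SQS client against a createNode cluster; it is not the project's actual helper):

package adapter

import (
	"context"
	"testing"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
)

func TestSetQueueAttributes_PartitionCountZeroToOneIsNoOp(t *testing.T) {
	ctx := context.Background()
	client := newTestSQSClient(t) // hypothetical helper: SQS client against a test cluster

	// CreateQueue without PartitionCount -> stored as 0 ("unset").
	out, err := client.CreateQueue(ctx, &sqs.CreateQueueInput{
		QueueName:  aws.String("zero-to-one.fifo"),
		Attributes: map[string]string{"FifoQueue": "true"},
	})
	if err != nil {
		t.Fatalf("CreateQueue: %v", err)
	}

	// Explicit PartitionCount=1 is the same layout; expect success, not
	// a "PartitionCount is immutable" rejection.
	if _, err := client.SetQueueAttributes(ctx, &sqs.SetQueueAttributesInput{
		QueueUrl:   out.QueueUrl,
		Attributes: map[string]string{"PartitionCount": "1"},
	}); err != nil {
		t.Fatalf("SetQueueAttributes(PartitionCount=1) should be a no-op: %v", err)
	}
}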


Overall

The fix is a one-line change that correctly extends the Round 6.2 normalisation pattern to the SetQueueAttributes immutability path. Test coverage is thorough at the unit level and confirms the regression-detection property. The minor integration test gap is the only remaining observation.

Complete history — all rounds resolved:

Round  Issue                                                                                 Status
1–6.1  (all prior items)                                                                     resolved
6.2    Codex P2 — PartitionCount 0 ≡ 1 in CreateQueue idempotency (htfifoAttributesEqual)    dbd7a66
6.3    Claude Low — validatePartitionImmutability same 0 ≡ 1 gap for SetQueueAttributes      dcc5574
6.3    Integration test gap for 0→1 no-op wire path                                          ⬜ Very low, not a correctness issue

The implementation is ready.

@bootjp bootjp merged commit 1ae4aa3 into docs/sqs-phase3-proposals Apr 27, 2026
5 checks passed
@bootjp bootjp deleted the feat/sqs-throttling-phase3c branch April 27, 2026 15:55
bootjp added a commit that referenced this pull request Apr 28, 2026
…md (PR #664 round 14)

Claude low on round 13.1: ~25+ comments across the SQS Go files reference review attributions ('Codex P1 on PR #679', 'CodeRabbit Major on PR #664 round 11', etc.). CLAUDE.md is explicit: 'Don't reference the current task, fix, or callers ... since those belong in the PR description and rot as the codebase evolves.'

Sweep across:
- adapter/sqs_throttle.go
- adapter/sqs_catalog.go
- adapter/sqs_partitioning.go
- adapter/sqs_messages.go
- adapter/sqs_throttle_test.go
- adapter/sqs_partitioning_test.go
- adapter/sqs_throttle_integration_test.go
- adapter/sqs_partitioning_integration_test.go
- adapter/sqs_catalog_test.go
- adapter/sqs_query_protocol.go
- adapter/sqs_query_protocol_test.go

Each comment now retains the substantive WHY (why the design choice was made) and drops the parenthetical attribution to a specific reviewer / PR number / round. Design docs (.md files) are exempt — those are archival records where review attribution is acceptable practice.

go test -race ./adapter/... pass; golangci-lint ./... clean. No behaviour change.
