diff --git a/adapter/sqs.go b/adapter/sqs.go index dfd1916cc..aac76afa9 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -5,6 +5,7 @@ import ( "io" "net" "net/http" + "strconv" "time" "github.com/bootjp/elastickv/kv" @@ -55,6 +56,11 @@ const ( sqsErrInternalFailure = "InternalFailure" sqsErrServiceUnavailable = "ServiceUnavailable" sqsErrMalformedRequest = "MalformedQueryString" + // sqsErrThrottling is the per-queue rate-limit rejection code. + // Returned with HTTP 400 and a Retry-After header derived from the + // bucket's refillRate + the request's charge count (see + // computeRetryAfter in sqs_throttle.go for the formula). + sqsErrThrottling = "Throttling" ) type SQSServerOption func(*SQSServer) @@ -76,6 +82,11 @@ type SQSServer struct { // goroutines without ordering between them. reaperCtx context.Context reaperCancel context.CancelFunc + // throttle is the per-queue rate-limit bucket store. Always + // non-nil; charge() short-circuits when the queue's meta has no + // throttle config so unconfigured queues pay one nil-check per + // request and nothing else (see sqs_throttle.go). + throttle *bucketStore } // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to @@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin coordinator: coordinate, reaperCtx: reaperCtx, reaperCancel: reaperCancel, + throttle: newBucketStoreDefault(), } s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){ sqsCreateQueueTarget: s.createQueue, @@ -131,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin func (s *SQSServer) Run() error { s.startReaper(s.reaperCtx) + // Throttle bucket idle-evict runs on a background ticker so the + // request hot path never pays the O(N) sweep cost. Cleaned up by + // the same reaperCtx cancellation that stops the message reaper. + go s.throttle.runSweepLoop(s.reaperCtx) if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) { return errors.WithStack(err) } @@ -267,3 +283,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin w.WriteHeader(status) _ = json.NewEncoder(w).Encode(resp) } + +// writeSQSThrottlingError emits the rate-limit rejection envelope: 400 +// + the AWS-shaped JSON error body + a Retry-After header carrying +// the integer-second wait derived from the bucket's refill rate and +// the request's charge count. The action argument is the bucket-action +// vocabulary ("Send" | "Receive" | "*") so the operator-visible +// message names the bucket that ran out, not just the queue. +func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) { + if retryAfter < time.Second { + retryAfter = time.Second + } + secs := int(retryAfter / time.Second) + w.Header().Set("Retry-After", strconv.Itoa(secs)) + writeSQSError(w, http.StatusBadRequest, sqsErrThrottling, + "Rate exceeded for queue '"+queue+"' action '"+action+"'") +} diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 4dc801e68..962bb443b 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -5,6 +5,7 @@ import ( "context" "io" "log/slog" + "math" "net/http" "net/url" "regexp" @@ -92,6 +93,72 @@ type sqsQueueMeta struct { // commit time and trust HLC monotonicity to keep ordering sane. 
CreatedAtMillis int64 `json:"created_at_millis,omitempty"` LastModifiedAtMillis int64 `json:"last_modified_at_millis,omitempty"` + // Throttle is the per-queue rate-limit configuration. nil disables + // throttling (default). Set via SetQueueAttributes with the AWS-style + // names ThrottleSendCapacity / ThrottleSendRefillPerSecond / etc. + // Persisted on the meta so a leader failover loads the configuration + // along with the rest of the queue. + Throttle *sqsQueueThrottle `json:"throttle,omitempty"` + // PartitionCount is the number of FIFO partitions for this queue + // (Phase 3.D HT-FIFO, see docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). + // Zero or 1 means the legacy single-partition layout — no schema + // change. Greater than 1 enables HT-FIFO. Set at CreateQueue time + // and immutable thereafter (SetQueueAttributes rejects any change). + // Power-of-two values only (validator rejects others). PR 2 of the + // rollout introduces this field but a temporary CreateQueue gate + // rejects PartitionCount > 1 until PR 5 lifts the gate atomically + // with the data-plane fanout — so the schema exists but no + // partitioned data can land before the data plane is wired. + PartitionCount uint32 `json:"partition_count,omitempty"` + // FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId" + // (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId + // routing; "perQueue" activates the partition-0 short-circuit so + // every group ID routes to one partition (effectively N=1). + // Set at CreateQueue time and immutable thereafter — flipping it + // live would re-route in-flight messages and silently violate + // within-group FIFO ordering (see §3.2 of the design). + FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"` + // DeduplicationScope mirrors the AWS attribute. "messageGroup" + // (default for HT-FIFO) means the dedup window is per + // (queue, partition, MessageGroupId, dedupId); "queue" is the + // legacy single-window behaviour. Set at CreateQueue time and + // immutable thereafter — changing live can resurrect or suppress + // messages depending on the direction of the change. The + // validator additionally rejects {PartitionCount > 1, + // DeduplicationScope = "queue"} at CreateQueue time because the + // dedup key cannot be globally unique across partitions without + // a cross-partition OCC transaction. + DeduplicationScope string `json:"deduplication_scope,omitempty"` +} + +// sqsQueueThrottle is the per-queue token-bucket configuration. Three +// independent buckets per queue: Send (SendMessage[Batch]), Recv +// (ReceiveMessage / DeleteMessage[Batch] / ChangeMessageVisibility[Batch], +// charged on the consumer side), Default (catch-all for any future +// non-Send/Recv verb that gets wired into the throttle path). +// +// Field-name vocabulary uses short forms (Send*, Recv*, Default*) for the +// JSON contract and AWS-style attribute names; the in-memory bucketKey +// uses the canonical action vocabulary ("Send" | "Receive" | "*"). +// throttleConfigToBucketAction and bucketActionForCharge bridge the two. 
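+//
+// Illustrative mapping across the three vocabularies (values assumed):
+//
+//	wire attribute (SetQueueAttributes): ThrottleSendCapacity = "20"
+//	persisted JSON (this struct):        {"send_capacity": 20}
+//	in-memory bucketKey action:          "Send"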
+type sqsQueueThrottle struct {
+	SendCapacity           float64 `json:"send_capacity,omitempty"`
+	SendRefillPerSecond    float64 `json:"send_refill_per_second,omitempty"`
+	RecvCapacity           float64 `json:"recv_capacity,omitempty"`
+	RecvRefillPerSecond    float64 `json:"recv_refill_per_second,omitempty"`
+	DefaultCapacity        float64 `json:"default_capacity,omitempty"`
+	DefaultRefillPerSecond float64 `json:"default_refill_per_second,omitempty"`
+}
+
+// IsEmpty reports whether the configuration is the no-op (all six
+// fields zero), in which case throttling is disabled for the queue.
+func (t *sqsQueueThrottle) IsEmpty() bool {
+	if t == nil {
+		return true
+	}
+	return t.SendCapacity == 0 && t.SendRefillPerSecond == 0 &&
+		t.RecvCapacity == 0 && t.RecvRefillPerSecond == 0 &&
+		t.DefaultCapacity == 0 && t.DefaultRefillPerSecond == 0
+}
 
 var storedSQSMetaPrefix = []byte{0x00, 'S', 'Q', 0x01}
 
@@ -297,6 +364,14 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet
 	if meta.ContentBasedDedup && !meta.IsFIFO {
 		return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
 	}
+	// HT-FIFO validation runs after resolveFifoQueueFlag so the
+	// IsFIFO-only checks see the post-resolution flag. The temporary
+	// dormancy gate (§11 PR 2) runs separately in createQueue so
+	// SetQueueAttributes paths share the schema validator without
+	// re-rejecting on the gate.
+	if err := validatePartitionConfig(meta); err != nil {
+		return nil, err
+	}
 	return meta, nil
 }
 
@@ -384,6 +459,55 @@ var sqsAttributeAppliers = map[string]attributeApplier{
 		m.ContentBasedDedup = b
 		return nil
 	},
+	// PartitionCount enables HT-FIFO when > 1 (Phase 3.D, see
+	// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set at
+	// CreateQueue time; SetQueueAttributes attempts to change it are
+	// rejected by the immutability check in trySetQueueAttributesOnce.
+	// PR 2 of the rollout introduces the field but the temporary
+	// dormancy gate in createQueue rejects PartitionCount > 1 until
+	// PR 5 lifts the gate atomically with the data plane.
+	"PartitionCount": func(m *sqsQueueMeta, v string) error {
+		n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32)
+		if err != nil {
+			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+				"PartitionCount must be a non-negative integer")
+		}
+		m.PartitionCount = uint32(n) //nolint:gosec // bounded by ParseUint(_, _, 32) above.
+		return nil
+	},
+	"FifoThroughputLimit": func(m *sqsQueueMeta, v string) error {
+		v = strings.TrimSpace(v)
+		switch v {
+		case "", htfifoThroughputPerMessageGroupID, htfifoThroughputPerQueue:
+			m.FifoThroughputLimit = v
+			return nil
+		}
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"FifoThroughputLimit must be 'perMessageGroupId' or 'perQueue'")
+	},
+	"DeduplicationScope": func(m *sqsQueueMeta, v string) error {
+		v = strings.TrimSpace(v)
+		switch v {
+		case "", htfifoDedupeScopeMessageGroup, htfifoDedupeScopeQueue:
+			m.DeduplicationScope = v
+			return nil
+		}
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"DeduplicationScope must be 'messageGroup' or 'queue'")
+	},
+	// Throttle* are non-AWS extensions for per-queue rate limiting,
+	// see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
+ // Each accepts a non-negative float64; the cross-attribute + // validation that enforces both-zero-or-both-positive on each + // (capacity, refill) pair, capacity ≥ refill, hard ceiling, and + // the capacity ≥ 10 floor for batch-charging buckets runs in + // validateThrottleConfig after every Throttle* applier has fired. + "ThrottleSendCapacity": applyThrottleField(throttleSetSendCapacity), + "ThrottleSendRefillPerSecond": applyThrottleField(throttleSetSendRefill), + "ThrottleRecvCapacity": applyThrottleField(throttleSetRecvCapacity), + "ThrottleRecvRefillPerSecond": applyThrottleField(throttleSetRecvRefill), + "ThrottleDefaultCapacity": applyThrottleField(throttleSetDefaultCapacity), + "ThrottleDefaultRefillPerSecond": applyThrottleField(throttleSetDefaultRefill), "RedrivePolicy": func(m *sqsQueueMeta, v string) error { // Validate the policy at attribute-apply time so a malformed // RedrivePolicy never makes it onto the queue meta record. The @@ -419,6 +543,169 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error { return err } } + // Throttle* validation has to run after every applier so the + // pair-wise rules (both-zero-or-both-positive, capacity ≥ refill, + // capacity ≥ 10 for batch buckets) see the post-update meta as a + // whole. Running per-applier would reject a valid two-attribute + // update (e.g. SendCapacity + SendRefillPerSecond) on the first + // applier because the second value is not yet present. + if err := validateThrottleConfig(meta); err != nil { + return err + } + // HT-FIFO partition validation runs in parseAttributesIntoMeta / + // trySetQueueAttributesOnce, AFTER resolveFifoQueueFlag, so the + // IsFIFO-only checks see the post-resolution flag. Running here + // would reject a valid CreateQueue with FifoQueue=true + + // FifoThroughputLimit=perMessageGroupId because IsFIFO is still + // false at this point in the flow. + return nil +} + +// applyThrottleField wraps a setter that writes one Throttle* field +// into meta.Throttle, allocating the struct lazily on first use. The +// per-field setter does the float parse + non-negative + hard-ceiling +// check; cross-field rules run later in validateThrottleConfig. +func applyThrottleField(set func(*sqsQueueThrottle, float64)) attributeApplier { + return func(m *sqsQueueMeta, v string) error { + f, err := parseThrottleFloat(v) + if err != nil { + return err + } + if m.Throttle == nil { + m.Throttle = &sqsQueueThrottle{} + } + set(m.Throttle, f) + return nil + } +} + +// parseThrottleFloat parses the wire string into a non-negative float +// bounded by the hard ceiling. Any malformed or out-of-range value +// turns into InvalidAttributeValue with a self-describing message so +// the operator sees the cause without grepping the server log. 
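+//
+// Illustrative inputs (the ceiling is the 100000 named in the error
+// text below):
+//
+//	" 2.5 " → 2.5 (whitespace trimmed)
+//	"1e3"   → 1000 (ParseFloat accepts scientific notation)
+//	"NaN"   → rejected — it parses, but the IsNaN guard catches it
+//	"-1"    → rejected — negative
+//	"1e6"   → rejected — exceeds the hard ceiling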
+func parseThrottleFloat(value string) (float64, error) {
+	v := strings.TrimSpace(value)
+	f, err := strconv.ParseFloat(v, 64)
+	if err != nil {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute must be a non-negative number")
+	}
+	if math.IsNaN(f) || math.IsInf(f, 0) || f < 0 {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute must be finite and non-negative")
+	}
+	if f > throttleHardCeilingPerSecond {
+		return 0, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"throttle attribute exceeds hard ceiling 100000")
+	}
+	return f, nil
+}
+
+// Per-field setters keep applyThrottleField a one-liner per attribute
+// and let validateThrottleConfig stay outside the applier dispatch
+// table. Defined as functions (not closures) so a future caller from
+// outside applyAttributes — e.g. a programmatic admin surface — can
+// reuse them without recreating the closure boilerplate.
+func throttleSetSendCapacity(t *sqsQueueThrottle, f float64)    { t.SendCapacity = f }
+func throttleSetSendRefill(t *sqsQueueThrottle, f float64)      { t.SendRefillPerSecond = f }
+func throttleSetRecvCapacity(t *sqsQueueThrottle, f float64)    { t.RecvCapacity = f }
+func throttleSetRecvRefill(t *sqsQueueThrottle, f float64)      { t.RecvRefillPerSecond = f }
+func throttleSetDefaultCapacity(t *sqsQueueThrottle, f float64) { t.DefaultCapacity = f }
+func throttleSetDefaultRefill(t *sqsQueueThrottle, f float64)   { t.DefaultRefillPerSecond = f }
+
+// validateThrottleConfig enforces the §3.2 cross-attribute rules on
+// the post-applier meta. The single-field constraints (non-negative,
+// hard ceiling) are already enforced inside parseThrottleFloat;
+// what's left is pair-wise:
+//
+//   - Each (capacity, refill) pair must be both zero (action disabled)
+//     or both positive. A capacity-without-refill bucket would never
+//     refill; a refill-without-capacity bucket has no burst headroom.
+//   - capacity ≥ refill, otherwise the bucket can never burst above
+//     steady state (with capacity below refill it cannot even hold
+//     one second's worth of tokens).
+//   - For action buckets that cover a batch verb (Send, Recv) the
+//     capacity must be ≥ throttleMinBatchCapacity (== 10). A capacity
+//     below the largest single charge is permanently unserviceable
+//     for full batches.
+//
+// If meta.Throttle is empty (the IsEmpty short-circuit) the function
+// also drops the empty struct so a round-trip GetQueueAttributes
+// reports the queue as unthrottled rather than zero-valued. Mirrors
+// how nil throttle on the meta means "not configured".
+func validateThrottleConfig(meta *sqsQueueMeta) error {
+	if meta.Throttle == nil {
+		return nil
+	}
+	t := meta.Throttle
+	if err := validateThrottlePair("ThrottleSend", t.SendCapacity, t.SendRefillPerSecond, true); err != nil {
+		return err
+	}
+	if err := validateThrottlePair("ThrottleRecv", t.RecvCapacity, t.RecvRefillPerSecond, true); err != nil {
+		return err
+	}
+	// Default* gets the same batch-capacity floor as Send*/Recv*
+	// because resolveActionConfig in sqs_throttle.go falls Send and
+	// Receive traffic through to Default whenever the corresponding
+	// Send*/Recv* pair is unset. Without the floor, a config like
+	// `ThrottleDefaultCapacity=5, ThrottleDefaultRefillPerSecond=1`
+	// would be accepted but make every full SendMessageBatch /
+	// DeleteMessageBatch (charge=10) permanently unserviceable —
+	// the bucket can never accumulate the 10 tokens.
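+	// (Concretely: with capacity 5 and refill 1/s the bucket tops out
+	// at 5 tokens after five idle seconds, so a charge of 10 can never
+	// be satisfied no matter how long the queue sits idle.)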
+	// Codex P1 on PR #679 round 5 caught the gap; the design doc note
+	// in §3.2 claiming Default* is exempt was wrong about the
+	// fall-through.
+	if err := validateThrottlePair("ThrottleDefault", t.DefaultCapacity, t.DefaultRefillPerSecond, true); err != nil {
+		return err
+	}
+	if t.IsEmpty() {
+		// All-zero post-apply means the operator wrote a "disable"
+		// command; canonicalise to nil so downstream code hits the
+		// nil-throttle short-circuit rather than the IsEmpty branch.
+		meta.Throttle = nil
+	}
+	return nil
+}
+
+// addThrottleAttributes renders the non-zero Throttle* pairs into out.
+// Per §3.2 the wire-side vocabulary stays Send*/Recv*/Default*; the
+// canonical bucket-action vocabulary is internal to the bucket store.
+func addThrottleAttributes(out map[string]string, t *sqsQueueThrottle) {
+	if t.IsEmpty() {
+		return
+	}
+	if t.SendCapacity > 0 {
+		out["ThrottleSendCapacity"] = strconv.FormatFloat(t.SendCapacity, 'g', -1, 64)
+		out["ThrottleSendRefillPerSecond"] = strconv.FormatFloat(t.SendRefillPerSecond, 'g', -1, 64)
+	}
+	if t.RecvCapacity > 0 {
+		out["ThrottleRecvCapacity"] = strconv.FormatFloat(t.RecvCapacity, 'g', -1, 64)
+		out["ThrottleRecvRefillPerSecond"] = strconv.FormatFloat(t.RecvRefillPerSecond, 'g', -1, 64)
+	}
+	if t.DefaultCapacity > 0 {
+		out["ThrottleDefaultCapacity"] = strconv.FormatFloat(t.DefaultCapacity, 'g', -1, 64)
+		out["ThrottleDefaultRefillPerSecond"] = strconv.FormatFloat(t.DefaultRefillPerSecond, 'g', -1, 64)
+	}
+}
+
+// validateThrottlePair runs the per-(action, capacity, refill) checks.
+// requireBatchCapacity gates the capacity ≥ 10 rule. Every current
+// call site passes true — Default* included, because Send and Receive
+// traffic falls through to Default when the dedicated pair is unset
+// (see validateThrottleConfig above) — but the parameter keeps the
+// rule optional for any future bucket with no batch verbs in scope.
+func validateThrottlePair(prefix string, capacity, refill float64, requireBatchCapacity bool) error {
+	if capacity == 0 && refill == 0 {
+		return nil
+	}
+	if capacity == 0 || refill == 0 {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity and "+prefix+"RefillPerSecond must both be zero (disabled) or both positive")
+	}
+	if capacity < refill {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity must be ≥ "+prefix+"RefillPerSecond (capacity is the burst cap; below refill the bucket cannot accumulate)")
+	}
+	if requireBatchCapacity && capacity < throttleMinBatchCapacity {
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			prefix+"Capacity must be ≥ 10 — batch verbs (SendMessageBatch / DeleteMessageBatch) charge up to 10 tokens per call; a smaller capacity makes every full batch permanently unserviceable")
+	}
 	return nil
 }
 
@@ -440,6 +727,15 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
 	if a == nil || b == nil {
 		return false
 	}
+	return baseAttributesEqual(a, b) &&
+		throttleConfigEqual(a.Throttle, b.Throttle) &&
+		htfifoAttributesEqual(a, b)
+}
+
+// baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set.
+// Split from attributesEqual so adding fields per phase does not
+// push the function over the cyclop ceiling.
+func baseAttributesEqual(a, b *sqsQueueMeta) bool {
 	return a.IsFIFO == b.IsFIFO &&
 		a.ContentBasedDedup == b.ContentBasedDedup &&
 		a.VisibilityTimeoutSeconds == b.VisibilityTimeoutSeconds &&
@@ -450,6 +746,54 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
 		a.RedrivePolicy == b.RedrivePolicy
 }
 
+// throttleConfigEqual compares two Throttle configs for the
+// CreateQueue idempotency check.
Without including the throttle +// fields in attributesEqual, a re-create with different limits would +// be treated as idempotent and silently keep the old limits. +func throttleConfigEqual(a, b *sqsQueueThrottle) bool { + aEmpty := a.IsEmpty() + bEmpty := b.IsEmpty() + if aEmpty && bEmpty { + return true + } + if aEmpty != bEmpty { + return false + } + return a.SendCapacity == b.SendCapacity && + a.SendRefillPerSecond == b.SendRefillPerSecond && + a.RecvCapacity == b.RecvCapacity && + a.RecvRefillPerSecond == b.RecvRefillPerSecond && + a.DefaultCapacity == b.DefaultCapacity && + a.DefaultRefillPerSecond == b.DefaultRefillPerSecond +} + +// htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields. +// +// PartitionCount normalisation: validatePartitionConfig documents 0 +// and 1 as equivalent ("unset" / "single-partition"); a queue created +// without PartitionCount is stored as 0 while a queue created with +// explicit PartitionCount=1 is stored as 1, so strict equality would +// have CreateQueue reject the second call as "different attributes" +// even though the queues are semantically identical (Codex P2 on +// PR #679 round 6.1). normalisePartitionCount maps both to 1 for the +// idempotency check. +func htfifoAttributesEqual(a, b *sqsQueueMeta) bool { + return normalisePartitionCount(a.PartitionCount) == normalisePartitionCount(b.PartitionCount) && + a.FifoThroughputLimit == b.FifoThroughputLimit && + a.DeduplicationScope == b.DeduplicationScope +} + +// normalisePartitionCount collapses the two "single-partition" forms +// (0 = unset, 1 = explicit) into a single canonical value so equality +// checks treat them as identical. Any value > 1 is returned unchanged +// — those cases must already match exactly to be considered equal. +func normalisePartitionCount(n uint32) uint32 { + if n == 0 { + return 1 + } + return n +} + // ------------------------ storage primitives ------------------------ func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 { @@ -517,6 +861,16 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + // Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1 + // must reject until PR 5 wires the data plane atomically with the + // gate-lift. Without this, accepting a partitioned-queue create + // would let SendMessage write under the legacy single-partition + // prefix; the PR 5 reader would never find those messages and the + // reaper would not enumerate them — silent message loss. + if err := validatePartitionDormancyGate(requested); err != nil { + writeSQSErrorFromErr(w, err) + return + } if len(in.Tags) > sqsMaxTagsPerQueue { // AWS caps tags per queue at 50. CreateQueue must reject // over-cap tag bundles up front; a silent slice-and-store @@ -605,6 +959,15 @@ func (s *SQSServer) tryCreateQueueOnce(ctx context.Context, requested *sqsQueueM if _, err := s.coordinator.Dispatch(ctx, req); err != nil { return false, errors.WithStack(err) } + // Drop any throttle bucket that survived a delete-then-create race + // (Codex P2 on PR #679 round 5). DeleteQueue invalidates after its + // commit, but a sendMessage holding pre-delete meta can recreate + // a bucket between that invalidate and this CreateQueue commit; + // invalidating again here on a genuine create (not the idempotent + // return path above, which exits before this point) guarantees + // the new queue starts with a fresh full-capacity bucket + // regardless of in-flight traffic to the prior incarnation. 
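+	// Illustrative timeline of the race:
+	//   t0  sendMessage loads pre-delete meta
+	//   t1  DeleteQueue commits, invalidates the bucket
+	//   t2  the in-flight charge recreates a bucket from the stale meta
+	//   t3  this CreateQueue commits, invalidates again (here)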
+ s.throttle.invalidateQueue(requested.Name) return true, nil } @@ -623,6 +986,14 @@ func (s *SQSServer) deleteQueue(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + // Drop in-memory throttle buckets belonging to this queue so a + // same-name CreateQueue immediately after this delete starts with + // a fresh full-capacity bucket, not the stale balance from the + // previous incarnation. Without this step the old throttle would + // keep enforcing for up to the idle-evict window (default 1 h), + // surprising operators who use DeleteQueue+CreateQueue to reset + // queue state. + s.throttle.invalidateQueue(name) // SQS DeleteQueue returns 200 with an empty body. writeSQSJSON(w, map[string]any{}) } @@ -954,6 +1325,16 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection, if meta.RedrivePolicy != "" { all["RedrivePolicy"] = meta.RedrivePolicy } + // Throttle* are non-AWS extensions. Surfacing them in + // GetQueueAttributes lets operators read back what they set; SDKs + // that strictly validate the attribute set will ignore unknown + // keys. Extracted into a helper so queueMetaToAttributes stays + // under the cyclop ceiling. + addThrottleAttributes(all, meta.Throttle) + // HT-FIFO attributes (Phase 3.D). Same omission rule as Throttle*: + // only present when configured. Extracted into a helper so this + // function stays under the cyclop ceiling. + addHTFIFOAttributes(all, meta) if selection.expandAll { return all } @@ -989,6 +1370,20 @@ func (s *SQSServer) setQueueAttributes(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + // Drop the in-memory bucket entries belonging to this queue *after* + // the Raft commit so the next request rebuilds from the freshly + // committed throttle config. Gated on whether the request actually + // touched a Throttle* attribute — an unconditional invalidate + // would reset the bucket on every unrelated SetQueueAttributes + // (e.g. VisibilityTimeout-only update), giving any caller a way to + // silently restore a noisy tenant's burst capacity by writing a + // no-op SetQueueAttributes (Codex P1 on PR #679). The bucket + // reconciliation in loadOrInit also catches a stale bucket if a + // throttle change slips past this gate (e.g. via a future admin + // path), so the gating here is purely a hot-path optimisation. + if throttleAttributesPresent(in.Attributes) { + s.throttle.invalidateQueue(name) + } writeSQSJSON(w, map[string]any{}) } @@ -1011,6 +1406,42 @@ func (s *SQSServer) setQueueAttributesWithRetry(ctx context.Context, queueName s return newSQSAPIError(http.StatusInternalServerError, sqsErrInternalFailure, "set queue attributes retry attempts exhausted") } +// applyAndValidateSetAttributes runs the apply + cross-validator +// chain for a SetQueueAttributes request. Extracted from +// trySetQueueAttributesOnce so that function stays under the cyclop +// ceiling once HT-FIFO immutability + Throttle validators were +// added. Returns nil on success; on rejection returns the typed +// sqsAPIError the caller forwards to writeSQSErrorFromErr. +// +// preApply snapshot allocation is gated on htfifoAttributesPresent +// so the common "mutable-only update" path stays alloc-free per the +// Gemini medium feedback on PR #681. 
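+//
+// Illustrative flow for {"VisibilityTimeout": "60"} (no HT-FIFO keys):
+// htfifoAttributesPresent is false, so no snapshot allocates and the
+// immutability check is skipped; the FIFO-only rule and the partition
+// schema validator still run against the post-apply meta.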
+func applyAndValidateSetAttributes(meta *sqsQueueMeta, attrs map[string]string) error { + var preApply *sqsQueueMeta + if htfifoAttributesPresent(attrs) { + preApply = snapshotImmutableHTFIFO(meta) + } + if err := applyAttributes(meta, attrs); err != nil { + return err + } + if preApply != nil { + if err := validatePartitionImmutability(preApply, meta); err != nil { + return err + } + } + // ContentBasedDeduplication is FIFO-only; a Standard queue + // silently accepting it would advertise unsupported behavior to + // clients. Same rule enforced on CreateQueue. + if meta.ContentBasedDedup && !meta.IsFIFO { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") + } + // HT-FIFO schema validator runs after applyAttributes so the + // FIFO-only checks see the post-apply state. IsFIFO comes from + // the loaded meta record (immutable from CreateQueue) so the + // validator sees the same flag CreateQueue set. + return validatePartitionConfig(meta) +} + // trySetQueueAttributesOnce is one read-validate-commit pass. The first // return reports whether the caller should stop retrying (the attrs // are now committed); an error means either a non-retryable failure @@ -1024,15 +1455,9 @@ func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName str if !exists { return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } - if err := applyAttributes(meta, attrs); err != nil { + if err := applyAndValidateSetAttributes(meta, attrs); err != nil { return false, err } - // ContentBasedDeduplication is FIFO-only; a Standard queue - // silently accepting it would advertise unsupported behavior to - // clients. Same rule enforced on CreateQueue. - if meta.ContentBasedDedup && !meta.IsFIFO { - return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") - } meta.LastModifiedAtMillis = time.Now().UnixMilli() metaBytes, err := encodeSQSQueueMeta(meta) if err != nil { diff --git a/adapter/sqs_catalog_test.go b/adapter/sqs_catalog_test.go index 291732ded..b68e6fa43 100644 --- a/adapter/sqs_catalog_test.go +++ b/adapter/sqs_catalog_test.go @@ -136,6 +136,29 @@ func TestSQSServer_CatalogCreateIsIdempotent(t *testing.T) { if got, _ := out3["__type"].(string); got != sqsErrQueueNameExists { t.Fatalf("differing-attrs error type: got %q want %q", got, sqsErrQueueNameExists) } + + // Fourth call: same name, same non-throttle attrs as the original + // create, but different Throttle* values. The original create had + // no Throttle config; this one adds one. throttleConfigEqual must + // notice the diff and the call must reject as QueueNameExists. + // Without this case a bug in throttleConfigEqual (e.g. always + // returning true) would slip past the existing VisibilityTimeout- + // only test (Claude review on PR #679 round 2). 
+ withThrottle := map[string]any{ + "QueueName": "idempotent", + "Attributes": map[string]string{ + "VisibilityTimeout": "60", + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }, + } + status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle) + if status4 != http.StatusBadRequest { + t.Fatalf("re-create with added Throttle*: got %d want 400; body %v", status4, out4) + } + if got, _ := out4["__type"].(string); got != sqsErrQueueNameExists { + t.Fatalf("Throttle*-diff error type: got %q want %q", got, sqsErrQueueNameExists) + } } func TestSQSServer_CatalogGetAndSetAttributes(t *testing.T) { diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index a64a0006e..0bff26b6a 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -294,33 +294,67 @@ type sqsChangeVisibilityInput struct { // ------------------------ handlers ------------------------ -func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) { +// prepareSendMessage decodes the SendMessage payload and resolves +// the queue name. Throttle charging happens after the meta load in +// validateSend so we don't pay an extra meta read just to discover +// throttling is off (Gemini high on PR #679). +func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (sqsSendMessageInput, string, bool) { var in sqsSendMessageInput if err := decodeSQSJSONInput(r, &in); err != nil { writeSQSErrorFromErr(w, err) - return + return in, "", false } queueName, err := queueNameFromURL(in.QueueUrl) if err != nil { writeSQSErrorFromErr(w, err) - return + return in, "", false } + return in, queueName, true +} + +// validateSend loads queue meta, runs the throttle charge against +// the loaded throttle config (no extra meta read), then validates +// message attributes / FIFO params and resolves the delay. Returns +// ok=false if any step has already written the error response. +// +// Throttle check sits AFTER the meta load (so we have the throttle +// config) and AFTER the QueueDoesNotExist branch (so a missing +// queue is reported as 400 QueueDoesNotExist, not as a Throttling +// 400 against a non-existent bucket). It still sits OUTSIDE the +// OCC transaction (§4.2): a rejected request never reaches the +// coordinator. 
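+//
+// Illustrative rejection (queue name and wait assumed): a SendMessage
+// that finds the Send bucket empty gets
+//
+//	HTTP/1.1 400
+//	Retry-After: 1
+//	{"__type": "Throttling", "message": "Rate exceeded for queue 'q' action 'Send'"}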
+func (s *SQSServer) validateSend(w http.ResponseWriter, r *http.Request, queueName string, in sqsSendMessageInput) (*sqsQueueMeta, uint64, int64, bool) { meta, readTS, apiErr := s.loadQueueMetaForSend(r.Context(), queueName, []byte(in.MessageBody)) if apiErr != nil { writeSQSErrorFromErr(w, apiErr) - return + return nil, 0, 0, false + } + if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle) { + return nil, 0, 0, false } if apiErr := validateMessageAttributes(in.MessageAttributes); apiErr != nil { writeSQSErrorFromErr(w, apiErr) - return + return nil, 0, 0, false } if apiErr := validateSendFIFOParams(meta, in); apiErr != nil { writeSQSErrorFromErr(w, apiErr) - return + return nil, 0, 0, false } delay, apiErr := resolveSendDelay(meta, in.DelaySeconds) if apiErr != nil { writeSQSErrorFromErr(w, apiErr) + return nil, 0, 0, false + } + return meta, readTS, delay, true +} + +func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) { + in, queueName, ok := s.prepareSendMessage(w, r) + if !ok { + return + } + meta, readTS, delay, ok := s.validateSend(w, r, queueName, in) + if !ok { return } if meta.IsFIFO { @@ -530,6 +564,13 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) { writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") return } + // Throttle check uses the loaded meta's throttle config so we + // don't pay an extra meta read just to discover throttling is off + // (Gemini high on PR #679). Sits AFTER the QueueDoesNotExist + // branch — a missing queue should not consume a Recv token. + if !s.chargeQueueWithThrottle(w, queueName, bucketActionReceive, 1, meta.Throttle) { + return + } max, maxErr := resolveReceiveMaxMessages(in.MaxNumberOfMessages) if maxErr != nil { writeSQSErrorFromErr(w, maxErr) @@ -1106,6 +1147,9 @@ func (s *SQSServer) deleteMessage(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) { + return + } if err := s.deleteMessageWithRetry(r.Context(), queueName, handle); err != nil { writeSQSErrorFromErr(w, err) return @@ -1258,6 +1302,9 @@ func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Reque writeSQSErrorFromErr(w, err) return } + if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) { + return + } if err := s.changeVisibilityWithRetry(r.Context(), queueName, handle, timeout); err != nil { writeSQSErrorFromErr(w, err) return diff --git a/adapter/sqs_messages_batch.go b/adapter/sqs_messages_batch.go index b8adca271..68d47b7b2 100644 --- a/adapter/sqs_messages_batch.go +++ b/adapter/sqs_messages_batch.go @@ -94,6 +94,9 @@ func (s *SQSServer) sendMessageBatch(w http.ResponseWriter, r *http.Request) { "total batch payload exceeds 262144 bytes") return } + if !s.chargeQueue(w, r, queueName, bucketActionSend, throttleChargeCount(len(in.Entries))) { + return + } successful, failed, err := s.sendMessageBatchWithRetry(r.Context(), queueName, in.Entries) if err != nil { @@ -453,6 +456,9 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) { + return + } successful := make([]sqsBatchResultEntry, 0, len(in.Entries)) failed := make([]sqsBatchResultErrorEntry, 0) @@ -519,6 +525,9 @@ func (s *SQSServer) changeMessageVisibilityBatch(w http.ResponseWriter, r *http. 
writeSQSErrorFromErr(w, err) return } + if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) { + return + } successful := make([]sqsBatchResultEntry, 0, len(in.Entries)) failed := make([]sqsBatchResultErrorEntry, 0) diff --git a/adapter/sqs_partitioning.go b/adapter/sqs_partitioning.go new file mode 100644 index 000000000..0185c36fc --- /dev/null +++ b/adapter/sqs_partitioning.go @@ -0,0 +1,285 @@ +package adapter + +import ( + "net/http" + "strconv" +) + +// HT-FIFO (Phase 3.D split-queue FIFO) configuration vocabulary and +// the routing primitive partitionFor. See the design doc at +// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. +// +// PR 2 of the §11 rollout introduces the schema fields plus the +// validation surface — including the temporary dormancy gate that +// rejects PartitionCount > 1 at CreateQueue. PR 5 lifts the gate +// atomically with the data-plane fanout so a half-deployed cluster +// can never accept a partitioned queue without the data plane to +// serve it. Until then the field exists in the meta type and the +// router function compiles, but no partitioned queue can land. + +const ( + // htfifoMaxPartitions caps the per-queue partition count. 32 is + // enough for ~30,000 RPS per queue at the per-shard ~1,000 RPS + // limit. Higher would require larger per-queue meta records and + // more reaper cycles; bumping the cap is a follow-up if operators + // demand it. See §10 of the design. + htfifoMaxPartitions uint32 = 32 + + // htfifoThroughputPerMessageGroupID is the default + // FifoThroughputLimit value for HT-FIFO queues — every group ID + // hashes to a partition independently, giving the throughput + // scaling HT-FIFO is designed for. + htfifoThroughputPerMessageGroupID = "perMessageGroupId" + // htfifoThroughputPerQueue activates the §3.3 short-circuit: every + // group ID routes to partition 0, collapsing throughput back to + // what a single-partition queue gets. Useful for clients that want + // the AWS attribute set without the extra capacity. + htfifoThroughputPerQueue = "perQueue" + + // htfifoDedupeScopeMessageGroup is the default DeduplicationScope + // value for HT-FIFO queues — the dedup window is per (queue, + // partition, MessageGroupId, dedupId). + htfifoDedupeScopeMessageGroup = "messageGroup" + // htfifoDedupeScopeQueue is the legacy single-window scope. Per + // §3.2 this is incompatible with PartitionCount > 1 (the dedup + // key cannot be globally unique across partitions without a + // cross-partition OCC transaction); the validator rejects the + // combination at CreateQueue time. + htfifoDedupeScopeQueue = "queue" +) + +// htfifoTemporaryGateMessage is the operator-facing reason the +// CreateQueue gate uses while PR 2-4 are in production. Removed in +// PR 5 in the same commit that wires the data-plane fanout. +const htfifoTemporaryGateMessage = "PartitionCount > 1 requires HT-FIFO data plane — not yet enabled" + +// partitionFor maps a (queue meta, MessageGroupId) pair to a +// partition index in [0, PartitionCount). Edge cases: +// +// - PartitionCount == 0 or 1 → always 0 (legacy single-partition). +// - FifoThroughputLimit == "perQueue" → always 0 (the §3.3 +// short-circuit; collapses every group to one partition). +// - Empty MessageGroupId → 0 (defensive; FIFO send validation +// should already have rejected this). +// +// Hashing uses FNV-1a per §3.3 of the design: fast, no SIMD setup +// cost, deterministic across Go versions and architectures, no key. 
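+// (The inlined loop below computes the same value as hash/fnv's
+// New64a followed by a Write of the group ID's bytes; inlining only
+// avoids the []byte conversion, not the hash function itself.)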
+// Operators do not need this to be cryptographically strong — +// well-distributed and deterministic is what matters. +func partitionFor(meta *sqsQueueMeta, messageGroupID string) uint32 { + if meta == nil { + return 0 + } + if meta.PartitionCount <= 1 { + return 0 + } + if meta.FifoThroughputLimit == htfifoThroughputPerQueue { + return 0 + } + if messageGroupID == "" { + return 0 + } + // Inlined FNV-1a over the string to avoid the []byte allocation + // hash/fnv.New64a + h.Write would force (Gemini medium on PR + // #681). MessageGroupId is capped at 128 chars by validation, so + // this loop bounds at 128 iterations of integer arithmetic per + // SendMessage — measurably faster than the hash.Hash interface + // path on the routing hot path. + const ( + fnv64Offset uint64 = 14695981039346656037 + fnv64Prime uint64 = 1099511628211 + ) + hash := fnv64Offset + for i := 0; i < len(messageGroupID); i++ { + hash ^= uint64(messageGroupID[i]) + hash *= fnv64Prime + } + // PartitionCount is a power of two (validator-enforced); mod is + // equivalent to mask-AND. The mask is meta.PartitionCount - 1. + // Computing the mask in uint64 first then narrowing to uint32 is + // safe because htfifoMaxPartitions == 32 fits in uint32 trivially. + mask := uint64(meta.PartitionCount - 1) + return uint32(hash & mask) //nolint:gosec // masked by (PartitionCount - 1) ≤ htfifoMaxPartitions − 1, fits in uint32. +} + +// isPowerOfTwo returns true when n is a positive power of two. +// PartitionCount must satisfy this so partitionFor's bitwise mask +// (h & (n-1)) is equivalent to (h % n) — without the constraint the +// distribution would be biased toward the lower indices. +func isPowerOfTwo(n uint32) bool { + return n > 0 && (n&(n-1)) == 0 +} + +// validatePartitionConfig enforces the §3.2 cross-attribute rules on +// the post-applier meta. Per-field constraints (parse, range) live +// inside the per-attribute appliers. Cross-field rules: +// +// - PartitionCount must be a power of two in [1, htfifoMaxPartitions] +// when set. PartitionCount == 0 is canonical "unset" and is +// equivalent to 1 for routing purposes. +// - FifoThroughputLimit / DeduplicationScope are FIFO-only — +// setting either on a Standard queue rejects with +// InvalidAttributeValue. +// - {PartitionCount > 1, DeduplicationScope = "queue"} rejects +// with InvalidParameterValue: queue-scoped dedup is incompatible +// with multi-partition FIFO because the dedup key cannot be +// globally unique across partitions without a cross-partition +// OCC transaction. +// - The §11 PR 2 dormancy gate (PartitionCount > 1 rejected at +// CreateQueue) lives in validatePartitionDormancyGate so the +// dormancy check can be turned off in unit tests that want to +// exercise the full schema path. Production CreateQueue calls +// both validators. +func validatePartitionConfig(meta *sqsQueueMeta) error { + if meta.PartitionCount > 0 { + if !isPowerOfTwo(meta.PartitionCount) { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount must be a power of two") + } + if meta.PartitionCount > htfifoMaxPartitions { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount exceeds the per-queue cap of "+strconv.FormatUint(uint64(htfifoMaxPartitions), 10)) + } + } + if !meta.IsFIFO { + // PartitionCount > 1 only makes sense on FIFO queues (HT-FIFO + // is by definition a FIFO feature). 
Without this guard a + // Standard queue with PartitionCount=2 would slip past the + // validator once PR 5 lifts the dormancy gate (Claude review + // on PR #681 round 2 caught this). PartitionCount=0 and 1 + // are accepted because both mean "single-partition layout" + // which is valid on Standard queues. + if meta.PartitionCount > 1 { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount > 1 is only valid on FIFO queues") + } + if meta.FifoThroughputLimit != "" { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "FifoThroughputLimit is only valid on FIFO queues") + } + if meta.DeduplicationScope != "" { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "DeduplicationScope is only valid on FIFO queues") + } + } + if meta.PartitionCount > 1 && meta.DeduplicationScope == htfifoDedupeScopeQueue { + // sqsErrValidation is "InvalidParameterValue" (Gemini medium + // on PR #681 — uses the existing constant rather than a + // duplicate-value alias). + return newSQSAPIError(http.StatusBadRequest, sqsErrValidation, + "queue-scoped deduplication is incompatible with multi-partition FIFO because the dedup key cannot be globally unique across partitions without a cross-partition OCC transaction") + } + return nil +} + +// validatePartitionDormancyGate is the temporary §11 PR 2 gate. As +// long as the data-plane fanout (PR 5) has not landed, accepting a +// partitioned-queue CreateQueue would let SendMessage write under +// the legacy single-partition prefix — the PR 5 reader would never +// find those messages and the reaper would not enumerate them. This +// gate makes the wrong-layout-data class of bug impossible. +// +// Removed in PR 5 in the same commit that wires the data plane so +// the gate-and-lift land atomically. +func validatePartitionDormancyGate(meta *sqsQueueMeta) error { + if meta.PartitionCount > 1 { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + htfifoTemporaryGateMessage) + } + return nil +} + +// validatePartitionImmutability enforces the §3.2 rule that +// PartitionCount, FifoThroughputLimit, and DeduplicationScope are +// all immutable from CreateQueue onward. Called from +// trySetQueueAttributesOnce after the meta is loaded; rejects the +// whole SetQueueAttributes call (all-or-nothing — even mutable +// attributes in the same request do not commit when an immutable +// one is invalid) per §3.2. +// +// requested is the post-apply meta; current is the on-disk meta. +// If any of the three immutable fields differs, the validator +// returns InvalidAttributeValue naming the attribute so the +// operator sees the cause directly. A same-value "no-op" succeeds. +// +// PartitionCount uses normalisePartitionCount so a SetQueueAttributes +// request that passes the canonical-equivalent value (e.g. 1 on a +// queue stored with 0, or 0 on a queue stored with 1) is treated as +// the no-op it semantically is — strict equality would reject with +// "PartitionCount is immutable" even though the partition layout +// hasn't changed (Claude Low on PR #679 round 6.2). 
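+//
+// Illustrative outcomes for a queue stored with PartitionCount=0:
+//
+//	{"PartitionCount": "1"}             → no-op, succeeds (normalised equal)
+//	{"PartitionCount": "2"}             → rejected as immutable
+//	{"FifoThroughputLimit": "perQueue"} on a queue stored with
+//	"perMessageGroupId"                 → rejected (strict string compare)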
+func validatePartitionImmutability(current, requested *sqsQueueMeta) error { + if current == nil || requested == nil { + return nil + } + if normalisePartitionCount(current.PartitionCount) != normalisePartitionCount(requested.PartitionCount) { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + if current.FifoThroughputLimit != requested.FifoThroughputLimit { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "FifoThroughputLimit is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + if current.DeduplicationScope != requested.DeduplicationScope { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "DeduplicationScope is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + return nil +} + +// htfifoAttributeKeys lists the wire-side attribute names that this +// PR introduces. Used by the immutability check (and future +// admin-surface code) to know which keys a SetQueueAttributes +// request might attempt to change. +var htfifoAttributeKeys = []string{ + "PartitionCount", + "FifoThroughputLimit", + "DeduplicationScope", +} + +// htfifoAttributesPresent reports whether any HT-FIFO attribute key +// appears in attrs. Cheap helper used by the validator to short- +// circuit the immutability check for SetQueueAttributes requests +// that touch only mutable attributes. +func htfifoAttributesPresent(attrs map[string]string) bool { + for _, k := range htfifoAttributeKeys { + if _, ok := attrs[k]; ok { + return true + } + } + return false +} + +// addHTFIFOAttributes renders the configured HT-FIFO attributes into +// out. Mirrors the Throttle* renderer in addThrottleAttributes; same +// omission rule (only present when set), same wire-side names. Kept +// in this file so the HT-FIFO surface lives in one place. +func addHTFIFOAttributes(out map[string]string, meta *sqsQueueMeta) { + if meta == nil { + return + } + if meta.PartitionCount > 0 { + out["PartitionCount"] = strconv.FormatUint(uint64(meta.PartitionCount), 10) + } + if meta.FifoThroughputLimit != "" { + out["FifoThroughputLimit"] = meta.FifoThroughputLimit + } + if meta.DeduplicationScope != "" { + out["DeduplicationScope"] = meta.DeduplicationScope + } +} + +// snapshotImmutableHTFIFO captures the three immutable HT-FIFO field +// values from a meta record. Returned struct is shallow-equal-comparable +// — validatePartitionImmutability uses the snapshot to check for any +// differing value after applyAttributes runs. 
+func snapshotImmutableHTFIFO(meta *sqsQueueMeta) *sqsQueueMeta { + if meta == nil { + return nil + } + return &sqsQueueMeta{ + PartitionCount: meta.PartitionCount, + FifoThroughputLimit: meta.FifoThroughputLimit, + DeduplicationScope: meta.DeduplicationScope, + } +} diff --git a/adapter/sqs_partitioning_integration_test.go b/adapter/sqs_partitioning_integration_test.go new file mode 100644 index 000000000..9bc5f88b5 --- /dev/null +++ b/adapter/sqs_partitioning_integration_test.go @@ -0,0 +1,269 @@ +package adapter + +import ( + "net/http" + "strings" + "testing" +) + +// TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate pins +// the §11 PR 2 dormancy gate at the wire layer: CreateQueue with +// PartitionCount > 1 rejects with InvalidAttributeValue and the +// gate's reason ("not yet enabled") makes it into the operator- +// visible message. Removed in PR 5 in the same commit that wires +// the data plane. +func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + for _, n := range []string{"2", "4", "8", "32"} { + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-gate-" + n + ".fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": n, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=%s: status %d (expected 400 from dormancy gate); body=%v", n, status, out) + } + if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue { + t.Fatalf("PartitionCount=%s: __type=%q (expected InvalidAttributeValue)", n, got) + } + msg, _ := out["message"].(string) + if msg == "" || !strings.Contains(msg, "not yet enabled") { + t.Fatalf("PartitionCount=%s: message %q must mention the gate reason", n, msg) + } + } +} + +// TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne pins +// the no-op-partition-count path: PartitionCount=1 is the legacy +// single-partition layout and must pass the dormancy gate even on +// FIFO queues that explicitly set the field. +func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-singlepart.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "1", + }, + }) + if status != http.StatusOK { + t.Fatalf("PartitionCount=1 must be accepted: status %d body %v", status, out) + } +} + +// TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount pins the +// validator's power-of-two rule. The validator runs before the +// dormancy gate so an invalid count (3) reports the validator's +// reason, not the gate's. 
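+// The message assertion below distinguishes the two rejection paths:
+// the validator says "power of two", while the gate would have said
+// "not yet enabled".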
+func TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-bad-count.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "3", + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=3 must reject: status %d", status) + } + if msg, _ := out["message"].(string); msg == "" || !strings.Contains(msg, "power of two") { + t.Fatalf("expected 'power of two' in message, got %q", msg) + } +} + +// TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue pins the +// FIFO-only rule: setting FifoThroughputLimit or DeduplicationScope +// on a Standard queue rejects with InvalidAttributeValue. Without +// this, the queue would silently land with no-op attributes that +// SDK clients might mistake for actually configured. +func TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, _ := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "standard-with-htfifo-attr", + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerMessageGroupID, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("FifoThroughputLimit on Standard queue: status %d (expected 400)", status) + } +} + +// TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned pins +// the §3.2 cross-attribute control-plane gate at the wire layer. +// {PartitionCount > 1, DeduplicationScope = "queue"} is rejected by +// validatePartitionConfig (the schema validator) which runs inside +// parseAttributesIntoMeta — that is, BEFORE validatePartitionDormancyGate +// runs in createQueue. So the cross-attr rejection is what the wire +// layer sees today, even though the dormancy gate would also reject +// the same input on its own. After PR 5 lifts the dormancy gate the +// cross-attr rule remains the sole rejection path. +// +// The test only checks the 400 status to stay agnostic about which +// validator fires first — both are correct behaviour, and a future +// reordering of the createQueue control flow does not need to break +// this test. +func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-bad-dedup.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "2", + "DeduplicationScope": htfifoDedupeScopeQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=2 + DeduplicationScope=queue must reject: status %d body %v", status, out) + } +} + +// TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects pins +// the §3.2 immutability rule at the wire layer: SetQueueAttributes +// attempts to change PartitionCount / FifoThroughputLimit / +// DeduplicationScope reject with InvalidAttributeValue. Test creates +// a single-partition FIFO queue (allowed by dormancy) with +// FifoThroughputLimit set, then tries to change it. 
+func TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-immutable.fifo", htfifoThroughputPerMessageGroupID) + + // Try to flip FifoThroughputLimit. Must reject. + status, out := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("FifoThroughputLimit change: status %d body %v (expected 400 immutable)", status, out) + } + // Same-value no-op succeeds. + status, _ = callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerMessageGroupID, + }, + }) + if status != http.StatusOK { + t.Fatalf("same-value no-op SetQueueAttributes: status %d (expected 200)", status) + } +} + +// TestSQSServer_HTFIFO_ImmutabilityAllOrNothing pins the §3.2 all- +// or-nothing rule: a SetQueueAttributes that touches a *mutable* +// attribute alongside an attempted *immutable* change rejects the +// whole request, leaving the mutable attribute unchanged on the +// meta record. +func TestSQSServer_HTFIFO_ImmutabilityAllOrNothing(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-allornothing.fifo", htfifoThroughputPerMessageGroupID) + + // Combined: mutable VisibilityTimeout + immutable FifoThroughputLimit + // change. Must reject as a whole, mutable change must not commit. + status, _ := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "VisibilityTimeout": "60", + "FifoThroughputLimit": htfifoThroughputPerQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("mutable+immutable combined: status %d (expected 400)", status) + } + // Confirm VisibilityTimeout did NOT commit by reading it back. + status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "AttributeNames": []string{"VisibilityTimeout"}, + }) + if status != http.StatusOK { + t.Fatalf("get attrs: %d", status) + } + attrs, _ := out["Attributes"].(map[string]any) + if got, _ := attrs["VisibilityTimeout"].(string); got == "60" { + t.Fatalf("all-or-nothing violated: VisibilityTimeout committed even though immutable change rejected (got %q)", got) + } +} + +// TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip pins the wire +// surface for the configured HT-FIFO attributes: SetQueueAttributes +// (or CreateQueue with the attribute) followed by GetQueueAttributes +// returns the same value. 
+func TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-roundtrip.fifo", htfifoThroughputPerMessageGroupID) + status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "AttributeNames": []string{"All"}, + }) + if status != http.StatusOK { + t.Fatalf("GetQueueAttributes: status %d", status) + } + attrs, _ := out["Attributes"].(map[string]any) + if got, _ := attrs["FifoThroughputLimit"].(string); got != htfifoThroughputPerMessageGroupID { + t.Fatalf("FifoThroughputLimit round-trip: got %q want %q", got, htfifoThroughputPerMessageGroupID) + } + if _, present := attrs["DeduplicationScope"]; present { + t.Fatalf("DeduplicationScope must be omitted when not set; attrs=%v", attrs) + } + if _, present := attrs["PartitionCount"]; present { + t.Fatalf("PartitionCount must be omitted when not set / left at zero; attrs=%v", attrs) + } +} + +// --- helpers --- + +// mustCreateFIFOWithThroughputLimit creates a single-partition FIFO +// queue (allowed by the §11 PR 2 dormancy gate) with the requested +// FifoThroughputLimit set. Used by the immutability tests so they +// have a non-empty FifoThroughputLimit to attempt to change. +func mustCreateFIFOWithThroughputLimit(t *testing.T, node Node, name, limit string) string { + t.Helper() + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{ + "FifoQueue": "true", + "FifoThroughputLimit": limit, + }, + }) + if status != http.StatusOK { + t.Fatalf("createQueue %q: status %d body %v", name, status, out) + } + url, _ := out["QueueUrl"].(string) + return url +} diff --git a/adapter/sqs_partitioning_test.go b/adapter/sqs_partitioning_test.go new file mode 100644 index 000000000..7fa0e55d4 --- /dev/null +++ b/adapter/sqs_partitioning_test.go @@ -0,0 +1,354 @@ +package adapter + +import ( + "errors" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +// --- partitionFor unit tests --- + +// TestPartitionFor_LegacyZeroOrOneAlwaysPartitionZero pins the +// single-partition compatibility contract: a queue with +// PartitionCount == 0 (the unset state) or 1 routes every group ID +// to partition 0. Without this guarantee an existing single- +// partition queue would re-shuffle messages once PR 5 lands the +// data plane. +func TestPartitionFor_LegacyZeroOrOneAlwaysPartitionZero(t *testing.T) { + t.Parallel() + for _, count := range []uint32{0, 1} { + meta := &sqsQueueMeta{PartitionCount: count} + for _, gid := range []string{"a", "b", "user-1", "long-group-id-blah"} { + require.Equal(t, uint32(0), partitionFor(meta, gid), + "PartitionCount=%d, group=%q must route to 0", count, gid) + } + } +} + +// TestPartitionFor_PerQueueShortCircuits pins the §3.3 short-circuit: +// FifoThroughputLimit=perQueue collapses every group ID to +// partition 0 regardless of PartitionCount. Operators who want the +// AWS attribute set without the throughput scaling depend on this. +func TestPartitionFor_PerQueueShortCircuits(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 8, FifoThroughputLimit: htfifoThroughputPerQueue} + for _, gid := range []string{"a", "b", "user-1", "long-group-id"} { + require.Equal(t, uint32(0), partitionFor(meta, gid)) + } +} + +// TestPartitionFor_EmptyMessageGroupIdRoutesZero pins the defensive +// fallback. 
+// FIFO send validation rejects empty MessageGroupId so this case
+// should never reach the router; the test ensures the router
+// doesn't crash if it does.
+func TestPartitionFor_EmptyMessageGroupIdRoutesZero(t *testing.T) {
+	t.Parallel()
+	meta := &sqsQueueMeta{PartitionCount: 8}
+	require.Equal(t, uint32(0), partitionFor(meta, ""))
+}
+
+// TestPartitionFor_DeterministicAcrossRuns pins the §3.3
+// determinism contract: the same group ID always returns the same
+// partition. Without it, a consumer that pulls from a partition by
+// group ID could see messages re-routed to a different partition on
+// a process restart and lose ordering guarantees.
+func TestPartitionFor_DeterministicAcrossRuns(t *testing.T) {
+	t.Parallel()
+	meta := &sqsQueueMeta{PartitionCount: 8}
+	gid := "user-1234"
+	first := partitionFor(meta, gid)
+	for range 100 {
+		require.Equal(t, first, partitionFor(meta, gid))
+	}
+}
+
+// TestPartitionFor_DistributionApproximatelyUniform pins the §9 unit
+// test from the design: 100k random group IDs across 8 partitions
+// must land within ±5% of equal share. FNV-1a is not a CSPRNG but
+// for non-adversarial input the distribution is well-behaved.
+func TestPartitionFor_DistributionApproximatelyUniform(t *testing.T) {
+	t.Parallel()
+	const partitions uint32 = 8
+	const sample = 100_000
+	meta := &sqsQueueMeta{PartitionCount: partitions}
+	hits := make(map[uint32]int, partitions)
+	for i := range sample {
+		hits[partitionFor(meta, "group-"+strconv.Itoa(i))]++
+	}
+	expected := sample / int(partitions)
+	tolerance := expected / 20 // ±5%
+	for p := uint32(0); p < partitions; p++ {
+		count := hits[p]
+		if count < expected-tolerance || count > expected+tolerance {
+			t.Fatalf("partition %d: %d hits, expected within ±%d of %d (full distribution: %v)",
+				p, count, tolerance, expected, hits)
+		}
+	}
+}
+
+// TestPartitionFor_PowerOfTwoMaskingMatchesMod is a regression
+// guard for the bitwise-mask optimisation in partitionFor. The
+// optimisation is equivalent to `% PartitionCount` only when
+// PartitionCount is a power of two — the validator enforces this
+// at config time. What the test can observably pin is the range
+// invariant: for every power-of-two count the validator admits,
+// the routing result stays below PartitionCount. A non-power-of-two
+// count that leaked through validation would not route out of
+// range; it would silently skip partitions, so the validator, not
+// this test, is the real gate against that distribution bias.
+func TestPartitionFor_PowerOfTwoMaskingMatchesMod(t *testing.T) {
+	t.Parallel()
+	for _, n := range []uint32{2, 4, 8, 16, 32} {
+		meta := &sqsQueueMeta{PartitionCount: n}
+		for i := range 1000 {
+			gid := "g-" + strconv.Itoa(i)
+			require.Less(t, partitionFor(meta, gid), n,
+				"partitionFor must always be < PartitionCount=%d", n)
+		}
+	}
+}
+
+// --- isPowerOfTwo unit tests ---
+
+func TestIsPowerOfTwo(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		n    uint32
+		want bool
+	}{
+		{0, false},
+		{1, true},
+		{2, true},
+		{3, false},
+		{4, true},
+		{7, false},
+		{8, true},
+		{16, true},
+		{32, true},
+		{33, false},
+	}
+	for _, tc := range cases {
+		require.Equal(t, tc.want, isPowerOfTwo(tc.n), "n=%d", tc.n)
+	}
+}
+
+// --- validatePartitionConfig unit tests ---
+
+// TestValidatePartitionConfig_PowerOfTwo pins the §3.2 rule that
+// PartitionCount must be a power of two. The bitwise-mask routing
+// in partitionFor depends on this; non-powers would distribute
+// unevenly.
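+// Concretely, assuming the hash-and-mask shape the comments above
+// describe: with n=3 the mask n-1 = 0b10 can only ever produce
+// partitions {0, 2}, leaving partition 1 unreachable, which is why
+// "power of two" is a correctness rule rather than a tuning choice.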
+func TestValidatePartitionConfig_PowerOfTwo(t *testing.T) { + t.Parallel() + bad := []uint32{3, 5, 6, 7, 9, 10, 12, 15} + for _, n := range bad { + err := validatePartitionConfig(&sqsQueueMeta{PartitionCount: n, IsFIFO: true}) + require.Error(t, err, "n=%d must reject", n) + } + good := []uint32{1, 2, 4, 8, 16, 32} + for _, n := range good { + err := validatePartitionConfig(&sqsQueueMeta{PartitionCount: n, IsFIFO: true}) + require.NoError(t, err, "n=%d must be accepted", n) + } +} + +// TestValidatePartitionConfig_RejectsAboveMax pins the §10 +// per-queue cap. 64 must reject; 32 must succeed. +func TestValidatePartitionConfig_RejectsAboveMax(t *testing.T) { + t.Parallel() + require.Error(t, validatePartitionConfig(&sqsQueueMeta{PartitionCount: 64, IsFIFO: true})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{PartitionCount: 32, IsFIFO: true})) +} + +// TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs pins +// the §3.2 FIFO-only rule: HT-FIFO attributes on a non-FIFO queue +// reject with InvalidAttributeValue. Setting them silently on a +// Standard queue would advertise unsupported behaviour. +// +// PartitionCount > 1 is also FIFO-only (Claude review on PR #681 +// round 2 caught the gap) — without the guard a Standard queue +// with PartitionCount=2 would slip past the validator after PR 5 +// lifts the dormancy gate. PartitionCount 0/1 are still accepted +// on Standard queues because both mean "single-partition layout". +func TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs(t *testing.T) { + t.Parallel() + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, FifoThroughputLimit: htfifoThroughputPerQueue})) + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, DeduplicationScope: htfifoDedupeScopeMessageGroup})) + for _, n := range []uint32{2, 4, 8, 16, 32} { + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: n}), + "PartitionCount=%d on Standard queue must reject", n) + } + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: 0})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: 1})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: true, FifoThroughputLimit: htfifoThroughputPerMessageGroupID, PartitionCount: 8})) +} + +// TestValidatePartitionConfig_RejectsQueueScopedDedupOnPartitioned +// pins the §3.2 cross-attribute control-plane gate: queue-scoped +// dedup is incompatible with multi-partition FIFO because the dedup +// key cannot be globally unique across partitions without a cross- +// partition OCC transaction. Rejected as InvalidParameterValue at +// CreateQueue / SetQueueAttributes time so the operator sees the +// error before a single SendMessage. +func TestValidatePartitionConfig_RejectsQueueScopedDedupOnPartitioned(t *testing.T) { + t.Parallel() + err := validatePartitionConfig(&sqsQueueMeta{ + IsFIFO: true, + PartitionCount: 8, + DeduplicationScope: htfifoDedupeScopeQueue, + }) + require.Error(t, err) + var apiErr *sqsAPIError + require.True(t, errors.As(err, &apiErr), "expected sqsAPIError, got %T", err) + require.Equal(t, sqsErrValidation, apiErr.errorType, + "the cross-attribute rejection must use InvalidParameterValue (incoherent params, sqsErrValidation), not InvalidAttributeValue (malformed individual value)") + // Single-partition + queue-scoped dedup is fine (legacy behaviour). 
+ require.NoError(t, validatePartitionConfig(&sqsQueueMeta{ + IsFIFO: true, + PartitionCount: 1, + DeduplicationScope: htfifoDedupeScopeQueue, + })) +} + +// --- validatePartitionDormancyGate unit tests --- + +// TestValidatePartitionDormancyGate_RejectsAboveOne pins the §11 +// PR 2 dormancy gate: PartitionCount > 1 must reject until PR 5 +// lifts the gate. PartitionCount 0 or 1 must pass (both are the +// legacy single-partition layout). +func TestValidatePartitionDormancyGate_RejectsAboveOne(t *testing.T) { + t.Parallel() + require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 0})) + require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 1})) + for _, n := range []uint32{2, 4, 8, 16, 32} { + err := validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: n}) + require.Error(t, err, "PartitionCount=%d must reject under the dormancy gate", n) + require.Contains(t, err.Error(), "not yet enabled", + "the gate's reason must surface to the operator") + } +} + +// --- validatePartitionImmutability unit tests --- + +// TestValidatePartitionImmutability_RejectsAnyChange pins the §3.2 +// immutability rule: SetQueueAttributes attempts to change any of +// the three immutable HT-FIFO fields reject with +// InvalidAttributeValue. +func TestValidatePartitionImmutability_RejectsAnyChange(t *testing.T) { + t.Parallel() + current := &sqsQueueMeta{ + PartitionCount: 8, + FifoThroughputLimit: htfifoThroughputPerMessageGroupID, + DeduplicationScope: htfifoDedupeScopeMessageGroup, + } + cases := []struct { + name string + mutate func(*sqsQueueMeta) + mustError bool + }{ + {"PartitionCount changed", func(m *sqsQueueMeta) { m.PartitionCount = 4 }, true}, + {"FifoThroughputLimit changed", func(m *sqsQueueMeta) { m.FifoThroughputLimit = htfifoThroughputPerQueue }, true}, + {"DeduplicationScope changed", func(m *sqsQueueMeta) { m.DeduplicationScope = htfifoDedupeScopeQueue }, true}, + {"no immutable change (same-value no-op)", func(m *sqsQueueMeta) {}, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + req := *current + tc.mutate(&req) + err := validatePartitionImmutability(current, &req) + if tc.mustError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent +// pins the Claude Low fix on PR #679 round 6.2 / 6.3. The on-disk +// PartitionCount=0 ("unset") is canonical-equivalent to an explicit +// PartitionCount=1 ("single partition"), so a SetQueueAttributes +// that reaffirms the default ought to be a no-op rather than a hard +// "PartitionCount is immutable" rejection. validatePartitionImmutability +// uses normalisePartitionCount on both sides for exactly this case. 
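+// A minimal sketch of that normalisation, assuming the behaviour
+// TestNormalisePartitionCount pins at the bottom of this file (0 and
+// 1 both canonicalise to 1, everything else passes through):
+//
+//	func normalisePartitionCount(n uint32) uint32 {
+//		if n == 0 {
+//			return 1 // unset, i.e. the legacy single-partition layout
+//		}
+//		return n
+//	}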
+func TestValidatePartitionImmutability_PartitionCountZeroAndOneEquivalent(t *testing.T) { + t.Parallel() + cases := []struct { + name string + current uint32 + req uint32 + wantErr bool + }{ + {"stored 0, requested 1 (no-op)", 0, 1, false}, + {"stored 1, requested 0 (no-op)", 1, 0, false}, + {"stored 0, requested 0 (no-op)", 0, 0, false}, + {"stored 1, requested 1 (no-op)", 1, 1, false}, + {"stored 0, requested 2 (real change)", 0, 2, true}, + {"stored 1, requested 2 (real change)", 1, 2, true}, + {"stored 2, requested 1 (real change)", 2, 1, true}, + {"stored 2, requested 0 (real change)", 2, 0, true}, + {"stored 4, requested 8 (real change)", 4, 8, true}, + {"stored 8, requested 8 (no-op)", 8, 8, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + cur := &sqsQueueMeta{PartitionCount: tc.current} + req := &sqsQueueMeta{PartitionCount: tc.req} + err := validatePartitionImmutability(cur, req) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// --- htfifoAttributesPresent --- + +func TestHTFIFOAttributesPresent(t *testing.T) { + t.Parallel() + require.False(t, htfifoAttributesPresent(map[string]string{})) + require.False(t, htfifoAttributesPresent(map[string]string{"VisibilityTimeout": "30"})) + require.True(t, htfifoAttributesPresent(map[string]string{"PartitionCount": "8"})) + require.True(t, htfifoAttributesPresent(map[string]string{"FifoThroughputLimit": htfifoThroughputPerMessageGroupID})) + require.True(t, htfifoAttributesPresent(map[string]string{"DeduplicationScope": htfifoDedupeScopeMessageGroup})) +} + +// TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent pins +// the Codex P2 fix on PR #679 round 6.1: validatePartitionConfig +// documents PartitionCount=0 (unset) and =1 (explicit single +// partition) as semantically identical legacy/single-partition +// routing, so CreateQueue idempotency must treat them as equal — +// otherwise a queue created without PartitionCount (stored as 0) is +// rejected as "different attributes" by a retry that explicitly +// passes PartitionCount=1. +func TestHTFIFOAttributesEqual_PartitionCountZeroAndOneEquivalent(t *testing.T) { + t.Parallel() + a := &sqsQueueMeta{PartitionCount: 0} + b := &sqsQueueMeta{PartitionCount: 1} + require.True(t, htfifoAttributesEqual(a, b), + "PartitionCount 0 (unset) and 1 (explicit single partition) must compare equal") + require.True(t, htfifoAttributesEqual(b, a), + "equality must be symmetric") + // Real divergence (>1 vs 0/1) still rejects. + c := &sqsQueueMeta{PartitionCount: 2} + require.False(t, htfifoAttributesEqual(a, c), + "PartitionCount=2 must differ from unset") + require.False(t, htfifoAttributesEqual(b, c), + "PartitionCount=2 must differ from explicit 1") + // Same > 1 value still equal. 
+ d := &sqsQueueMeta{PartitionCount: 2} + require.True(t, htfifoAttributesEqual(c, d), + "identical PartitionCount > 1 must compare equal") +} + +func TestNormalisePartitionCount(t *testing.T) { + t.Parallel() + require.Equal(t, uint32(1), normalisePartitionCount(0)) + require.Equal(t, uint32(1), normalisePartitionCount(1)) + require.Equal(t, uint32(2), normalisePartitionCount(2)) + require.Equal(t, uint32(8), normalisePartitionCount(8)) +} diff --git a/adapter/sqs_throttle.go b/adapter/sqs_throttle.go new file mode 100644 index 000000000..28f6fd93e --- /dev/null +++ b/adapter/sqs_throttle.go @@ -0,0 +1,577 @@ +package adapter + +import ( + "context" + "math" + "net/http" + "sync" + "time" +) + +// Per-queue throttling — token-bucket store that hangs off *SQSServer. +// See docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md for +// the full design and rollout context. This file implements §3.1 (bucket +// store + token bucket), §3.3 (charging model), §3.4 (Throttling +// envelope helpers) and the cache-invalidation primitives §3.1 calls +// out for SetQueueAttributes / DeleteQueue. + +// Canonical bucket-action vocabulary. The JSON field-name prefixes +// (Send*, Recv*, Default*) are the operator-facing contract; these +// values are what the in-memory map is keyed on. The mapping is fixed +// per the design's §3.2 "Config-field → bucket-action mapping" table. +const ( + bucketActionSend = "Send" + bucketActionReceive = "Receive" + bucketActionAny = "*" +) + +// throttleAllActions is the canonical list used by every cache +// invalidation site. Defined once here so a future verb that grows a +// new bucket cannot land in production with one site forgetting to +// invalidate it. +var throttleAllActions = []string{bucketActionSend, bucketActionReceive, bucketActionAny} + +// throttleAttributeNames is the wire-side set of Throttle* +// attributes a SetQueueAttributes request can carry. Used by the +// invalidation gate in setQueueAttributes so an unrelated update +// (e.g. VisibilityTimeout only) does not pay the cache invalidation +// cost or, worse, give the caller a way to silently reset bucket +// state via a no-op SetQueueAttributes (Codex P1 on PR #679). +var throttleAttributeNames = []string{ + "ThrottleSendCapacity", + "ThrottleSendRefillPerSecond", + "ThrottleRecvCapacity", + "ThrottleRecvRefillPerSecond", + "ThrottleDefaultCapacity", + "ThrottleDefaultRefillPerSecond", +} + +// throttleAttributesPresent reports whether attrs carries any +// Throttle* key. Cheap O(6) check; the throttleAttributeNames slice +// is the source of truth so a future Throttle* attribute name added +// in one place automatically participates in the gate. +func throttleAttributesPresent(attrs map[string]string) bool { + for _, k := range throttleAttributeNames { + if _, ok := attrs[k]; ok { + return true + } + } + return false +} + +// throttleHardCeilingPerSecond bounds any user-supplied capacity or +// refill rate. A typo like SendCapacity=1e9 silently meaning "no limit" +// is more dangerous than an explicit InvalidAttributeValue (Codex P1 on +// PR #664: a wide-open queue masks itself as "throttled"). +const throttleHardCeilingPerSecond = 100_000.0 + +// throttleMinBatchCapacity is the smallest acceptable per-action +// capacity once the action covers a batch verb. SendMessageBatch and +// DeleteMessageBatch each charge up to 10 tokens (AWS caps batch size +// at 10), so a SendCapacity below 10 makes every full batch +// permanently unserviceable. 
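+// Concretely: with SendCapacity=5 a full 10-entry SendMessageBatch
+// needs 10 tokens, but refill caps at capacity, so the bucket can
+// never hold more than 5 and the batch would reject forever. The
+// validator therefore rejects such a configuration up front (see
+// TestSQSServer_Throttle_RejectsCapacityBelowBatchMin).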
+const throttleMinBatchCapacity = float64(sqsBatchMaxEntries) + +// throttleIdleEvictAfter is the idle window after which a quiet bucket +// is dropped from the in-memory store. A background goroutine +// (runSweepLoop) fires the eviction sweep on each +// throttleEvictSweepEvery tick; the hot path never calls sweep(). +// A queue that resumes activity rebuilds its bucket from the meta +// record at full capacity, matching the failover semantics +// documented in §3.1. +const throttleIdleEvictAfter = time.Hour + +// throttleEvictSweepEvery is the interval at which runSweepLoop fires +// the idle-evict sweep in its background goroutine. The hot-path +// charge() never calls into the sweep so a many-queue cluster pays +// the O(N) cost only on the goroutine's tick, never on a request. +const throttleEvictSweepEvery = time.Minute + +// bucketKey is the in-memory map key. +type bucketKey struct { + queue string + action string +} + +// tokenBucket is one bucket's mutable state. mu is per-bucket so +// concurrent traffic on different queues never serialises on the same +// lock; refill + take + release of a single bucket is the only +// critical section. evicted flips to true exactly once (under mu) when +// the bucket is removed from the store by sweep / invalidateQueue / +// loadOrInit reconciliation; charge re-checks it after acquiring mu so +// goroutines that loaded the now-orphaned bucket retry and converge on +// the live entry. Without that retry, sweep racing N concurrent chargers +// could let them drain up to one full capacity from the orphan while +// later requests get a fresh full-capacity bucket — a one-time burst +// of up to 2× capacity per evict cycle (Codex P2 on PR #679 round 5). +// Never held across the bucketStore's sync.Map. +type tokenBucket struct { + mu sync.Mutex + capacity float64 + refillRate float64 + tokens float64 + lastRefill time.Time + evicted bool +} + +// bucketStore holds every active bucket for an SQS server process. +// sync.Map matches the read-mostly access pattern: lookups are nearly +// always Load hits; LoadOrStore pays the write cost only on first use. +// +// The idle-evict sweep runs from runSweepLoop on a background ticker +// — there is no hot-path serialisation primitive because the only +// caller of sweep() is the sole goroutine the ticker drives. +type bucketStore struct { + buckets sync.Map // map[bucketKey]*tokenBucket + clock func() time.Time + evictedAfter time.Duration + sweepEvery time.Duration +} + +// newBucketStore constructs a store whose clock + idle-evict window +// can be overridden for tests. The sweep cadence is fixed at +// throttleEvictSweepEvery; tests that want a different cadence have +// no use case yet (the sweep itself is a low-cost no-op when the +// store is small). Production calls newBucketStoreDefault. +func newBucketStore(clock func() time.Time, evictedAfter time.Duration) *bucketStore { + if clock == nil { + clock = time.Now + } + return &bucketStore{ + clock: clock, + evictedAfter: evictedAfter, + sweepEvery: throttleEvictSweepEvery, + } +} + +// newBucketStoreDefault uses the production constants. Kept as a +// separate constructor so test wiring stays explicit about the +// time-window overrides. +func newBucketStoreDefault() *bucketStore { + return newBucketStore(time.Now, throttleIdleEvictAfter) +} + +// chargeOutcome is returned from charge so the caller can build the +// Throttling envelope (Retry-After computed from refillRate + +// requestedCount, see §3.4) without re-loading the bucket. 
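+// For a rejected charge the outcome alone carries everything the
+// §3.4 envelope needs, e.g. (illustrative shape only):
+//
+//	out := store.charge(meta.Throttle, queue, bucketActionSend, n)
+//	if !out.allowed {
+//		writeSQSThrottlingError(w, queue, bucketActionSend, out.retryAfter)
+//	}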
+type chargeOutcome struct {
+	allowed       bool
+	retryAfter    time.Duration
+	tokensAfter   float64
+	bucketPresent bool
+}
+
+// charge takes count tokens from the bucket identified by (queue,
+// action) using cfg as the source-of-truth for capacity / refillRate.
+// cfg may be nil — in which case throttling is disabled for the queue
+// and charge returns allowed=true without touching the map.
+//
+// count must be ≥ 1; the caller has already validated batch size at
+// the request layer (sqs_messages_batch.go bounds it to
+// sqsBatchMaxEntries).
+func (b *bucketStore) charge(cfg *sqsQueueThrottle, queue, action string, count int) chargeOutcome {
+	if b == nil || cfg == nil || cfg.IsEmpty() {
+		// Throttling disabled (default): every request allowed, no
+		// bucket allocated. The hot path stays a single nil-check.
+		return chargeOutcome{allowed: true, bucketPresent: false}
+	}
+	resolvedAction, capacity, refill := resolveActionConfig(cfg, action)
+	if capacity == 0 || refill == 0 {
+		// This action has no throttle configured (e.g. only Send is
+		// configured and the request is a Recv). Default* covers any
+		// remaining unconfigured action; if Default* is also zero the
+		// request is unthrottled.
+		return chargeOutcome{allowed: true, bucketPresent: false}
+	}
+	if count < 1 {
+		count = 1
+	}
+	// Bucket key uses the *resolved* action so Send-falls-through-to-
+	// Default and Recv-falls-through-to-Default share the same Default
+	// bucket. Without the resolution, an operator who configures only
+	// Default would still get one bucket per requesting action — three
+	// independent quotas instead of one shared cap.
+	//
+	// Loop bound: each retry happens only when the bucket we loaded was
+	// evicted between the Load and the mu acquisition. Two iterations
+	// is the realistic ceiling (sweep / reconciliation can't repeatedly
+	// evict the same fresh bucket without time advancing past
+	// evictedAfter); the cap keeps a pathological invariant violation
+	// from spinning the goroutine.
+	for range 4 {
+		bucket := b.loadOrInit(queue, resolvedAction, capacity, refill)
+		outcome, retry := chargeBucket(bucket, b.clock(), count)
+		if !retry {
+			return outcome
+		}
+	}
+	// Should not happen — the for-loop drained without ever finding a
+	// live bucket. Treat as allowed=true (fail-open) so misconfiguration
+	// of the bucket store cannot produce a storm of hard Throttling
+	// rejects (HTTP 400s, per writeSQSThrottlingError).
+	return chargeOutcome{allowed: true, bucketPresent: false}
+}
+
+// chargeBucket runs the under-mu refill+take for a single bucket and
+// returns retry=true if the caller should drop the reference and reload
+// from the store. Retry is the orphan-bucket signal: sweep /
+// invalidateQueue / loadOrInit reconciliation set evicted=true under
+// mu before dropping the bucket from the map, so a goroutine that
+// loaded the bucket pre-eviction can detect it here.
+func chargeBucket(bucket *tokenBucket, now time.Time, count int) (chargeOutcome, bool) {
+	bucket.mu.Lock()
+	defer bucket.mu.Unlock()
+	if bucket.evicted {
+		return chargeOutcome{}, true
+	}
+	// Refill before reading: tokens accrue at refillRate * elapsed,
+	// capped at the configured capacity. This is the single place that
+	// advances tokens forward in time so the "fresh bucket on failover"
+	// guarantee from §3.1 holds: a new leader's bucket starts at full
+	// capacity and refills only based on elapsed time on this process.
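+	// Worked example, matching the arithmetic
+	// TestBucketStore_RefillBetweenChargesUsesElapsed pins: capacity
+	// 10, refillRate 5, tokens 0, 1.5s elapsed gives
+	// tokens = min(0 + 1.5*5, 10) = 7.5.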
+ if elapsed := now.Sub(bucket.lastRefill).Seconds(); elapsed > 0 { + bucket.tokens += elapsed * bucket.refillRate + if bucket.tokens > bucket.capacity { + bucket.tokens = bucket.capacity + } + bucket.lastRefill = now + } + requested := float64(count) + if bucket.tokens >= requested { + bucket.tokens -= requested + return chargeOutcome{allowed: true, tokensAfter: bucket.tokens, bucketPresent: true}, false + } + // Reject the whole batch — partial throttling within a batch is + // hard to reason about and AWS rejects the whole call. + return chargeOutcome{ + allowed: false, + retryAfter: computeRetryAfter(requested, bucket.tokens, bucket.refillRate), + tokensAfter: bucket.tokens, + bucketPresent: true, + }, false +} + +// loadOrInit handles the first-use insert race. Two concurrent first +// requests for the same (queue, action) both arrive at LoadOrStore; +// one wins and the loser's freshly-built bucket is discarded. This is +// safe because both racers compute identical (capacity, refillRate) +// from the same meta snapshot — the bucket they would build is +// behaviourally interchangeable. +// +// Reconciliation against stale config (Codex P1 on PR #679): if a +// cached bucket's capacity/refillRate differ from the cfg's current +// values, the bucket is replaced with a fresh one built from the +// current config. Without this check, a node that lost leadership +// during a SetQueueAttributes commit and then regained leadership +// later would keep enforcing the prior leader-term's limits — the +// SetQueueAttributes invalidation only runs on the leader that +// processed the commit, so a different leader's stale buckets +// survive. The reconciliation also covers the case where the +// invalidation gate in setQueueAttributes is bypassed (e.g. by a +// future admin path that mutates throttle config without touching +// SetQueueAttributes). +func (b *bucketStore) loadOrInit(queue, action string, capacity, refill float64) *tokenBucket { + key := bucketKey{queue: queue, action: action} + if v, ok := b.buckets.Load(key); ok { + // type assertion is sound: only tokenBucket pointers are stored. + bucket, _ := v.(*tokenBucket) + // Cheap field comparison under the bucket's own lock — if the + // cached bucket matches the current config we return it + // directly. A mismatch means the on-disk meta moved while + // this node held a stale bucket; rebuild from the current + // config (full capacity, matching the failover semantics). + bucket.mu.Lock() + matches := bucket.capacity == capacity && bucket.refillRate == refill + bucket.mu.Unlock() + if matches { + return bucket + } + // CompareAndDelete is mandatory here: an unconditional Delete + // races against a concurrent goroutine that already detected + // the same mismatch and replaced the entry with its own fresh + // bucket — our Delete would evict its fresh entry, then our + // LoadOrStore would put another fresh bucket. The map ends up + // holding our bucket, but the racer's bucket might have + // already been handed out via LoadOrStore to a third + // goroutine that is now charging a bucket no longer in the + // map, while later requests get a different fresh bucket at + // full capacity. CompareAndDelete makes our Delete a no-op + // when the map already holds someone else's fresh bucket. + // (Claude P1 on PR #679 round 4 caught this.) 
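+		//
+		// Interleaving sketch: goroutines G1 and G2 both observe the
+		// stale bucket S. G1's CompareAndDelete(S) succeeds and its
+		// LoadOrStore inserts fresh bucket F1. A plain Delete in G2
+		// would then remove F1 and LoadOrStore a second fresh F2,
+		// orphaning F1 even though it may already be handed out; with
+		// CompareAndDelete, G2's delete no-ops (the map holds F1, not
+		// S) and G2's LoadOrStore returns F1.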
+ // + // Setting evicted=true after a successful CompareAndDelete + // signals any in-flight charger holding the stale bucket to + // retry against the live entry; without it, a goroutine that + // loaded the stale bucket and is now blocked on its mu would + // charge against the orphaned (wrong-capacity) bucket after + // we release. Done under mu so the charger sees evicted=true + // the moment it acquires the lock. + if b.buckets.CompareAndDelete(key, v) { + bucket.mu.Lock() + bucket.evicted = true + bucket.mu.Unlock() + } + // fall through to LoadOrStore — a concurrent racer might + // have already inserted a fresh bucket with the current + // config, in which case LoadOrStore picks it up and the new + // bucket below is discarded. + } + now := b.clock() + fresh := &tokenBucket{ + capacity: capacity, + refillRate: refill, + tokens: capacity, // start at full capacity, matches failover semantics. + lastRefill: now, + } + actual, _ := b.buckets.LoadOrStore(key, fresh) + bucket, _ := actual.(*tokenBucket) + return bucket +} + +// invalidateQueue drops every bucket belonging to the named queue. +// Called *after* the Raft commit on SetQueueAttributes / DeleteQueue / +// CreateQueue so the next request rebuilds from the freshly committed +// meta. Mirrors sweep's lock-then-delete-then-flag ordering: +// +// 1. Load the pointer from the map (without deleting). +// 2. Acquire bucket.mu so any concurrent charger either runs to +// completion before us (and we observe its updated lastRefill +// but still proceed — invalidation is unconditional) or blocks +// on mu until we set evicted=true. +// 3. CompareAndDelete with the loaded v to avoid evicting a +// replacement bucket inserted by a concurrent reconciliation. +// 4. Set evicted=true under mu so the next mu acquirer sees the +// orphan signal and retries against the live entry. +// +// LoadAndDelete-then-lock would let a concurrent charger that +// loaded the pointer pre-LoadAndDelete acquire mu first and charge +// while evicted is still false, then later requests would create a +// fresh full-capacity bucket — a 2x burst on every invalidation +// event (Codex P2 on PR #679 round 6). +func (b *bucketStore) invalidateQueue(queue string) { + if b == nil { + return + } + for _, action := range throttleAllActions { + key := bucketKey{queue: queue, action: action} + v, ok := b.buckets.Load(key) + if !ok { + continue + } + bucket, _ := v.(*tokenBucket) + bucket.mu.Lock() + if b.buckets.CompareAndDelete(key, v) { + bucket.evicted = true + } + bucket.mu.Unlock() + } +} + +// runSweepLoop runs the idle-evict sweep on a background ticker so +// the request hot path never pays the O(N) sync.Map.Range cost +// (Gemini high on PR #679: a many-queue cluster would see latency +// spikes on whichever request was unlucky enough to trigger the +// per-minute on-hot-path sweep). Returns when ctx is done — the +// SQSServer wires this to s.reaperCtx so a Stop() call cleans the +// goroutine up alongside the existing reaper. +func (b *bucketStore) runSweepLoop(ctx context.Context) { + if b == nil || b.evictedAfter <= 0 || b.sweepEvery <= 0 { + return + } + t := time.NewTicker(b.sweepEvery) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + b.sweep() + } + } +} + +// sweep walks the bucket store dropping any bucket idle longer than +// evictedAfter. Called from runSweepLoop on a background ticker — +// the ticker is the only caller, so sweep() does not need its own +// serialisation. 
Bucket lookups stay O(1) on the hot path; sweep +// iterates every entry under the per-bucket lock so it can re-check +// idle and the map entry atomically. +// +// Eviction race (Codex P2 on PR #679 round 5 and round 6): three +// guards work together to keep idle eviction from inflating the burst +// budget for a queue: +// 1. Hold bucket.mu across the Delete so the idle observation +// cannot be invalidated between check and delete. A concurrent +// charge() that loaded the bucket either runs to completion +// before sweep acquires mu (sweep then sees the updated +// lastRefill and skips delete) or blocks on mu until sweep +// releases — and on release sees evicted=true and retries. +// 2. CompareAndDelete with v ensures sweep does not evict a +// replacement bucket inserted by invalidateQueue or a future +// reconciliation path. +// 3. Set evicted=true under mu after a successful CompareAndDelete. +// Without this signal, goroutines that loaded the bucket +// pre-eviction and were blocked on mu would charge the orphan +// after release while later requests get a fresh full-capacity +// bucket — a one-time burst of up to 2× capacity per evict +// cycle on workloads where requests align with sweep ticks. +// +// Holding bucket.mu across sync.Map.Delete is safe — sync.Map.Load +// is wait-free on the read-only path and never blocks waiting for +// bucket.mu, so there is no AB-BA cycle with charge(). +func (b *bucketStore) sweep() { + cutoff := b.clock().Add(-b.evictedAfter) + b.buckets.Range(func(k, v any) bool { + bucket, _ := v.(*tokenBucket) + bucket.mu.Lock() + if bucket.lastRefill.Before(cutoff) { + if b.buckets.CompareAndDelete(k, v) { + bucket.evicted = true + } + } + bucket.mu.Unlock() + return true + }) +} + +// resolveActionConfig maps a charge() action to (effective bucket +// action, capacity, refillRate) from cfg. Send* and Recv* keep their +// own buckets when configured; otherwise the action falls through to +// the Default bucket and gets the canonical "*" key so all +// fall-through actions share one bucket. Returning (_, 0, 0) means +// "no throttle for this action" and the caller short-circuits. +func resolveActionConfig(cfg *sqsQueueThrottle, action string) (string, float64, float64) { + switch action { + case bucketActionSend: + if cfg.SendCapacity > 0 { + return bucketActionSend, cfg.SendCapacity, cfg.SendRefillPerSecond + } + case bucketActionReceive: + if cfg.RecvCapacity > 0 { + return bucketActionReceive, cfg.RecvCapacity, cfg.RecvRefillPerSecond + } + } + if cfg.DefaultCapacity > 0 { + return bucketActionAny, cfg.DefaultCapacity, cfg.DefaultRefillPerSecond + } + return action, 0, 0 +} + +// throttleRetryAfterCap bounds the Retry-After value the client sees +// (Gemini medium on PR #679). Without a cap, a tiny refillRate plus +// a large requested count would compute a multi-day wait — and +// time.Duration arithmetic can overflow at the upper end. One hour +// matches the bucket store's idle-evict window: by the time the +// suggested retry would otherwise expire, the bucket would have +// been evicted and rebuilt at full capacity anyway, so a longer +// suggestion is meaningless. Producers that hit the cap are also +// strongly mis-configured; capping is a guard rail, not a feature. 
+const throttleRetryAfterCap = time.Hour
+
+// computeRetryAfter implements the §3.4 formula:
+//
+//	needed := requested - currentTokens
+//	secondsToNextRefill := ceil(needed / refillRate)
+//	retryAfter := max(1, int(secondsToNextRefill))
+//
+// requested is the same count the charge step uses (1 for single-message
+// verbs, len(Entries) for batch verbs). The 1-second floor matches the
+// integer-second granularity of the Retry-After header (RFC 9110
+// §10.2.3). The validator keeps refillRate > 0 so no divide-by-zero
+// guard is needed.
+//
+// Capped at throttleRetryAfterCap to bound time.Duration arithmetic
+// against pathologically small refillRate / large requested values.
+func computeRetryAfter(requested, current, refillRate float64) time.Duration {
+	needed := requested - current
+	if needed <= 0 {
+		// Pathological — caller invoked us with allowed=false but
+		// tokens >= requested. Treat as "wait one tick" rather than
+		// zero so the client backs off at least once.
+		return time.Second
+	}
+	secs := math.Ceil(needed / refillRate)
+	if secs < 1 {
+		secs = 1
+	}
+	// Cap before multiplying to avoid time.Duration overflow on
+	// pathological inputs (e.g. refillRate just above zero).
+	const capSecs = float64(throttleRetryAfterCap / time.Second)
+	if secs > capSecs {
+		secs = capSecs
+	}
+	return time.Duration(secs) * time.Second
+}
+
+// throttleChargeCount maps a request to the token count the bucket
+// should be charged for. Single-message verbs charge 1; batch verbs
+// charge len(Entries). The bucket store itself takes count as a
+// parameter so this helper can stay close to the wire-protocol layer
+// in the request path.
+func throttleChargeCount(entries int) int {
+	if entries < 1 {
+		return 1
+	}
+	return entries
+}
+
+// chargeQueue is the per-handler entry point used by handlers that
+// do not already load the queue meta themselves (deleteMessage,
+// changeMessageVisibility, and their batch siblings). It loads the
+// meta at a fresh read timestamp (Pebble cache makes this cheap) and
+// runs the bucket store's charge against the queue's Throttle config.
+//
+// Handlers that DO load the meta themselves (sendMessage,
+// sendMessageBatch, receiveMessage) should use chargeQueueWithThrottle
+// to avoid the redundant load (Gemini high on PR #679).
+//
+// chargeQueue intentionally swallows missing-queue errors: the caller
+// is going to discover that the queue does not exist a few lines
+// later and respond with QueueDoesNotExist. Letting the throttle
+// check race the catalog read avoids two lookups in the fast path.
+//
+// Designed to sit OUTSIDE the OCC transaction (§4.2): a rejected
+// request never reaches the coordinator. The retry loop in
+// sendMessageWithRetry et al. would otherwise busy-loop on a
+// permanent rate-limit reject, burning leader CPU.
+func (s *SQSServer) chargeQueue(w http.ResponseWriter, r *http.Request, queueName, action string, count int) bool {
+	if s.throttle == nil {
+		return true
+	}
+	throttle := s.queueThrottleConfig(r, queueName)
+	return s.chargeQueueWithThrottle(w, queueName, action, count, throttle)
+}
+
+// chargeQueueWithThrottle is the variant for handlers that already
+// have the throttle config in hand from their own meta load. Drops
+// the per-request meta load chargeQueue does, addressing the Gemini
+// high finding on PR #679 about redundant storage reads on the hot
+// path.
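+// Illustrative call shape (a sketch; the real call sites live in the
+// message handlers, outside this file):
+//
+//	// meta already loaded by the handler
+//	if !s.chargeQueueWithThrottle(w, queue, bucketActionSend,
+//		throttleChargeCount(len(entries)), meta.Throttle) {
+//		return // 400 Throttling + Retry-After already written
+//	}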
+func (s *SQSServer) chargeQueueWithThrottle(w http.ResponseWriter, queueName, action string, count int, throttle *sqsQueueThrottle) bool { + if s.throttle == nil { + return true + } + outcome := s.throttle.charge(throttle, queueName, action, count) + if outcome.allowed { + return true + } + writeSQSThrottlingError(w, queueName, action, outcome.retryAfter) + return false +} + +// queueThrottleConfig loads just the Throttle config off a queue's +// meta record. Returns nil on any error or missing-queue — the +// surrounding handler is responsible for surfacing those, and a nil +// throttle config short-circuits the charge to "allowed". +// +// Held as a method on *SQSServer so a test can swap the meta loader +// via the existing nextTxnReadTS / loadQueueMetaAt seam. +func (s *SQSServer) queueThrottleConfig(r *http.Request, queueName string) *sqsQueueThrottle { + if s.store == nil { + return nil + } + readTS := s.nextTxnReadTS(r.Context()) + meta, exists, err := s.loadQueueMetaAt(r.Context(), queueName, readTS) + if err != nil || !exists || meta == nil { + return nil + } + return meta.Throttle +} diff --git a/adapter/sqs_throttle_integration_test.go b/adapter/sqs_throttle_integration_test.go new file mode 100644 index 000000000..392abd54b --- /dev/null +++ b/adapter/sqs_throttle_integration_test.go @@ -0,0 +1,320 @@ +package adapter + +import ( + "net/http" + "strconv" + "testing" +) + +// TestSQSServer_Throttle_DefaultOff_AllowsUnboundedSends pins the +// default-off contract: a queue without any Throttle* attributes +// accepts arbitrary send volume. The hot path for unconfigured queues +// must never reject — anything else would change steady-state +// behaviour for every existing deployment that hasn't opted in. +func TestSQSServer_Throttle_DefaultOff_AllowsUnboundedSends(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-off") + for i := range 50 { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, + "MessageBody": "msg-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("default-off send #%d: status %d", i+1, status) + } + } +} + +// TestSQSServer_Throttle_SendBucketRejectsAfterCapacity is the §6 +// item 2 end-to-end test: configure SendCapacity=10 RefillPerSec=1, +// send 10 → 200, send 11th immediately → 400 Throttling with +// Retry-After. Pins the wire-level contract: the 400 carries the +// AWS-shaped error envelope and the Retry-After header. +func TestSQSServer_Throttle_SendBucketRejectsAfterCapacity(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-send") + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }) + for i := range 10 { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, + "MessageBody": "msg-" + strconv.Itoa(i), + }) + if status != http.StatusOK { + t.Fatalf("send #%d: status %d (expected 200)", i+1, status) + } + } + // 11th must reject with Throttling + Retry-After. 
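+	// Raw HTTP here rather than callSQS: the assertions below need the
+	// response headers (x-amzn-ErrorType, Retry-After), which the
+	// callSQS helper presumably does not expose alongside (status, body).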
+ resp := postSQSRequest(t, "http://"+node.sqsAddress+"/", sqsSendMessageTarget, `{"QueueUrl":"`+url+`","MessageBody":"overflow"}`) + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("11th send: status %d (expected 400)", resp.StatusCode) + } + if got := resp.Header.Get("x-amzn-ErrorType"); got != sqsErrThrottling { + t.Fatalf("11th send: x-amzn-ErrorType=%q (expected Throttling)", got) + } + if ra := resp.Header.Get("Retry-After"); ra != "1" { + t.Fatalf("11th send: Retry-After=%q (expected 1)", ra) + } +} + +// TestSQSServer_Throttle_RecvBucketRejectsAfterCapacity is the +// receive-side mirror of the send test. ReceiveMessage charges 1 +// from Recv regardless of MaxNumberOfMessages, so 11 calls drain a +// capacity-10 bucket. Empty queue is fine — the throttle check sits +// before any catalog read. +func TestSQSServer_Throttle_RecvBucketRejectsAfterCapacity(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-recv") + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleRecvCapacity": "10", + "ThrottleRecvRefillPerSecond": "1", + }) + for i := range 10 { + status, _ := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{"QueueUrl": url}) + if status != http.StatusOK { + t.Fatalf("recv #%d: status %d (expected 200)", i+1, status) + } + } + resp := postSQSRequest(t, "http://"+node.sqsAddress+"/", sqsReceiveMessageTarget, `{"QueueUrl":"`+url+`"}`) + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("11th recv: status %d (expected 400)", resp.StatusCode) + } + if got := resp.Header.Get("x-amzn-ErrorType"); got != sqsErrThrottling { + t.Fatalf("11th recv: x-amzn-ErrorType=%q", got) + } +} + +// TestSQSServer_Throttle_BatchChargesByEntryCount pins the §3.3 +// charging table for SendMessageBatch: 10 entries → 10 tokens. With +// SendCapacity=10 a single 10-entry batch drains the bucket and the +// next single SendMessage rejects. +func TestSQSServer_Throttle_BatchChargesByEntryCount(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-batch") + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }) + entries := make([]map[string]any, 10) + for i := range entries { + entries[i] = map[string]any{ + "Id": "id" + strconv.Itoa(i), + "MessageBody": "body-" + strconv.Itoa(i), + } + } + status, _ := callSQS(t, node, sqsSendMessageBatchTarget, map[string]any{ + "QueueUrl": url, + "Entries": entries, + }) + if status != http.StatusOK { + t.Fatalf("10-entry batch: status %d (expected 200)", status) + } + // Bucket now empty. Single send must reject. + resp := postSQSRequest(t, "http://"+node.sqsAddress+"/", sqsSendMessageTarget, `{"QueueUrl":"`+url+`","MessageBody":"overflow"}`) + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("post-batch single send: status %d (expected 400)", resp.StatusCode) + } +} + +// TestSQSServer_Throttle_SetQueueAttributesInvalidatesBucket pins the +// §3.1 cache-invalidation contract for SetQueueAttributes: a raise of +// the limit must take effect on the very next request, not after the +// 1h idle-evict sweep. Without invalidation the test's final send +// would still reject under the old (now-empty) bucket. 
+func TestSQSServer_Throttle_SetQueueAttributesInvalidatesBucket(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-invalidate") + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }) + // Drain. + for range 10 { + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, "MessageBody": "drain", + }) + if status != http.StatusOK { + t.Fatalf("drain send: status %d", status) + } + } + // Sanity-check exhaustion. + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, "MessageBody": "should-throttle", + }) + if status != http.StatusBadRequest { + t.Fatalf("expected throttle, got %d", status) + } + // Raise capacity and refill. + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "20", + "ThrottleSendRefillPerSecond": "20", + }) + // Immediate send must succeed — a fresh bucket starts at capacity. + status, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, "MessageBody": "post-set", + }) + if status != http.StatusOK { + t.Fatalf("post-SetQueueAttributes send: status %d (expected 200; bucket invalidation broken)", status) + } +} + +// TestSQSServer_Throttle_DeleteQueueInvalidatesBucket pins the §3.1 +// cache-invalidation contract for DeleteQueue: a same-name recreate +// gets a fresh bucket, not the stale balance from the previous +// incarnation. Without invalidation the post-recreate send would +// inherit the drained bucket and reject. +func TestSQSServer_Throttle_DeleteQueueInvalidatesBucket(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + queue := "throttle-recreate" + url := mustCreateQueue(t, node, queue) + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }) + for range 10 { + _, _ = callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url, "MessageBody": "drain", + }) + } + // Delete and recreate. + if status, _ := callSQS(t, node, sqsDeleteQueueTarget, map[string]any{"QueueUrl": url}); status != http.StatusOK { + t.Fatalf("delete: %d", status) + } + url2 := mustCreateQueue(t, node, queue) + mustSetQueueAttributes(t, node, url2, map[string]string{ + "ThrottleSendCapacity": "10", + "ThrottleSendRefillPerSecond": "1", + }) + // First send on the recreated queue must succeed (full-capacity bucket). + status, _ := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": url2, "MessageBody": "fresh", + }) + if status != http.StatusOK { + t.Fatalf("post-recreate send: status %d (bucket invalidation on DeleteQueue broken)", status) + } +} + +// TestSQSServer_Throttle_GetQueueAttributesRoundTrip pins the §6 item +// 4 contract: SetQueueAttributes(Throttle*) followed by +// GetQueueAttributes("All") returns the same values. SDKs use the +// round-trip to confirm the config landed; a missing field on +// GetQueueAttributes would make operators think the SetQueueAttributes +// call silently failed. 
+func TestSQSServer_Throttle_GetQueueAttributesRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-roundtrip") + mustSetQueueAttributes(t, node, url, map[string]string{ + "ThrottleSendCapacity": "100", + "ThrottleSendRefillPerSecond": "50", + "ThrottleRecvCapacity": "20", + "ThrottleRecvRefillPerSecond": "5", + }) + status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "AttributeNames": []string{"All"}, + }) + if status != http.StatusOK { + t.Fatalf("GetQueueAttributes: status %d", status) + } + attrs, _ := out["Attributes"].(map[string]any) + if attrs == nil { + t.Fatalf("missing Attributes in response: %v", out) + } + expect := map[string]string{ + "ThrottleSendCapacity": "100", + "ThrottleSendRefillPerSecond": "50", + "ThrottleRecvCapacity": "20", + "ThrottleRecvRefillPerSecond": "5", + } + for k, want := range expect { + got, _ := attrs[k].(string) + if got != want { + t.Fatalf("attr %s: got %q want %q (full attrs: %v)", k, got, want, attrs) + } + } +} + +// TestSQSServer_Throttle_RejectsCapacityBelowBatchMin pins the §3.2 +// validator's batch-floor rule: SendCapacity < 10 makes every full +// batch permanently unserviceable. The SetQueueAttributes call is +// rejected with InvalidAttributeValue rather than landing. +func TestSQSServer_Throttle_RejectsCapacityBelowBatchMin(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateQueue(t, node, "throttle-bad-cap") + status, out := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "ThrottleSendCapacity": "5", + "ThrottleSendRefillPerSecond": "1", + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("expected 400 for SendCapacity=5 (< batch min 10), got %d body %v", status, out) + } + if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue { + t.Fatalf("expected __type=InvalidAttributeValue, got %q", got) + } +} + +// --- helpers --- + +func mustCreateQueue(t *testing.T, node Node, name string) string { + t.Helper() + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{"QueueName": name}) + if status != http.StatusOK { + t.Fatalf("createQueue %q: status %d body %v", name, status, out) + } + url, _ := out["QueueUrl"].(string) + if url == "" { + t.Fatalf("createQueue %q: missing QueueUrl", name) + } + return url +} + +func mustSetQueueAttributes(t *testing.T, node Node, url string, attrs map[string]string) { + t.Helper() + status, out := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": attrs, + }) + if status != http.StatusOK { + t.Fatalf("setQueueAttributes(%v): status %d body %v", attrs, status, out) + } +} diff --git a/adapter/sqs_throttle_test.go b/adapter/sqs_throttle_test.go new file mode 100644 index 000000000..d1c3909ee --- /dev/null +++ b/adapter/sqs_throttle_test.go @@ -0,0 +1,722 @@ +package adapter + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestBucketStore_DefaultOff_ShortCircuit pins the contract that a +// nil throttle config never allocates a bucket and never rejects. 
+// This is the hot path for unconfigured queues — every nil-check that +// short-circuits keeps the per-request cost at one map-load on the +// SQSServer struct and one nil-comparison in charge(). +func TestBucketStore_DefaultOff_ShortCircuit(t *testing.T) { + t.Parallel() + store := newBucketStoreDefault() + for range 100 { + out := store.charge(nil, "orders", bucketActionSend, 1) + require.True(t, out.allowed) + require.False(t, out.bucketPresent, "nil cfg must not allocate a bucket") + } +} + +// TestBucketStore_Empty_ShortCircuit covers the post-validator +// canonicalisation path: an all-zero sqsQueueThrottle is equivalent +// to nil. Without this branch, a queue whose operator wrote +// "ThrottleSendCapacity=0" would still pay the bucket allocation. +func TestBucketStore_Empty_ShortCircuit(t *testing.T) { + t.Parallel() + store := newBucketStoreDefault() + out := store.charge(&sqsQueueThrottle{}, "orders", bucketActionSend, 1) + require.True(t, out.allowed) + require.False(t, out.bucketPresent) +} + +// TestBucketStore_FreshAllowsUpToCapacity checks the fresh-bucket +// initial-state contract: a brand-new bucket starts at full capacity +// and accepts exactly that many tokens before rejecting the next one. +// This matches both the AWS rate-limiter behaviour and the §3.1 +// failover semantic ("fresh bucket on failover starts at capacity"). +func TestBucketStore_FreshAllowsUpToCapacity(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + for i := range 10 { + out := store.charge(cfg, "orders", bucketActionSend, 1) + require.True(t, out.allowed, "send %d must be allowed", i+1) + } + out := store.charge(cfg, "orders", bucketActionSend, 1) + require.False(t, out.allowed, "11th send must be rejected") + require.Equal(t, time.Second, out.retryAfter, "Retry-After floor is 1s") +} + +// TestBucketStore_RefillBetweenChargesUsesElapsed pins the refill +// math: tokens accrue at refillRate per elapsed second, capped at +// capacity. Time is injected so the test does not race the wall +// clock. +func TestBucketStore_RefillBetweenChargesUsesElapsed(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 5} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + // Drain. + for range 10 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + } + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + // Advance 1.5s → 7.5 tokens accrued (capped under capacity 10). + now = now.Add(1500 * time.Millisecond) + for range 7 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed, + "after 1.5s refill at 5 RPS, 7 sends must succeed") + } + // 8th must reject — only 7.5 tokens accrued, charged 7, leaves 0.5. + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) +} + +// TestBucketStore_RefillCapsAtCapacity pins the upper bound on +// long-idle refill: a queue idle for an hour does NOT come back with +// 3600 tokens — the bucket caps at the configured capacity. 
+func TestBucketStore_RefillCapsAtCapacity(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, 2*time.Hour) + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + now = now.Add(time.Hour) // 3600 seconds, would be 3600 tokens uncapped + for range 10 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + } + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed, + "refill capped at capacity: 11th send post-idle must reject") +} + +// TestBucketStore_BatchRejectsWholeBatchWhenShort pins the §3.3 +// "batch verbs charge before dispatching individual entries" rule. +// A bucket with 3 tokens facing a 10-entry batch rejects the whole +// call and consumes nothing — partial-credit behaviour would make the +// "I have 3, you wanted 10" semantics ambiguous and AWS itself +// rejects the whole call. +func TestBucketStore_BatchRejectsWholeBatchWhenShort(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + // Drain to 3. + for range 7 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + } + // Try a 10-entry batch — should reject without consuming the 3. + out := store.charge(cfg, "orders", bucketActionSend, 10) + require.False(t, out.allowed) + require.Equal(t, 7*time.Second, out.retryAfter, + "Retry-After computed from (10-3)/1 = 7s") + // The 3 leftover tokens are still spendable. + for range 3 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed, + "the rejected batch must not have drained the leftover credit") + } +} + +// TestBucketStore_RetryAfterUsesRequestedCount pins the §3.4 fix +// where the formula's numerator is the requested count, not 1. A +// SendMessageBatch of 10 against refillRate=1 with 0 tokens needs 10s +// to refill, not 1s — telling the client to wait 1s creates a busy- +// loop of premature retries that all reject again. +func TestBucketStore_RetryAfterUsesRequestedCount(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + for range 10 { + store.charge(cfg, "orders", bucketActionSend, 1) + } + // Now batch of 10 against an empty bucket: needs 10s to refill. + out := store.charge(cfg, "orders", bucketActionSend, 10) + require.False(t, out.allowed) + require.Equal(t, 10*time.Second, out.retryAfter) +} + +// TestBucketStore_RetryAfterFloorWithSlowRefill pins the §3.4 rule +// for sub-1-RPS rates: SendRefillPerSecond=0.1 with 0 tokens needs +// 10s for the next single token, not 1s. This was the second of two +// Claude reviews caught on PR #664. 
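+// (Arithmetic: needed = 1 - 0 tokens, ceil(1 / 0.1) = 10, so the
+// honest wait is 10s; a flat 1s floor would invite nine doomed
+// retries before the first token lands.)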
+func TestBucketStore_RetryAfterFloorWithSlowRefill(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 0.1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + for range 10 { + store.charge(cfg, "orders", bucketActionSend, 1) + } + out := store.charge(cfg, "orders", bucketActionSend, 1) + require.False(t, out.allowed) + require.Equal(t, 10*time.Second, out.retryAfter) +} + +// TestBucketStore_ActionsHaveSeparateBuckets pins the (queue, action) +// granularity: a Send-bucket exhaustion does not leak into the Recv +// bucket's accounting and vice versa. +func TestBucketStore_ActionsHaveSeparateBuckets(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{ + SendCapacity: 10, SendRefillPerSecond: 1, + RecvCapacity: 10, RecvRefillPerSecond: 1, + } + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + // Drain Send. + for range 10 { + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + } + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + // Recv must still have full capacity. + for range 10 { + require.True(t, store.charge(cfg, "orders", bucketActionReceive, 1).allowed) + } +} + +// TestBucketStore_QueuesHaveSeparateBuckets pins per-queue isolation: +// a noisy queue does not consume another queue's budget. +func TestBucketStore_QueuesHaveSeparateBuckets(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + for range 10 { + store.charge(cfg, "orders", bucketActionSend, 1) + } + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + // Other queue, same cfg → fresh bucket. + for range 10 { + require.True(t, store.charge(cfg, "events", bucketActionSend, 1).allowed) + } +} + +// TestBucketStore_DefaultBucketCovers covers the §3.2 "Default*" +// fallback: a verb that doesn't match Send or Recv falls through to +// Default, allowing operators to set one cap that covers everything. +func TestBucketStore_DefaultBucketCovers(t *testing.T) { + t.Parallel() + cfg := &sqsQueueThrottle{ + DefaultCapacity: 5, DefaultRefillPerSecond: 1, + } + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + for range 5 { + require.True(t, store.charge(cfg, "orders", bucketActionAny, 1).allowed) + } + require.False(t, store.charge(cfg, "orders", bucketActionAny, 1).allowed) + // And Send falls through to Default too when only Default is set. + for range 5 { + require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed, + "Send falls through to Default which is empty") + } +} + +// TestBucketStore_ReconcilesBucketOnConfigChange pins the Codex P1 +// fix on PR #679: a cached bucket whose capacity/refillRate no +// longer match the queue's current Throttle config gets rebuilt on +// the next charge() call. Without this, a node that loses leadership +// during a SetQueueAttributes commit and regains it later would keep +// enforcing the prior leader-term's limits — the SetQueueAttributes +// invalidation only runs on the leader that processed the commit, +// so a different leader's stale buckets survive. 
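+//
+// The mismatch check this pins, sketched with assumed field names
+// (the real bucket may carry the config pair differently):
+//
+//	if b.capacity != cfg.capacity || b.refillRate != cfg.refillRate {
+//		// drop the stale bucket; rebuild at the new full capacity
+//	}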
+func TestBucketStore_ReconcilesBucketOnConfigChange(t *testing.T) { + t.Parallel() + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + cfgOld := &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1} + // Drain the old bucket entirely. + for range 10 { + require.True(t, store.charge(cfgOld, "orders", bucketActionSend, 1).allowed) + } + require.False(t, store.charge(cfgOld, "orders", bucketActionSend, 1).allowed, + "sanity: old config bucket exhausted") + // Now charge with a NEW config — capacity 100, refill 50. The + // bucket reconciliation must spot the cap/refill mismatch and + // rebuild a fresh bucket at the new full capacity. + cfgNew := &sqsQueueThrottle{SendCapacity: 100, SendRefillPerSecond: 50} + for range 100 { + require.True(t, store.charge(cfgNew, "orders", bucketActionSend, 1).allowed, + "new config charge must succeed against a fresh bucket; stale-bucket bug would reject") + } + // 101st must reject under the new cap. + require.False(t, store.charge(cfgNew, "orders", bucketActionSend, 1).allowed) +} + +// TestBucketStore_ConcurrentReconciliationRespectsNewCapacity pins +// the CompareAndDelete fix on PR #679 round 4: two concurrent +// goroutines hitting a stale bucket must not race each other into +// double-replacing the map entry. Without CompareAndDelete the +// second goroutine's unconditional Delete would evict the first +// goroutine's fresh bucket, leaving the second's fresh bucket +// behind — but the first's bucket is already being charged, so +// total charges across the mismatch window can exceed the new +// capacity. +// +// Race the test by having N goroutines each invoke charge() with +// the new config (post-mismatch) on the same (queue, action). The +// first one through builds the fresh bucket; every later one must +// observe the same fresh bucket and share its capacity. After all +// goroutines finish, total successful charges must equal exactly +// the new capacity — anything more means a Delete-after-replace +// orphaned a fresh bucket. +func TestBucketStore_ConcurrentReconciliationRespectsNewCapacity(t *testing.T) { + t.Parallel() + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + store := newBucketStore(func() time.Time { return now }, time.Hour) + + // Seed the store with a stale bucket from cfgOld. + cfgOld := &sqsQueueThrottle{SendCapacity: 5, SendRefillPerSecond: 1} + for range 5 { + require.True(t, store.charge(cfgOld, "orders", bucketActionSend, 1).allowed) + } + + // Now race many goroutines through the new config. Each charge + // triggers reconciliation against cfgNew. The race window is + // between Load detecting the stale bucket and CompareAndDelete + + // LoadOrStore committing the replacement; without + // CompareAndDelete, two racers can each Delete + LoadOrStore and + // the loser's fresh bucket may end up orphaned while still being + // charged through a leaked pointer. 
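+ //
+ // Sketch of the fixed replacement step (loadStale and newBucket are
+ // illustrative names, not the store's API):
+ //
+ // if stale := loadStale(key); stale != nil {
+ // store.buckets.CompareAndDelete(key, stale) // evict only the bucket we saw
+ // }
+ // fresh, _ := store.buckets.LoadOrStore(key, newBucket(cfgNew)) // racers converge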
+ cfgNew := &sqsQueueThrottle{SendCapacity: 50, SendRefillPerSecond: 1} + const goroutines = 200 + var ( + wg sync.WaitGroup + successes int64 + mu sync.Mutex + ) + for range goroutines { + wg.Add(1) + go func() { + defer wg.Done() + if store.charge(cfgNew, "orders", bucketActionSend, 1).allowed { + mu.Lock() + successes++ + mu.Unlock() + } + }() + } + wg.Wait() + require.EqualValues(t, 50, successes, + "exactly cfgNew.SendCapacity successes; a Delete-after-replace race would let some past the cap") +} + +// TestBucketStore_SweepRaceDoesNotInflateBudget pins the Codex P2 +// fix on PR #679 rounds 5 and 6. The earlier code path was: +// +// sweep computes idle under mu, releases mu, then Deletes. +// A concurrent charge() that loaded the same bucket pre-Delete +// would refill+take after sweep released mu, then later requests +// would miss the map and create a fresh full-capacity bucket — +// a one-time burst of up to 2× capacity per evict cycle. +// +// Round 5 closed half the window by holding mu across the Delete. +// Round 6 closes the rest by setting evicted=true under mu so the +// goroutines that loaded the bucket *before* sweep removed it from +// the map see the flag on their mu acquisition and retry against +// the live entry instead of charging the orphan. +// +// The test is a -race stress test: race sweep against many chargers +// hammering the same bucket. The integrity assertion is on the total +// successful-charge count: with a fully-refilled idle bucket of +// capacity=N entering the race, the maximum tokens any sequence of +// charges should observe is N. The old buggy code could yield up to +// 2N when the race triggered the orphan path. +func TestBucketStore_SweepRaceDoesNotInflateBudget(t *testing.T) { + t.Parallel() + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + clk := now + store := newBucketStore(func() time.Time { return clk }, time.Hour) + const capacity = 10 + cfg := &sqsQueueThrottle{SendCapacity: capacity, SendRefillPerSecond: 1} + // Build the bucket via a single charge so it lands in the store, + // then backdate it past the evict cutoff. The clock is then frozen + // so refill cannot top up tokens during the race — every charge + // either spends an existing token or fails, making the total- + // success count a tight bound on the burst budget. + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + key := bucketKey{queue: "orders", action: bucketActionSend} + v, ok := store.buckets.Load(key) + require.True(t, ok) + bucket, _ := v.(*tokenBucket) + bucket.mu.Lock() + bucket.lastRefill = now.Add(-2 * time.Hour) + bucket.tokens = capacity + bucket.mu.Unlock() + clk = now.Add(2 * time.Hour) + + var wg sync.WaitGroup + var successes atomic.Int64 + const chargers = 64 + const sweeps = 4 + wg.Add(chargers + sweeps) + for range sweeps { + go func() { + defer wg.Done() + store.sweep() + }() + } + for range chargers { + go func() { + defer wg.Done() + if store.charge(cfg, "orders", bucketActionSend, 1).allowed { + successes.Add(1) + } + }() + } + wg.Wait() + + // Old code: a charger that loaded the pre-Delete bucket could take + // a token, then later chargers would create a fresh full-capacity + // bucket — total successes could climb to 2*capacity. With the + // evicted-flag retry, every charger converges on a single live + // bucket and total successes are bounded by capacity. 
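+ //
+ // The charger-side retry this bound relies on, sketched with
+ // assumed names (see chargeBucket in sqs_throttle.go):
+ //
+ // b.mu.Lock()
+ // if b.evicted { // sweep dropped us while we waited on mu
+ // b.mu.Unlock()
+ // continue // reload the live map entry and charge that instead
+ // }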
+ require.LessOrEqualf(t, successes.Load(), int64(capacity), + "sweep race must not let total successful charges exceed capacity (got %d, capacity %d)", + successes.Load(), capacity) +} + +// TestBucketStore_OrphanedBucketRetriesToLiveEntry exercises the +// evicted-flag retry path in chargeBucket directly. The race the +// stress test above tries to trigger probabilistically is forced +// here deterministically by interleaving the charge / sweep steps +// by hand. +func TestBucketStore_OrphanedBucketRetriesToLiveEntry(t *testing.T) { + t.Parallel() + now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC) + clk := now + store := newBucketStore(func() time.Time { return clk }, time.Hour) + cfg := &sqsQueueThrottle{SendCapacity: 5, SendRefillPerSecond: 1} + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + key := bucketKey{queue: "orders", action: bucketActionSend} + v, ok := store.buckets.Load(key) + require.True(t, ok) + original, _ := v.(*tokenBucket) + // Backdate the bucket and advance the clock so sweep evicts it. + original.mu.Lock() + original.lastRefill = now.Add(-2 * time.Hour) + original.mu.Unlock() + clk = now.Add(2 * time.Hour) + + store.sweep() + + // Sweep must have evicted the bucket from the map and marked it. + _, stillThere := store.buckets.Load(key) + require.False(t, stillThere, "sweep must remove the idle bucket from the map") + original.mu.Lock() + require.True(t, original.evicted, "sweep must mark the dropped bucket evicted") + original.mu.Unlock() + + // A charge against the live store reaches a fresh bucket via the + // loadOrInit path; any goroutine still holding the orphan would + // retry through chargeBucket's evicted check and converge here. + out := store.charge(cfg, "orders", bucketActionSend, 1) + require.True(t, out.allowed) + v2, ok := store.buckets.Load(key) + require.True(t, ok) + live, _ := v2.(*tokenBucket) + require.NotSame(t, original, live, "post-eviction charge must allocate a fresh bucket") +} + +// TestBucketStore_InvalidateMarksOrphanEvicted pins the round-6 fix +// to invalidateQueue: dropped buckets must flip evicted=true under mu +// so a sendMessage that loaded meta pre-invalidation and is racing +// against DeleteQueue / SetQueueAttributes / CreateQueue retries +// rather than charging the orphan. +func TestBucketStore_InvalidateMarksOrphanEvicted(t *testing.T) { + t.Parallel() + store := newBucketStoreDefault() + cfg := &sqsQueueThrottle{SendCapacity: 5, SendRefillPerSecond: 1} + require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed) + key := bucketKey{queue: "orders", action: bucketActionSend} + v, ok := store.buckets.Load(key) + require.True(t, ok) + original, _ := v.(*tokenBucket) + + store.invalidateQueue("orders") + + _, stillThere := store.buckets.Load(key) + require.False(t, stillThere, "invalidateQueue must remove the bucket from the map") + original.mu.Lock() + require.True(t, original.evicted, "invalidateQueue must mark the dropped bucket evicted") + original.mu.Unlock() +} + +// TestBucketStore_InvalidateUnderConcurrencyIsRaceFree pins the Codex +// P2 fix on PR #679 round 6.1. The earlier invalidateQueue used +// LoadAndDelete-then-lock, which let a concurrent charger that loaded +// the pointer pre-LoadAndDelete acquire bucket.mu first and observe +// evicted=false on a bucket that had already been removed from the +// map. 
+// The fix mirrors sweep's lock-then-CompareAndDelete-then-flag
+// ordering, so any charger blocked on mu sees evicted=true the moment
+// it unblocks and retries against the live entry.
+//
+// Bounding the *successful charge count* across an invalidate race is
+// not meaningful: invalidate is supposed to reset the bucket, so the
+// post-invalidate fresh bucket can absorb up to capacity additional
+// tokens by design — that 2× window is structural, not a bug. What
+// the fix guarantees instead is that the race is data-race-clean
+// (-race detector finds nothing) and that any bucket the store
+// removed is observably evicted=true under mu when the next charger
+// acquires it. The deterministic
+// TestBucketStore_InvalidateMarksOrphanEvicted pins that property
+// directly; this stress test exists to surface any new -race finding
+// the new lock ordering might introduce.
+func TestBucketStore_InvalidateUnderConcurrencyIsRaceFree(t *testing.T) {
+ t.Parallel()
+ const capacity = 10
+ cfg := &sqsQueueThrottle{SendCapacity: capacity, SendRefillPerSecond: 1}
+ now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC)
+ store := newBucketStore(func() time.Time { return now }, time.Hour)
+ require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed)
+
+ var wg sync.WaitGroup
+ const chargers = 64
+ const invalidates = 4
+ wg.Add(chargers + invalidates)
+ for range invalidates {
+ go func() {
+ defer wg.Done()
+ store.invalidateQueue("orders")
+ }()
+ }
+ for range chargers {
+ go func() {
+ defer wg.Done()
+ store.charge(cfg, "orders", bucketActionSend, 1)
+ }()
+ }
+ wg.Wait()
+}
+
+// TestComputeRetryAfter_CapsAtMaximum pins the medium-severity Gemini
+// fix on PR #679: a tiny refillRate (e.g. 1e-9) plus a large requested
+// count would otherwise compute a multi-day Retry-After and
+// time.Duration arithmetic could overflow. Capped at
+// throttleRetryAfterCap so the client always sees a sane value.
+func TestComputeRetryAfter_CapsAtMaximum(t *testing.T) {
+ t.Parallel()
+ got := computeRetryAfter(1, 0, 1e-9)
+ require.Equal(t, throttleRetryAfterCap, got,
+ "computeRetryAfter must cap at throttleRetryAfterCap regardless of input")
+}
+
+// TestThrottleAttributesPresent covers the request-gate helper used
+// by setQueueAttributes to skip cache invalidation on unrelated
+// updates (Codex P1 on PR #679).
+func TestThrottleAttributesPresent(t *testing.T) {
+ t.Parallel()
+ require.False(t, throttleAttributesPresent(map[string]string{}))
+ require.False(t, throttleAttributesPresent(map[string]string{"VisibilityTimeout": "30"}))
+ require.True(t, throttleAttributesPresent(map[string]string{"ThrottleSendCapacity": "10"}))
+ require.True(t, throttleAttributesPresent(map[string]string{"ThrottleRecvRefillPerSecond": "5"}))
+ require.True(t, throttleAttributesPresent(map[string]string{"ThrottleDefaultCapacity": "5"}))
+}
+
+// TestBucketStore_InvalidateQueueDropsAllActions pins the §3.1 cache
+// invalidation contract for SetQueueAttributes / DeleteQueue: every
+// bucket belonging to the queue is dropped, even ones not currently
+// being charged. A future verb that grows a new bucket can't sneak
+// past invalidation by being wired into one site only.
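+//
+// The per-queue drop, sketched (assumed names; the real
+// invalidateQueue mirrors sweep's lock-then-CompareAndDelete order):
+//
+//	store.buckets.Range(func(k, v any) bool {
+//		if k.(bucketKey).queue == queue {
+//			b := v.(*tokenBucket)
+//			b.mu.Lock()
+//			store.buckets.CompareAndDelete(k, v)
+//			b.evicted = true
+//			b.mu.Unlock()
+//		}
+//		return true
+//	})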
+func TestBucketStore_InvalidateQueueDropsAllActions(t *testing.T) {
+ t.Parallel()
+ cfg := &sqsQueueThrottle{
+ SendCapacity: 10, SendRefillPerSecond: 1,
+ RecvCapacity: 10, RecvRefillPerSecond: 1,
+ }
+ now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC)
+ store := newBucketStore(func() time.Time { return now }, time.Hour)
+ // Drain both buckets.
+ for range 10 {
+ store.charge(cfg, "orders", bucketActionSend, 1)
+ store.charge(cfg, "orders", bucketActionReceive, 1)
+ }
+ require.False(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed)
+ require.False(t, store.charge(cfg, "orders", bucketActionReceive, 1).allowed)
+ // Invalidate.
+ store.invalidateQueue("orders")
+ // Both buckets must now be at full capacity again.
+ for range 10 {
+ require.True(t, store.charge(cfg, "orders", bucketActionSend, 1).allowed)
+ require.True(t, store.charge(cfg, "orders", bucketActionReceive, 1).allowed)
+ }
+}
+
+// TestBucketStore_ConcurrentChargesPreserveCount pins the concurrency
+// contract under -race: 100 goroutines race for tokens against a
+// capacity-50 bucket. Exactly 50 must succeed; the other 50 must be
+// rejected. Anything else (51 successes, partial-credit consumption
+// during a reject) means the per-bucket mutex is broken.
+func TestBucketStore_ConcurrentChargesPreserveCount(t *testing.T) {
+ t.Parallel()
+ cfg := &sqsQueueThrottle{SendCapacity: 50, SendRefillPerSecond: 1}
+ now := time.Date(2026, 4, 27, 10, 0, 0, 0, time.UTC)
+ store := newBucketStore(func() time.Time { return now }, time.Hour)
+ var (
+ wg sync.WaitGroup
+ successes int64
+ mu sync.Mutex
+ )
+ for range 100 {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if store.charge(cfg, "orders", bucketActionSend, 1).allowed {
+ mu.Lock()
+ successes++
+ mu.Unlock()
+ }
+ }()
+ }
+ wg.Wait()
+ require.EqualValues(t, 50, successes,
+ "exactly capacity successes; broken mutex would let some race past or double-charge")
+}
+
+// --- Validator tests ---
+
+// TestValidateThrottleConfig_NilOrEmpty covers the no-op path: a meta
+// with no Throttle, or with the zero-valued struct, validates clean
+// and gets canonicalised so downstream code only has to handle the
+// nil case.
+func TestValidateThrottleConfig_NilOrEmpty(t *testing.T) {
+ t.Parallel()
+ m := &sqsQueueMeta{}
+ require.NoError(t, validateThrottleConfig(m))
+ require.Nil(t, m.Throttle)
+ m.Throttle = &sqsQueueThrottle{}
+ require.NoError(t, validateThrottleConfig(m))
+ require.Nil(t, m.Throttle, "all-zero post-validate must canonicalise to nil")
+}
+
+// TestValidateThrottleConfig_BothZeroOrBothPositive pins the §3.2
+// pair-wise rule: an action's capacity and refill must agree on
+// whether the action is enabled.
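+//
+// The pair rule, sketched (validator internals assumed):
+//
+//	if (capacity > 0) != (refillPerSecond > 0) {
+//		// reject: a half-enabled action (InvalidAttributeValue)
+//	}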
+func TestValidateThrottleConfig_BothZeroOrBothPositive(t *testing.T) {
+ t.Parallel()
+ cases := []struct {
+ name string
+ cfg sqsQueueThrottle
+ wantErr bool
+ }{
+ {"send capacity without refill", sqsQueueThrottle{SendCapacity: 10}, true},
+ {"send refill without capacity", sqsQueueThrottle{SendRefillPerSecond: 1}, true},
+ {"recv capacity without refill", sqsQueueThrottle{RecvCapacity: 10}, true},
+ {"recv refill without capacity", sqsQueueThrottle{RecvRefillPerSecond: 1}, true},
+ {"both positive ok", sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1}, false},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+ cfg := tc.cfg
+ err := validateThrottleConfig(&sqsQueueMeta{Throttle: &cfg})
+ if tc.wantErr {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
+}
+
+// TestValidateThrottleConfig_CapacityGEMaxBatchCharge pins the §3.2
+// floor for batch-covered actions: SendMessageBatch and
+// DeleteMessageBatch each charge up to 10 tokens, so a capacity below
+// 10 makes every full batch permanently unserviceable.
+func TestValidateThrottleConfig_CapacityGEMaxBatchCharge(t *testing.T) {
+ t.Parallel()
+ err := validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{SendCapacity: 5, SendRefillPerSecond: 1},
+ })
+ require.Error(t, err)
+ err = validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{RecvCapacity: 9, RecvRefillPerSecond: 1},
+ })
+ require.Error(t, err)
+ err = validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 1},
+ })
+ require.NoError(t, err)
+}
+
+// TestValidateThrottleConfig_DefaultBucketBatchFloor pins the
+// Codex P1 fix on PR #679 round 5: Default* gets the same batch-
+// capacity ≥ 10 floor as Send/Recv because resolveActionConfig
+// falls Send/Recv traffic through to Default when the dedicated
+// pair is unset. Without the floor a Default-only config of
+// {capacity=5, refill=1} would accept SendMessageBatch entries=10
+// requests at the validator and reject them forever at the bucket.
+func TestValidateThrottleConfig_DefaultBucketBatchFloor(t *testing.T) {
+ t.Parallel()
+ // Capacity 1 (below the batch floor) must reject.
+ err := validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{DefaultCapacity: 1, DefaultRefillPerSecond: 1},
+ })
+ require.Error(t, err)
+ // Capacity 5 is still below the batch floor and must also reject.
+ err = validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{DefaultCapacity: 5, DefaultRefillPerSecond: 1},
+ })
+ require.Error(t, err)
+ // Capacity exactly at the batch floor is accepted.
+ err = validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{DefaultCapacity: 10, DefaultRefillPerSecond: 1},
+ })
+ require.NoError(t, err)
+}
+
+// TestValidateThrottleConfig_CapacityGERefill pins the §3.2 burst
+// rule: capacity below refill makes the bucket unable to accumulate
+// any burst headroom — the capacity floor is the refill rate.
+func TestValidateThrottleConfig_CapacityGERefill(t *testing.T) {
+ t.Parallel()
+ err := validateThrottleConfig(&sqsQueueMeta{
+ Throttle: &sqsQueueThrottle{SendCapacity: 10, SendRefillPerSecond: 50},
+ })
+ require.Error(t, err)
+}
+
+// TestParseThrottleFloat_RejectsBadInputs covers the per-field
+// parser: NaN, infinity, negative values, malformed strings, and
+// values above the hard ceiling all reject with InvalidAttributeValue.
+func TestParseThrottleFloat_RejectsBadInputs(t *testing.T) {
+ t.Parallel()
+ bad := []string{
+ "",
+ "not a number",
+ "NaN",
+ "Inf",
+ "-1",
+ "-0.5",
+ "1e100", // > hard ceiling
+ "100000.01", // just above the hard ceiling
+ }
+ for _, in := range bad {
+ t.Run(in, func(t *testing.T) {
+ t.Parallel()
+ _, err := parseThrottleFloat(in)
+ require.Error(t, err, "input %q must be rejected", in)
+ })
+ }
+ // Boundary: the hard ceiling itself is accepted.
+ v, err := parseThrottleFloat("100000")
+ require.NoError(t, err)
+ require.Equal(t, 100000.0, v)
+}
+
+// TestComputeRetryAfter_FloorsAtOneSecond pins the §3.4 minimum-1
+// floor: Retry-After's delay-seconds form (RFC 9110 §10.2.3) has
+// integer-second granularity, so even a sub-second wait is rounded
+// up to 1.
+func TestComputeRetryAfter_FloorsAtOneSecond(t *testing.T) {
+ t.Parallel()
+ // needed=0.5, refill=10 → ceil(0.05) = 1
+ require.Equal(t, time.Second, computeRetryAfter(1, 0.5, 10))
+ // needed=1, refill=100 → ceil(0.01) = 1
+ require.Equal(t, time.Second, computeRetryAfter(1, 0, 100))
+}
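+
+// For reference, an end-to-end sketch of the Retry-After computation
+// that the floor and cap tests pin together (assumed shape; the real
+// computeRetryAfter lives in sqs_throttle.go):
+//
+//	needed := float64(requested) - tokens
+//	secs := math.Ceil(needed / refillRate)
+//	if secs < 1 {
+//		secs = 1 // RFC 9110 delay-seconds floor
+//	}
+//	if secs > throttleRetryAfterCap.Seconds() {
+//		return throttleRetryAfterCap // guards tiny refill rates against overflow
+//	}
+//	return time.Duration(secs) * time.Second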