Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"time"

"github.com/bootjp/elastickv/kv"
Expand Down Expand Up @@ -55,6 +56,11 @@ const (
sqsErrInternalFailure = "InternalFailure"
sqsErrServiceUnavailable = "ServiceUnavailable"
sqsErrMalformedRequest = "MalformedQueryString"
// sqsErrThrottling is the per-queue rate-limit rejection code.
// Returned with HTTP 400 and a Retry-After header derived from the
// bucket's refillRate + the request's charge count (see
// computeRetryAfter in sqs_throttle.go for the formula).
sqsErrThrottling = "Throttling"
)

type SQSServerOption func(*SQSServer)
Expand All @@ -76,6 +82,11 @@ type SQSServer struct {
// goroutines without ordering between them.
reaperCtx context.Context
reaperCancel context.CancelFunc
// throttle is the per-queue rate-limit bucket store. Always
// non-nil; charge() short-circuits when the queue's meta has no
// throttle config so unconfigured queues pay one nil-check per
// request and nothing else (see sqs_throttle.go).
throttle *bucketStore
}

// WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
Expand All @@ -98,6 +109,7 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin
coordinator: coordinate,
reaperCtx: reaperCtx,
reaperCancel: reaperCancel,
throttle: newBucketStoreDefault(),
}
s.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){
sqsCreateQueueTarget: s.createQueue,
Expand Down Expand Up @@ -131,6 +143,10 @@ func NewSQSServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordin

func (s *SQSServer) Run() error {
s.startReaper(s.reaperCtx)
// Throttle bucket idle-evict runs on a background ticker so the
// request hot path never pays the O(N) sweep cost. Cleaned up by
// the same reaperCtx cancellation that stops the message reaper.
go s.throttle.runSweepLoop(s.reaperCtx)
if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -267,3 +283,19 @@ func writeSQSError(w http.ResponseWriter, status int, code string, message strin
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(resp)
}

// writeSQSThrottlingError emits the rate-limit rejection envelope: 400
// + the AWS-shaped JSON error body + a Retry-After header carrying
// the integer-second wait derived from the bucket's refill rate and
// the request's charge count. The action argument is the bucket-action
// vocabulary ("Send" | "Receive" | "*") so the operator-visible
// message names the bucket that ran out, not just the queue.
func writeSQSThrottlingError(w http.ResponseWriter, queue, action string, retryAfter time.Duration) {
if retryAfter < time.Second {
retryAfter = time.Second
}
secs := int(retryAfter / time.Second)
w.Header().Set("Retry-After", strconv.Itoa(secs))
writeSQSError(w, http.StatusBadRequest, sqsErrThrottling,
"Rate exceeded for queue '"+queue+"' action '"+action+"'")
}
439 changes: 432 additions & 7 deletions adapter/sqs_catalog.go

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions adapter/sqs_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func TestSQSServer_CatalogCreateIsIdempotent(t *testing.T) {
if got, _ := out3["__type"].(string); got != sqsErrQueueNameExists {
t.Fatalf("differing-attrs error type: got %q want %q", got, sqsErrQueueNameExists)
}

// Fourth call: same name, same non-throttle attrs as the original
// create, but different Throttle* values. The original create had
// no Throttle config; this one adds one. throttleConfigEqual must
// notice the diff and the call must reject as QueueNameExists.
// Without this case a bug in throttleConfigEqual (e.g. always
// returning true) would slip past the existing VisibilityTimeout-
// only test (Claude review on PR #679 round 2).
withThrottle := map[string]any{
"QueueName": "idempotent",
"Attributes": map[string]string{
"VisibilityTimeout": "60",
"ThrottleSendCapacity": "10",
"ThrottleSendRefillPerSecond": "1",
},
}
status4, out4 := callSQS(t, node, sqsCreateQueueTarget, withThrottle)
if status4 != http.StatusBadRequest {
t.Fatalf("re-create with added Throttle*: got %d want 400; body %v", status4, out4)
}
if got, _ := out4["__type"].(string); got != sqsErrQueueNameExists {
t.Fatalf("Throttle*-diff error type: got %q want %q", got, sqsErrQueueNameExists)
}
}

func TestSQSServer_CatalogGetAndSetAttributes(t *testing.T) {
Expand Down
59 changes: 53 additions & 6 deletions adapter/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,33 +294,67 @@ type sqsChangeVisibilityInput struct {

// ------------------------ handlers ------------------------

func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
// prepareSendMessage decodes the SendMessage payload and resolves
// the queue name. Throttle charging happens after the meta load in
// validateSend so we don't pay an extra meta read just to discover
// throttling is off (Gemini high on PR #679).
func (s *SQSServer) prepareSendMessage(w http.ResponseWriter, r *http.Request) (sqsSendMessageInput, string, bool) {
var in sqsSendMessageInput
if err := decodeSQSJSONInput(r, &in); err != nil {
writeSQSErrorFromErr(w, err)
return
return in, "", false
}
queueName, err := queueNameFromURL(in.QueueUrl)
if err != nil {
writeSQSErrorFromErr(w, err)
return
return in, "", false
}
return in, queueName, true
}

// validateSend loads queue meta, runs the throttle charge against
// the loaded throttle config (no extra meta read), then validates
// message attributes / FIFO params and resolves the delay. Returns
// ok=false if any step has already written the error response.
//
// Throttle check sits AFTER the meta load (so we have the throttle
// config) and AFTER the QueueDoesNotExist branch (so a missing
// queue is reported as 400 QueueDoesNotExist, not as a Throttling
// 400 against a non-existent bucket). It still sits OUTSIDE the
// OCC transaction (§4.2): a rejected request never reaches the
// coordinator.
func (s *SQSServer) validateSend(w http.ResponseWriter, r *http.Request, queueName string, in sqsSendMessageInput) (*sqsQueueMeta, uint64, int64, bool) {
meta, readTS, apiErr := s.loadQueueMetaForSend(r.Context(), queueName, []byte(in.MessageBody))
if apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
if !s.chargeQueueWithThrottle(w, queueName, bucketActionSend, 1, meta.Throttle) {
return nil, 0, 0, false
Comment on lines +332 to +333
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid charging with stale queue metadata after delete races

This charges using meta.Throttle loaded earlier, so if DeleteQueue commits between the meta read and this call, the in-flight request can recreate a throttle bucket after deleteQueue already invalidated it. Because buckets are keyed by queue name/action (not generation), a same-name recreate can inherit stale token state instead of starting fresh, violating the delete+recreate reset behavior the invalidation is meant to guarantee.

Useful? React with 👍 / 👎.

}
if apiErr := validateMessageAttributes(in.MessageAttributes); apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
if apiErr := validateSendFIFOParams(meta, in); apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return
return nil, 0, 0, false
}
delay, apiErr := resolveSendDelay(meta, in.DelaySeconds)
if apiErr != nil {
writeSQSErrorFromErr(w, apiErr)
return nil, 0, 0, false
}
return meta, readTS, delay, true
}

func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) {
in, queueName, ok := s.prepareSendMessage(w, r)
if !ok {
return
}
meta, readTS, delay, ok := s.validateSend(w, r, queueName, in)
if !ok {
return
}
if meta.IsFIFO {
Expand Down Expand Up @@ -530,6 +564,13 @@ func (s *SQSServer) receiveMessage(w http.ResponseWriter, r *http.Request) {
writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
return
}
// Throttle check uses the loaded meta's throttle config so we
// don't pay an extra meta read just to discover throttling is off
// (Gemini high on PR #679). Sits AFTER the QueueDoesNotExist
// branch — a missing queue should not consume a Recv token.
if !s.chargeQueueWithThrottle(w, queueName, bucketActionReceive, 1, meta.Throttle) {
return
}
max, maxErr := resolveReceiveMaxMessages(in.MaxNumberOfMessages)
if maxErr != nil {
writeSQSErrorFromErr(w, maxErr)
Expand Down Expand Up @@ -1106,6 +1147,9 @@ func (s *SQSServer) deleteMessage(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
return
}
if err := s.deleteMessageWithRetry(r.Context(), queueName, handle); err != nil {
writeSQSErrorFromErr(w, err)
return
Expand Down Expand Up @@ -1258,6 +1302,9 @@ func (s *SQSServer) changeMessageVisibility(w http.ResponseWriter, r *http.Reque
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, 1) {
return
}
if err := s.changeVisibilityWithRetry(r.Context(), queueName, handle, timeout); err != nil {
writeSQSErrorFromErr(w, err)
return
Expand Down
9 changes: 9 additions & 0 deletions adapter/sqs_messages_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func (s *SQSServer) sendMessageBatch(w http.ResponseWriter, r *http.Request) {
"total batch payload exceeds 262144 bytes")
return
}
if !s.chargeQueue(w, r, queueName, bucketActionSend, throttleChargeCount(len(in.Entries))) {
return
}

successful, failed, err := s.sendMessageBatchWithRetry(r.Context(), queueName, in.Entries)
if err != nil {
Expand Down Expand Up @@ -453,6 +456,9 @@ func (s *SQSServer) deleteMessageBatch(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
return
}

successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
failed := make([]sqsBatchResultErrorEntry, 0)
Expand Down Expand Up @@ -519,6 +525,9 @@ func (s *SQSServer) changeMessageVisibilityBatch(w http.ResponseWriter, r *http.
writeSQSErrorFromErr(w, err)
return
}
if !s.chargeQueue(w, r, queueName, bucketActionReceive, throttleChargeCount(len(in.Entries))) {
return
}

successful := make([]sqsBatchResultEntry, 0, len(in.Entries))
failed := make([]sqsBatchResultErrorEntry, 0)
Expand Down
Loading
Loading