
Phase 1.4.1: Rate limiting on gateway channels #31

Open
richard-devbot wants to merge 4 commits into CursorTouch:main from richard-devbot:richardson/phase1-rate-limiting

Conversation

@richard-devbot

Summary

Closes #23.

  • Adds operator_use/utils/rate_limiter.py with TokenBucket and RateLimiter classes
  • Per-agent token bucket: 10 req/s rate, 20-request burst capacity (configurable)
  • Thread-safe via threading.Lock for concurrent access
  • Integrated into ACPServer._handle_create_run -- rate-limited requests return HTTP 429 with a JSON error body and emit a warning log
  • No external dependencies (pure stdlib)
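To make the intended call pattern concrete, here is a minimal stand-in with the same `is_allowed(key)` interface as the PR's `RateLimiter` (compressed to one class for illustration; the real implementation splits `TokenBucket` out):

```python
import threading
import time


class RateLimiter:
    """Minimal per-key token bucket stand-in: 10 req/s sustained, 20-request burst."""

    def __init__(self, rate: float = 10.0, capacity: float = 20.0) -> None:
        self._rate = rate
        self._capacity = capacity
        # key -> (available tokens, last refill timestamp)
        self._buckets: dict[str, tuple[float, float]] = {}
        self._lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        with self._lock:
            now = time.monotonic()
            tokens, last = self._buckets.get(key, (self._capacity, now))
            # Refill proportionally to elapsed time, capped at burst capacity.
            tokens = min(self._capacity, tokens + (now - last) * self._rate)
            if tokens >= 1.0:
                self._buckets[key] = (tokens - 1.0, now)
                return True
            self._buckets[key] = (tokens, now)
            return False


limiter = RateLimiter(rate=10.0, capacity=20.0)
allowed = [limiter.is_allowed("agent-1") for _ in range(25)]
print(sum(allowed))  # → 20: burst capacity admits the first 20 back-to-back requests
```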

Test Plan

  • pytest tests/test_rate_limiter.py -v -- 11 tests pass
  • Token refill over time verified
  • Burst capacity ceiling verified
  • Multi-token consume verified
  • Per-channel bucket isolation verified
  • Reset clears bucket and allows fresh requests
  • Reset on nonexistent key is a safe no-op
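The refill-over-time check can be sketched like this (minimal bucket stand-in for illustration; the real tests exercise the PR's `TokenBucket` class):

```python
import time


class MiniBucket:
    """Single token bucket: `rate` tokens/s, capped at `capacity`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def consume(self) -> bool:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = MiniBucket(rate=50.0, capacity=1.0)
assert bucket.consume()        # drain the single token
assert not bucket.consume()    # immediately empty again
time.sleep(0.05)               # 0.05 s at 50 tokens/s accrues ~2.5 tokens, capped at 1
print(bucket.consume())        # → True after refill
```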

Integration Point

Rate limiting is enforced in ACPServer._handle_create_run (operator_use/acp/server.py), after auth and agent resolution but before run creation. This means:

  • Unauthenticated requests are rejected at 401 before hitting the rate limiter
  • Per-agent token scoping is checked at 403 before hitting the rate limiter
  • Only valid, authorized requests consume rate limit tokens
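The gate ordering above can be sketched as follows. This is a hypothetical illustration: `authenticate`, `authorize`, and the request-dict shape are stand-ins, not the actual server API.

```python
class AlwaysAllow:
    """Stand-in limiter; the real server uses the per-agent token bucket."""

    def is_allowed(self, key: str) -> bool:
        return True


def authenticate(req: dict) -> bool:
    return req.get("token") == "valid-token"


def authorize(req: dict) -> bool:
    return req.get("agent_id") in req.get("scopes", ())


limiter = AlwaysAllow()


def handle_create_run(req: dict) -> int:
    if not authenticate(req):
        return 401  # rejected before the rate limiter is consulted
    if not authorize(req):
        return 403  # scoping failures also never consume tokens
    if not limiter.is_allowed(req["agent_id"]):
        return 429
    return 201  # run created


print(handle_create_run({"token": "bad"}))  # → 401
```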

Failure Conditions

  • In-process only: does not persist across restarts (by design for Phase 1)
  • No distributed rate limiting across multiple server instances
  • Monotonic clock drift on suspend/resume could cause brief burst allowances

@qodo-code-review

Review Summary by Qodo

Add rate limiting to gateway channels with token bucket

✨ Enhancement 🧪 Tests


Walkthroughs

Description
• Implements in-process token bucket rate limiter with per-agent isolation
• Integrates rate limiting into ACPServer request handler (HTTP 429 responses)
• Adds comprehensive test suite covering token refill, burst capacity, and per-key isolation
• Thread-safe design using stdlib locks, no external dependencies
Diagram
flowchart LR
  Request["Incoming Request"]
  Auth["Auth Check"]
  AgentCheck["Agent Authorization"]
  RateLimit["Rate Limiter Check"]
  Allowed["Request Allowed"]
  Denied["HTTP 429 Response"]
  
  Request --> Auth
  Auth --> AgentCheck
  AgentCheck --> RateLimit
  RateLimit -->|Tokens Available| Allowed
  RateLimit -->|Rate Limited| Denied


File Changes

1. operator_use/utils/rate_limiter.py ✨ Enhancement +82/-0

Token bucket rate limiter implementation

• Introduces TokenBucket class with thread-safe token consumption and refill logic
• Implements RateLimiter registry maintaining per-key token buckets
• Uses threading.Lock for thread safety and time.monotonic() for accurate timing
• Provides is_allowed() method for rate limit checks and reset() for cleanup



2. operator_use/acp/server.py ✨ Enhancement +11/-0

Integrate rate limiting into ACP server request handler

• Imports RateLimiter from utils module
• Initializes per-agent rate limiter in ACPServer.__init__() with 10 req/s and 20-request burst
• Adds rate limit check in _handle_create_run() after authorization validation
• Returns HTTP 429 with JSON error body and warning log when rate limit exceeded



3. tests/test_rate_limiter.py 🧪 Tests +71/-0

Comprehensive test suite for rate limiter

• Adds 11 tests covering TokenBucket behavior (initial state, consumption, refill, capacity ceiling)
• Tests RateLimiter per-key isolation, reset functionality, and rate limit enforcement
• Validates token refill timing and multi-token consumption scenarios
• Verifies reset on nonexistent keys is safe no-op




@qodo-code-review

qodo-code-review bot commented Apr 5, 2026

Code Review by Qodo

🐞 Bugs (3) 📘 Rule violations (0) 📎 Requirement gaps (5) 🎨 UX Issues (0)



Action required

1. TokenBucket not sliding-window 📎 Requirement gap ⛨ Security
Description
The new limiter is a token-bucket implementation with defaults rate=10.0 and capacity=20.0, not
the required sliding-window limiter with a default of 30 requests per 60 seconds. It is implemented
under operator_use/utils and invoked from ACPServer, not as
operator_use/gateway/rate_limiter.py enforced at the gateway before orchestration.
Code

operator_use/utils/rate_limiter.py[R1-77]

+"""In-process token bucket rate limiter for gateway channels.
+
+Provides per-channel (or per-agent) rate limiting with no external dependencies.
+Thread-safe via threading.Lock; async-safe because consume() is non-blocking.
+"""
+
+import threading
+import time
+from dataclasses import dataclass, field
+
+
+@dataclass
+class TokenBucket:
+    """Thread-safe token bucket rate limiter.
+
+    Args:
+        rate: Number of tokens added per second.
+        capacity: Maximum number of tokens the bucket can hold.
+    """
+
+    rate: float
+    capacity: float
+    _tokens: float = field(init=False)
+    _last_refill: float = field(init=False)
+    _lock: threading.Lock = field(init=False, default_factory=threading.Lock)
+
+    def __post_init__(self) -> None:
+        self._tokens = self.capacity
+        self._last_refill = time.monotonic()
+
+    def _refill(self) -> None:
+        """Add tokens based on elapsed time. Must be called with lock held."""
+        now = time.monotonic()
+        elapsed = now - self._last_refill
+        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
+        self._last_refill = now
+
+    def consume(self, tokens: float = 1.0) -> bool:
+        """Try to consume tokens. Returns True if allowed, False if rate limited."""
+        with self._lock:
+            self._refill()
+            if self._tokens >= tokens:
+                self._tokens -= tokens
+                return True
+            return False
+
+    @property
+    def available_tokens(self) -> float:
+        """Current available tokens (approximate, for diagnostics)."""
+        with self._lock:
+            self._refill()
+            return self._tokens
+
+
+class RateLimiter:
+    """Per-key rate limiter registry.
+
+    Maintains one TokenBucket per key (channel_id, agent_id, etc.). Thread-safe.
+    """
+
+    def __init__(self, rate: float = 10.0, capacity: float = 20.0) -> None:
+        """
+        Args:
+            rate: Requests per second allowed per key (default: 10/s).
+            capacity: Burst capacity per key (default: 20 requests).
+        """
+        self._rate = rate
+        self._capacity = capacity
+        self._buckets: dict[str, TokenBucket] = {}
+        self._lock = threading.Lock()
+
+    def is_allowed(self, key: str) -> bool:
+        """Check if a request for the given key is allowed under the rate limit."""
+        with self._lock:
+            if key not in self._buckets:
+                self._buckets[key] = TokenBucket(rate=self._rate, capacity=self._capacity)
+        return self._buckets[key].consume()
Evidence
PR Compliance ID 1 requires a sliding-window limiter in operator_use/gateway/rate_limiter.py with
default 30/60 and invoked at the gateway before messages reach orchestration; the added code instead
implements a token bucket in operator_use/utils/rate_limiter.py with 10/s and 20 burst, and the
only new call site is in operator_use/acp/server.py.

Implement sliding-window rate limiter module for gateway channels
operator_use/utils/rate_limiter.py[1-77]
operator_use/acp/server.py[36-36]
operator_use/acp/server.py[82-83]
operator_use/acp/server.py[234-240]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The PR adds a token-bucket limiter under `operator_use/utils` and applies it in `ACPServer`, but the compliance requirement is a gateway-level sliding-window limiter module at `operator_use/gateway/rate_limiter.py` with a default of 30 requests per 60 seconds.

## Issue Context
This is intended to mitigate message flooding before messages reach orchestration.

## Fix Focus Areas
- operator_use/utils/rate_limiter.py[1-82]
- operator_use/acp/server.py[36-36]
- operator_use/acp/server.py[82-83]
- operator_use/acp/server.py[234-240]
- operator_use/gateway/channels/base.py[40-44]



2. Rate limits hard-coded 📎 Requirement gap ⚙ Maintainability
Description
Rate limiting parameters are hard-coded (e.g., RateLimiter(rate=10.0, capacity=20.0)) and the
limiter is not controlled via the required config.json rate_limit schema. This prevents
deployments from tuning limits without code changes.
Code

operator_use/acp/server.py[R82-83]

+        # Per-agent rate limiter: 10 req/s sustained, 20-request burst
+        self._rate_limiter = RateLimiter(rate=10.0, capacity=20.0)
Evidence
PR Compliance ID 2 requires reading rate_limit: {max_requests, window_seconds} from config.json;
the PR hard-codes limiter settings in ACPServer and defines only rate/capacity defaults in
RateLimiter, with no configuration field present in the root Config model.

Support configuration of rate limits via config.json
operator_use/acp/server.py[82-83]
operator_use/utils/rate_limiter.py[61-66]
operator_use/config/service.py[289-307]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Rate limit parameters are hard-coded and do not read `rate_limit: {"max_requests": 30, "window_seconds": 60}` from `config.json`.

## Issue Context
The compliance checklist requires rate limit configuration to be adjustable per deployment environment via `config.json`.

## Fix Focus Areas
- operator_use/acp/server.py[82-83]
- operator_use/utils/rate_limiter.py[61-77]
- operator_use/config/service.py[289-338]



3. Limiter keyed by agent 📎 Requirement gap ⛨ Security
Description
Rate limiting is keyed by target_agent_id, so multiple users share the same bucket and can
throttle each other. The requirement is per-user tracking keyed by the channel user_id.
Code

operator_use/acp/server.py[R234-236]

+        # Rate limit: per-agent token bucket (10 req/s, 20 burst)
+        if not self._rate_limiter.is_allowed(target_agent_id):
+            logger.warning("Rate limit exceeded for agent %s", target_agent_id)
Evidence
PR Compliance ID 3 requires per-user accounting keyed by channel user ID; the new enforcement uses
self._rate_limiter.is_allowed(target_agent_id), while incoming bus messages carry a user_id
field intended for per-user identity.

Enforce per-user tracking keyed by channel user ID
operator_use/acp/server.py[224-236]
operator_use/bus/views.py[86-91]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Rate limiting is currently keyed by agent identifier, not by channel `user_id`.

## Issue Context
Per-user buckets prevent one user from impacting others.

## Fix Focus Areas
- operator_use/acp/server.py[224-240]
- operator_use/bus/views.py[86-91]
- operator_use/gateway/channels/base.py[40-44]



4. Tests miss sustained overload 📎 Requirement gap ☼ Reliability
Description
The new tests validate basic allow/deny, refill, and manual reset(), but do not cover sustained
overload over time or automatic recovery after a window expires. This does not meet the required
test coverage scenarios.
Code

tests/test_rate_limiter.py[R45-71]

+class TestRateLimiter:
+    def test_allows_requests_within_limit(self):
+        limiter = RateLimiter(rate=100.0, capacity=10.0)
+        for _ in range(10):
+            assert limiter.is_allowed("channel-1") is True
+
+    def test_denies_requests_over_limit(self):
+        limiter = RateLimiter(rate=0.01, capacity=1.0)
+        limiter.is_allowed("channel-1")  # consume the one token
+        assert limiter.is_allowed("channel-1") is False
+
+    def test_independent_buckets_per_channel(self):
+        limiter = RateLimiter(rate=0.01, capacity=1.0)
+        limiter.is_allowed("ch-A")  # drain ch-A
+        assert limiter.is_allowed("ch-A") is False
+        assert limiter.is_allowed("ch-B") is True  # ch-B still full
+
+    def test_reset_clears_bucket(self):
+        limiter = RateLimiter(rate=0.01, capacity=1.0)
+        limiter.is_allowed("ch-X")  # drain
+        assert limiter.is_allowed("ch-X") is False
+        limiter.reset("ch-X")
+        assert limiter.is_allowed("ch-X") is True  # fresh bucket after reset
+
+    def test_reset_nonexistent_key_is_noop(self):
+        limiter = RateLimiter(rate=10.0, capacity=5.0)
+        limiter.reset("does-not-exist")  # should not raise
Evidence
PR Compliance ID 5 requires automated tests for sustained overload and window reset behavior; the
added tests do not include a sustained overload scenario over time, nor an assertion that requests
resume after a configured window expires (only a manual reset() call is tested).

Add automated tests covering normal, burst, sustained overload, and window reset behaviors
tests/test_rate_limiter.py[45-71]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
Test coverage is missing for sustained overload over time and for recovery after the configured window/period expires (without calling `reset()`).

## Issue Context
The checklist explicitly requires coverage of sustained overload and window reset behaviors.

## Fix Focus Areas
- tests/test_rate_limiter.py[1-71]
- operator_use/utils/rate_limiter.py[1-82]



5. RateLimiter dict race 🐞 Bug ☼ Reliability
Description
RateLimiter.is_allowed() reads self._buckets[key] after releasing the registry lock, so a
concurrent reset(key) can pop the entry and cause a KeyError (500) even though the class claims
thread-safety. This can occur during “channel teardown” scenarios described by the reset()
docstring.
Code

operator_use/utils/rate_limiter.py[R72-82]

+    def is_allowed(self, key: str) -> bool:
+        """Check if a request for the given key is allowed under the rate limit."""
+        with self._lock:
+            if key not in self._buckets:
+                self._buckets[key] = TokenBucket(rate=self._rate, capacity=self._capacity)
+        return self._buckets[key].consume()
+
+    def reset(self, key: str) -> None:
+        """Remove the rate limit bucket for a key (e.g. for tests or channel teardown)."""
+        with self._lock:
+            self._buckets.pop(key, None)
Evidence
is_allowed() exits with self._lock: and then performs a fresh dictionary lookup
(self._buckets[key]) outside the lock; reset() can remove that key under the same lock, making
the post-lock lookup race and potentially raising KeyError.

operator_use/utils/rate_limiter.py[72-82]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`RateLimiter.is_allowed()` performs `self._buckets[key]` lookup outside the registry lock. If another thread/task calls `reset(key)` between the lock release and that lookup, `is_allowed()` can raise `KeyError` and return a 500.

## Issue Context
The class claims thread-safety and `reset()` is documented for teardown use, which is exactly the scenario where `reset()` might race with in-flight requests.

## Fix Focus Areas
- operator_use/utils/rate_limiter.py[72-82]

## Suggested fix
Change `is_allowed()` to capture the bucket reference while holding `_lock`:
- under `_lock`, do `bucket = self._buckets.get(key)`; create/store if missing
- after releasing `_lock`, call `bucket.consume()` on the captured reference
This removes the post-lock dict access race while keeping per-bucket locking behavior.
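A minimal sketch of that suggested fix, assuming the PR's `TokenBucket`/`RateLimiter` API as quoted above: the bucket reference is captured while the registry lock is held, so a concurrent `reset()` can only pop the dict entry, never invalidate the reference in hand.

```python
import threading
import time


class TokenBucket:
    def __init__(self, rate: float, capacity: float) -> None:
        self.rate, self.capacity = rate, capacity
        self._tokens = capacity
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, n: float = 1.0) -> bool:
        with self._lock:
            now = time.monotonic()
            self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
            self._last = now
            if self._tokens >= n:
                self._tokens -= n
                return True
            return False


class RateLimiter:
    def __init__(self, rate: float = 10.0, capacity: float = 20.0) -> None:
        self._rate, self._capacity = rate, capacity
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        with self._lock:
            bucket = self._buckets.get(key)
            if bucket is None:
                bucket = self._buckets[key] = TokenBucket(self._rate, self._capacity)
        # Even if reset(key) pops the dict entry right now, we still hold a
        # live reference, so no KeyError is possible.
        return bucket.consume()

    def reset(self, key: str) -> None:
        with self._lock:
            self._buckets.pop(key, None)
```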




Remediation recommended

6. 429 message not friendly 📎 Requirement gap ≡ Correctness
Description
When blocked, the code returns {"error": "Rate limit exceeded for agent ..."} which is not a
friendly user-facing message and exposes internal agent identifiers. The requirement is a clear
friendly rate-limited message returned by the gateway.
Code

operator_use/acp/server.py[R236-240]

+            logger.warning("Rate limit exceeded for agent %s", target_agent_id)
+            return web.json_response(
+                {"error": f"Rate limit exceeded for agent '{target_agent_id}'"},
+                status=429,
+            )
Evidence
PR Compliance ID 4 requires a friendly user-facing rate-limit message at the gateway; the added
response is a raw error string referencing an internal agent identifier.

Provide friendly user-facing message on rate limiting
operator_use/acp/server.py[236-240]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The rate-limit response is a raw error string that includes internal identifiers, rather than a friendly user-facing message.

## Issue Context
The checklist requires a clear, friendly "rate limited" message when requests are blocked.

## Fix Focus Areas
- operator_use/acp/server.py[234-240]



7. Late rate-limit check 🐞 Bug ➹ Performance
Description
In _handle_create_run, rate limiting happens after request.json() and RunCreateRequest
validation, so even requests that will be rejected with 429 still incur JSON parsing/validation CPU
and memory cost. Under burst traffic, this undermines rate limiting’s ability to protect the server
from resource exhaustion.
Code

operator_use/acp/server.py[R231-240]

                status=403,
            )

+        # Rate limit: per-agent token bucket (10 req/s, 20 burst)
+        if not self._rate_limiter.is_allowed(target_agent_id):
+            logger.warning("Rate limit exceeded for agent %s", target_agent_id)
+            return web.json_response(
+                {"error": f"Rate limit exceeded for agent '{target_agent_id}'"},
+                status=429,
+            )
Evidence
The handler parses JSON and constructs the Pydantic request model before calling
self._rate_limiter.is_allowed(...), meaning rate-limited requests still pay the parsing/validation
overhead.

operator_use/acp/server.py[212-240]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The rate-limit check occurs after JSON parsing and request model validation. Once the bucket is empty, the server still repeatedly parses JSON only to return 429, which is avoidable work.

## Issue Context
For per-agent tokens, the allowed agent is already known (`request.get('_authed_agent')`) and does not require parsing the body to compute a rate-limit key.

## Fix Focus Areas
- operator_use/acp/server.py[212-240]

## Suggested fix
Move/add an earlier rate-limit gate before `await request.json()`:
- If `authed` is not `None` (per-agent token mode), use `authed` as the rate-limit key and enforce the limiter before parsing.
- Optionally, for global/anonymous access (`authed is None`), add a coarse pre-parse limiter keyed by `request.remote` or a shared key (e.g. "anonymous") to cap parse/validation load, then keep the existing per-target-agent limiter after resolving `target_agent_id`.



8. Unthrottled 429 warnings 🐞 Bug ✧ Quality
Description
Every rate-limited request emits a warning log, which can flood logs during bursts/attacks and
create avoidable IO pressure and noisy alerts. This is a hot-path log with no sampling/throttling.
Code

operator_use/acp/server.py[R234-240]

+        # Rate limit: per-agent token bucket (10 req/s, 20 burst)
+        if not self._rate_limiter.is_allowed(target_agent_id):
+            logger.warning("Rate limit exceeded for agent %s", target_agent_id)
+            return web.json_response(
+                {"error": f"Rate limit exceeded for agent '{target_agent_id}'"},
+                status=429,
+            )
Evidence
The warning is inside the 429 rejection branch and will execute once per rejected request; there is
no rate limiting/sampling around the log emission.

operator_use/acp/server.py[234-240]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`logger.warning(...)` is executed for every rate-limited request. Under sustained rate limiting, this can generate very high log volume and operational noise.

## Issue Context
Logging rate-limit violations can be useful, but it should be sampled/throttled or turned into a metric to avoid per-request log spam.

## Fix Focus Areas
- operator_use/acp/server.py[234-240]

## Suggested fix
Keep observability while avoiding spam by doing one of:
- downgrade to `logger.info` or `logger.debug`
- implement per-agent time-based throttling (e.g., log at most once per N seconds per agent)
- emit a counter/metric (if the repo has a metrics facility) and log only periodically/sampled
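The per-key time-based throttling option could look like this (hypothetical helper, not existing repo code):

```python
import time


class ThrottledLogger:
    """Gate that passes at most one event per key every `interval` seconds."""

    def __init__(self, interval: float = 10.0) -> None:
        self._interval = interval
        self._last_emit: dict[str, float] = {}

    def should_log(self, key: str) -> bool:
        now = time.monotonic()
        last = self._last_emit.get(key)
        if last is None or now - last >= self._interval:
            self._last_emit[key] = now
            return True
        return False


throttle = ThrottledLogger(interval=10.0)
# 100 back-to-back rejections for the same agent produce a single log line.
emitted = [throttle.should_log("agent-1") for _ in range(100)]
print(sum(emitted))  # → 1
```

The handler would then wrap the warning in `if throttle.should_log(agent_id): logger.warning(...)`, keeping the 429 response itself unchanged.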




@richard-devbot
Author

Ready for review & merge ✅

Hey @Jeomon, PR #31 is ready to land. This closes Issue #23 (rate limiting on gateway channels).

What's in here:

  • operator_use/utils/rate_limiter.py: TokenBucket + RateLimiter, thread-safe, no external deps
  • Integrated into ACPServer._handle_create_run — rate check fires after auth (401) and agent scoping (403), before run creation; returns HTTP 429 on breach
  • Default: 10 req/s sustained, 20-request burst per agent_id
  • 11 unit tests, all passing

Known limitation (flagged transparently): In-process only — rate limit state resets on server restart, and there's no distributed coordination across multiple server instances.

This PR is independent and can merge at any time. No conflicts with main.
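The refill math behind the defaults above (10 req/s sustained, 20-request burst) can be sketched as follows. This is a minimal illustrative version, not the PR's actual implementation; the class and attribute names are assumptions.

```python
import threading
import time

class TokenBucket:
    """Illustrative token bucket: refills at `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float = 10.0, capacity: float = 20.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity            # starts full, so an initial burst is allowed
        self.updated = time.monotonic()   # monotonic clock avoids wall-clock jumps
        self._lock = threading.Lock()

    def consume(self, tokens: float = 1.0) -> bool:
        with self._lock:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
```

With these defaults, a burst of 20 back-to-back requests succeeds, after which requests are admitted at roughly 10 per second as tokens refill.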

@richard-devbot richard-devbot force-pushed the richardson/phase1-rate-limiting branch from a6fbbce to caaa08b Compare April 13, 2026 05:14
@richard-devbot
Author

Response to Qodo Review — PR #31

All 8 findings (3 bugs + 5 requirement gaps) have been addressed.


Fixed

Req Gap 1 — Wrong algorithm + wrong location

  • Replaced TokenBucket/RateLimiter with SlidingWindowRateLimiter (deque of timestamps per key)
  • Moved to operator_use/gateway/rate_limiter.py per compliance requirement
  • Old operator_use/utils/rate_limiter.py deleted
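A sliding-window limiter of the kind described (deque of timestamps per key) can be sketched like this. This is a hedged sketch under the stated design, not the PR's code; it also shows the Bug 5 fix of capturing the deque reference while holding the lock.

```python
import threading
import time
from collections import defaultdict, deque

class SlidingWindowRateLimiter:
    """Illustrative sliding window: at most `requests` per `window_seconds` per key."""

    def __init__(self, requests: int = 30, window_seconds: float = 60.0):
        self.requests = requests
        self.window = window_seconds
        self._lock = threading.Lock()
        self._hits: dict = defaultdict(deque)

    def is_allowed(self, key: str) -> bool:
        now = time.monotonic()
        with self._lock:
            hits = self._hits[key]  # reference captured under the lock, so reset() cannot race
            # Drop timestamps that have slid out of the window.
            while hits and now - hits[0] > self.window:
                hits.popleft()
            if len(hits) < self.requests:
                hits.append(now)
                return True
            return False

    def reset(self, key: str) -> None:
        with self._lock:
            self._hits.pop(key, None)
```

Unlike a token bucket, the window slides continuously: a blocked caller recovers as soon as its oldest timestamp ages out, with no refill-rate parameter.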

Req Gap 2 — Hard-coded rate limit params

  • Added RateLimitConfig (requests=30, window_seconds=60, enabled=True) to operator_use/config/service.py
  • Config now has rate_limit: RateLimitConfig = Field(default_factory=RateLimitConfig)
  • ACPServer now accepts rate_limit_config: RateLimitConfig | None and passes it to the limiter
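The config shape described above can be illustrated with a plain dataclass. The PR itself uses a pydantic model (per the `Field(default_factory=...)` usage), so this stdlib-only sketch only shows the fields and defaults.

```python
from dataclasses import dataclass

# Sketch of the described config; the real model in operator_use/config/service.py is pydantic.
@dataclass
class RateLimitConfig:
    requests: int = 30          # allowed requests per window
    window_seconds: float = 60.0
    enabled: bool = True        # allows disabling the limiter without code changes
```

Defaulting the whole object via a factory means every server instance gets its own config unless one is passed in explicitly.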

Req Gap 3 — Limiter keyed by agent, not user

  • Changed rate-limit key to authed (per-agent token = caller identity) or request.remote (anonymous fallback)
  • Multiple users no longer share a bucket; each caller is isolated

Req Gap 4 / Bug 4 — Tests missing sustained overload + window-slide recovery

  • Added test_sustained_overload_blocks_continuously: 10 requests after limit exhausted, all blocked
  • Added test_window_slides_after_expiry: requests resume after window expires without calling reset()
  • Full test rewrite for sliding-window semantics (9 tests total)
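The two recovery behaviors these tests cover can be exercised without sleeping by threading an explicit clock through a single window check. A minimal sketch, with illustrative names:

```python
from collections import deque

def allowed(hits: deque, now: float, limit: int = 3, window: float = 60.0) -> bool:
    """One sliding-window check with an injected clock, so tests need no real waiting."""
    while hits and now - hits[0] > window:
        hits.popleft()
    if len(hits) < limit:
        hits.append(now)
        return True
    return False

hits: deque = deque()
assert all(allowed(hits, t) for t in (0.0, 1.0, 2.0))  # burst fills the window
assert not allowed(hits, 3.0)    # sustained overload: blocked continuously
assert not allowed(hits, 30.0)   # still inside the 60s window
assert allowed(hits, 61.5)       # oldest timestamps aged out: recovers without reset()
```

Injecting `now` as a parameter is what makes the "window slides after expiry" case testable in microseconds rather than a real 60-second wait.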

Bug 5 — Dict race: is_allowed() reads bucket outside lock

  • Fixed by design: SlidingWindowRateLimiter.is_allowed() captures the deque reference while holding _lock, so a concurrent reset() cannot cause a KeyError
  • Covered by test_concurrent_reset_does_not_raise (500 allow-loop + 200 reset-loop threads)

Req Gap 6 — Unfriendly 429 message exposes internal agent ID

  • Changed to: {"error": "Too many requests. Please slow down and try again later."}
  • No internal identifiers exposed

Bug 7 — Late rate-limit check (after JSON parse)

  • Moved rate-limit gate to execute BEFORE await request.json() — rejected requests no longer pay JSON parsing/Pydantic validation overhead

Bug 8 — Unthrottled 429 warning logs

  • Added _log_rate_limit_warning(): logs at most once per 10 seconds per key — prevents log spam under sustained attack

All changes pushed to richardson/phase1-rate-limiting. 515 tests pass.

Richardson Gunde and others added 3 commits April 13, 2026 11:07
…orTouch#11]

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…rsions [CursorTouch#11]

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Fixes 22 ruff violations: unused imports (F401), unused variables (F841),
semicolon-split statements (E702), and import ordering/formatting issues
via ruff --fix and manual edits.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@richard-devbot richard-devbot force-pushed the richardson/phase1-rate-limiting branch from da960c6 to 6a8ea21 Compare April 13, 2026 06:48
…[ci]

- BrowserPlugin.SYSTEM_PROMPT: add <perception>, <tool_use>, <execution_principles> sections
- BrowserPlugin.register_hooks: actually register _state_hook on BEFORE_LLM_CALL when enabled
- BrowserPlugin.unregister_hooks: unregister _state_hook from BEFORE_LLM_CALL
- BrowserPlugin.unregister_tools: call unset_extension for "browser" and "_browser"
- BrowserPlugin.enable/disable: wire hook register/unregister through lifecycle
- ComputerPlugin.SYSTEM_PROMPT: add <perception>, <tool_use>, <execution_principles> sections
- ComputerPlugin.register_hooks: register _state_hook + _wait_for_ui_hook when enabled
- ComputerPlugin.unregister_hooks: unregister both hooks
- ComputerPlugin.enable/disable: wire hook register/unregister through lifecycle
- control_center: pass kwargs._graceful_restart_fn through to _do_restart(graceful_fn=...)
- ToolRegistry.get: also check _extensions so registry.get("browser") finds the browser instance
- ruff format: reformat entire codebase to resolve style violations

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

Development

Successfully merging this pull request may close these issues.

[Phase 1.4.1] Add rate limiting to gateway channels
