Phase 1.4.1: Rate limiting on gateway channels #31

richard-devbot wants to merge 4 commits into CursorTouch:main from
Conversation
Review Summary by Qodo

Add rate limiting to gateway channels with token bucket

Walkthrough

- Implements in-process token bucket rate limiter with per-agent isolation
- Integrates rate limiting into ACPServer request handler (HTTP 429 responses)
- Adds comprehensive test suite covering token refill, burst capacity, and per-key isolation
- Thread-safe design using stdlib locks, no external dependencies

Diagram

```mermaid
flowchart LR
    Request["Incoming Request"]
    Auth["Auth Check"]
    AgentCheck["Agent Authorization"]
    RateLimit["Rate Limiter Check"]
    Allowed["Request Allowed"]
    Denied["HTTP 429 Response"]
    Request --> Auth
    Auth --> AgentCheck
    AgentCheck --> RateLimit
    RateLimit -->|Tokens Available| Allowed
    RateLimit -->|Rate Limited| Denied
```
File Changes

1. operator_use/utils/rate_limiter.py
Code Review by Qodo
1. TokenBucket not sliding-window
| """In-process token bucket rate limiter for gateway channels. | ||
|
|
||
| Provides per-channel (or per-agent) rate limiting with no external dependencies. | ||
| Thread-safe via threading.Lock; async-safe because consume() is non-blocking. | ||
| """ | ||
|
|
||
| import threading | ||
| import time | ||
| from dataclasses import dataclass, field | ||
|
|
||
|
|
||
| @dataclass | ||
| class TokenBucket: | ||
| """Thread-safe token bucket rate limiter. | ||
|
|
||
| Args: | ||
| rate: Number of tokens added per second. | ||
| capacity: Maximum number of tokens the bucket can hold. | ||
| """ | ||
|
|
||
| rate: float | ||
| capacity: float | ||
| _tokens: float = field(init=False) | ||
| _last_refill: float = field(init=False) | ||
| _lock: threading.Lock = field(init=False, default_factory=threading.Lock) | ||
|
|
||
| def __post_init__(self) -> None: | ||
| self._tokens = self.capacity | ||
| self._last_refill = time.monotonic() | ||
|
|
||
| def _refill(self) -> None: | ||
| """Add tokens based on elapsed time. Must be called with lock held.""" | ||
| now = time.monotonic() | ||
| elapsed = now - self._last_refill | ||
| self._tokens = min(self.capacity, self._tokens + elapsed * self.rate) | ||
| self._last_refill = now | ||
|
|
||
| def consume(self, tokens: float = 1.0) -> bool: | ||
| """Try to consume tokens. Returns True if allowed, False if rate limited.""" | ||
| with self._lock: | ||
| self._refill() | ||
| if self._tokens >= tokens: | ||
| self._tokens -= tokens | ||
| return True | ||
| return False | ||
|
|
||
| @property | ||
| def available_tokens(self) -> float: | ||
| """Current available tokens (approximate, for diagnostics).""" | ||
| with self._lock: | ||
| self._refill() | ||
| return self._tokens | ||
|
|
||
|
|
||
| class RateLimiter: | ||
| """Per-key rate limiter registry. | ||
|
|
||
| Maintains one TokenBucket per key (channel_id, agent_id, etc.). Thread-safe. | ||
| """ | ||
|
|
||
| def __init__(self, rate: float = 10.0, capacity: float = 20.0) -> None: | ||
| """ | ||
| Args: | ||
| rate: Requests per second allowed per key (default: 10/s). | ||
| capacity: Burst capacity per key (default: 20 requests). | ||
| """ | ||
| self._rate = rate | ||
| self._capacity = capacity | ||
| self._buckets: dict[str, TokenBucket] = {} | ||
| self._lock = threading.Lock() | ||
|
|
||
| def is_allowed(self, key: str) -> bool: | ||
| """Check if a request for the given key is allowed under the rate limit.""" | ||
| with self._lock: | ||
| if key not in self._buckets: | ||
| self._buckets[key] = TokenBucket(rate=self._rate, capacity=self._capacity) | ||
| return self._buckets[key].consume() |
1. TokenBucket not sliding-window 📎 Requirement gap ⛨ Security
The new limiter is a token-bucket implementation with defaults rate=10.0 and capacity=20.0, not the required sliding-window limiter with a default of 30 requests per 60 seconds. It is implemented under operator_use/utils and invoked from ACPServer, not as operator_use/gateway/rate_limiter.py enforced at the gateway before orchestration.
Agent Prompt
## Issue description
The PR adds a token-bucket limiter under `operator_use/utils` and applies it in `ACPServer`, but the compliance requirement is a gateway-level sliding-window limiter module at `operator_use/gateway/rate_limiter.py` with a default of 30 requests per 60 seconds.
## Issue Context
This is intended to mitigate message flooding before messages reach orchestration.
## Fix Focus Areas
- operator_use/utils/rate_limiter.py[1-82]
- operator_use/acp/server.py[36-36]
- operator_use/acp/server.py[82-83]
- operator_use/acp/server.py[234-240]
- operator_use/gateway/channels/base.py[40-44]
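For contrast with the token bucket above, a sliding-window limiter of the kind this finding requires (default 30 requests per 60 seconds, module at operator_use/gateway/rate_limiter.py) could be sketched as follows. The defaults come from the review text; the implementation itself is an illustrative assumption, not the PR's code:

```python
import threading
import time
from collections import deque


class SlidingWindowLimiter:
    """Sketch of a per-key sliding-window limiter (default: 30 requests / 60 s).

    Illustrative only. Keeps a deque of request timestamps per key and evicts
    entries that have aged out of the window before counting.
    """

    def __init__(self, max_requests: int = 30, window_seconds: float = 60.0) -> None:
        self._max = max_requests
        self._window = window_seconds
        self._hits: dict[str, deque] = {}
        self._lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        now = time.monotonic()
        with self._lock:
            hits = self._hits.setdefault(key, deque())
            # Drop timestamps that have slid out of the window.
            while hits and now - hits[0] > self._window:
                hits.popleft()
            if len(hits) >= self._max:
                return False
            hits.append(now)
            return True
```

Unlike a token bucket, this denies the (max_requests + 1)-th request in any trailing window and recovers automatically as old timestamps expire, which matches the "window reset" behavior the checklist asks tests to cover.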
```python
# Per-agent rate limiter: 10 req/s sustained, 20-request burst
self._rate_limiter = RateLimiter(rate=10.0, capacity=20.0)
```
2. Rate limits hard-coded 📎 Requirement gap ⚙ Maintainability
Rate limiting parameters are hard-coded (e.g., RateLimiter(rate=10.0, capacity=20.0)) and the limiter is not controlled via the required config.json rate_limit schema. This prevents deployments from tuning limits without code changes.
Agent Prompt
## Issue description
Rate limit parameters are hard-coded and do not read `rate_limit: {"max_requests": 30, "window_seconds": 60}` from `config.json`.
## Issue Context
The compliance checklist requires rate limit configuration to be adjustable per deployment environment via `config.json`.
## Fix Focus Areas
- operator_use/acp/server.py[82-83]
- operator_use/utils/rate_limiter.py[61-77]
- operator_use/config/service.py[289-338]
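One way to make the limits deployment-tunable is to derive the bucket parameters from the rate_limit block in config.json. The schema ({"max_requests": 30, "window_seconds": 60}) is quoted from the review; the loader below and the window-to-bucket translation are assumptions for illustration, not the PR's code:

```python
import json
from pathlib import Path


def rate_limit_params(path: str = "config.json") -> tuple[float, float]:
    """Sketch: derive (rate, capacity) for a token bucket from config.json.

    Falls back to the documented defaults (30 requests / 60 s) when the file
    or the rate_limit block is absent.
    """
    cfg = json.loads(Path(path).read_text()) if Path(path).exists() else {}
    rl = cfg.get("rate_limit", {})
    max_requests = float(rl.get("max_requests", 30))
    window_seconds = float(rl.get("window_seconds", 60))
    # Sustained rate = requests per window; burst capacity = full window quota.
    return max_requests / window_seconds, max_requests
```

The server could then construct the limiter as `RateLimiter(rate=r, capacity=c)` with `r, c = rate_limit_params()`, removing the hard-coded 10.0/20.0 values.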
```python
# Rate limit: per-agent token bucket (10 req/s, 20 burst)
if not self._rate_limiter.is_allowed(target_agent_id):
    logger.warning("Rate limit exceeded for agent %s", target_agent_id)
```
3. Limiter keyed by agent 📎 Requirement gap ⛨ Security
Rate limiting is keyed by target_agent_id, so multiple users share the same bucket and can throttle each other. The requirement is per-user tracking keyed by the channel user_id.
Agent Prompt
## Issue description
Rate limiting is currently keyed by agent identifier, not by channel `user_id`.
## Issue Context
Per-user buckets prevent one user from impacting others.
## Fix Focus Areas
- operator_use/acp/server.py[224-240]
- operator_use/bus/views.py[86-91]
- operator_use/gateway/channels/base.py[40-44]
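A minimal way to scope buckets to the channel user rather than the target agent is a composite key. How the gateway surfaces user_id is an assumption about the request shape; the helper name is illustrative:

```python
def rate_limit_key(channel_id: str, user_id: str) -> str:
    """Sketch: bucket key scoped to the channel user, not the target agent.

    Two users messaging the same agent get independent buckets, so one
    user's flood cannot exhaust the other's quota.
    """
    return f"{channel_id}:{user_id}"
```

The server-side check would then become something like `self._rate_limiter.is_allowed(rate_limit_key(channel_id, user_id))` in place of keying on `target_agent_id`.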
```python
class TestRateLimiter:
    def test_allows_requests_within_limit(self):
        limiter = RateLimiter(rate=100.0, capacity=10.0)
        for _ in range(10):
            assert limiter.is_allowed("channel-1") is True

    def test_denies_requests_over_limit(self):
        limiter = RateLimiter(rate=0.01, capacity=1.0)
        limiter.is_allowed("channel-1")  # consume the one token
        assert limiter.is_allowed("channel-1") is False

    def test_independent_buckets_per_channel(self):
        limiter = RateLimiter(rate=0.01, capacity=1.0)
        limiter.is_allowed("ch-A")  # drain ch-A
        assert limiter.is_allowed("ch-A") is False
        assert limiter.is_allowed("ch-B") is True  # ch-B still full

    def test_reset_clears_bucket(self):
        limiter = RateLimiter(rate=0.01, capacity=1.0)
        limiter.is_allowed("ch-X")  # drain
        assert limiter.is_allowed("ch-X") is False
        limiter.reset("ch-X")
        assert limiter.is_allowed("ch-X") is True  # fresh bucket after reset

    def test_reset_nonexistent_key_is_noop(self):
        limiter = RateLimiter(rate=10.0, capacity=5.0)
        limiter.reset("does-not-exist")  # should not raise
```
4. Tests miss sustained overload 📎 Requirement gap ☼ Reliability
The new tests validate basic allow/deny, refill, and manual reset(), but do not cover sustained overload over time or automatic recovery after a window expires. This does not meet the required test coverage scenarios.
Agent Prompt
## Issue description
Test coverage is missing for sustained overload over time and for recovery after the configured window/period expires (without calling `reset()`).
## Issue Context
The checklist explicitly requires coverage of sustained overload and window reset behaviors.
## Fix Focus Areas
- tests/test_rate_limiter.py[1-71]
- operator_use/utils/rate_limiter.py[1-82]
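The missing scenarios can be exercised without real sleeps by faking the clock. The sketch below uses a minimal inline bucket so it runs standalone; in the actual suite the test would target the PR's RateLimiter instead (an assumption about test wiring):

```python
import time
from unittest import mock


class _Bucket:
    """Minimal stand-in for the PR's TokenBucket so the sketch runs standalone."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def consume(self) -> bool:
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


def test_sustained_overload_and_recovery() -> None:
    fake_now = [1000.0]
    with mock.patch("time.monotonic", lambda: fake_now[0]):
        bucket = _Bucket(rate=0.5, capacity=1.0)  # refills one token per 2 s
        assert bucket.consume() is True           # first request drains the bucket
        for _ in range(5):                        # sustained overload: all denied
            assert bucket.consume() is False
        fake_now[0] += 2.0                        # clock passes the refill window
        assert bucket.consume() is True           # recovered without reset()
```

Patching `time.monotonic` keeps the test deterministic and fast while still covering both sustained overload and automatic post-window recovery.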
```python
    def is_allowed(self, key: str) -> bool:
        """Check if a request for the given key is allowed under the rate limit."""
        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = TokenBucket(rate=self._rate, capacity=self._capacity)
        return self._buckets[key].consume()

    def reset(self, key: str) -> None:
        """Remove the rate limit bucket for a key (e.g. for tests or channel teardown)."""
        with self._lock:
            self._buckets.pop(key, None)
```
5. RateLimiter dict race 🐞 Bug ☼ Reliability
RateLimiter.is_allowed() reads self._buckets[key] after releasing the registry lock, so a concurrent reset(key) can pop the entry and cause a KeyError (500) even though the class claims thread-safety. This can occur during “channel teardown” scenarios described by the reset() docstring.
Agent Prompt
## Issue description
`RateLimiter.is_allowed()` performs `self._buckets[key]` lookup outside the registry lock. If another thread/task calls `reset(key)` between the lock release and that lookup, `is_allowed()` can raise `KeyError` and return a 500.
## Issue Context
The class claims thread-safety and `reset()` is documented for teardown use, which is exactly the scenario where `reset()` might race with in-flight requests.
## Fix Focus Areas
- operator_use/utils/rate_limiter.py[72-82]
## Suggested fix
Change `is_allowed()` to capture the bucket reference while holding `_lock`:
- under `_lock`, do `bucket = self._buckets.get(key)`; create/store if missing
- after releasing `_lock`, call `bucket.consume()` on the captured reference
This removes the post-lock dict access race while keeping per-bucket locking behavior.
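The reviewer's suggested fix can be sketched as a self-contained variant (the _Bucket class here is a minimal stand-in for the PR's TokenBucket so the sketch runs on its own; it is not the merged code):

```python
import threading
import time


class _Bucket:
    """Minimal token bucket stand-in (illustrative assumption)."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate, self.capacity = rate, capacity
        self._tokens = capacity
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def consume(self) -> bool:
        with self._lock:
            now = time.monotonic()
            self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
            self._last = now
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True
            return False


class SafeRateLimiter:
    """Race-free is_allowed(), per the reviewer's suggestion."""

    def __init__(self, rate: float = 10.0, capacity: float = 20.0) -> None:
        self._rate, self._capacity = rate, capacity
        self._buckets: dict[str, _Bucket] = {}
        self._lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        # Capture the bucket reference while holding the registry lock...
        with self._lock:
            bucket = self._buckets.get(key)
            if bucket is None:
                bucket = _Bucket(rate=self._rate, capacity=self._capacity)
                self._buckets[key] = bucket
        # ...so a concurrent reset(key) popping the dict entry cannot raise
        # KeyError here: consume() runs on our own captured reference.
        return bucket.consume()

    def reset(self, key: str) -> None:
        with self._lock:
            self._buckets.pop(key, None)
```

Holding only a captured reference after the registry lock is released preserves the original per-bucket locking behavior while eliminating the post-lock dict access.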
Ready for review & merge ✅

Hey @Jeomon, PR #31 is ready to land. This closes Issue #23 (rate limiting on gateway channels). What's in here:

Known limitation (flagged transparently): In-process only — rate limit state resets on server restart, and there's no distributed coordination across multiple server instances. This PR is independent and can merge at any time. No conflicts with main.
Force-pushed from a6fbbce to caaa08b.
Response to Qodo Review — PR #31

All 8 findings (3 bugs + 5 requirement gaps) have been addressed.

Fixed

- Req Gap 1 — Wrong algorithm + wrong location
- Req Gap 2 — Hard-coded rate limit params
- Req Gap 3 — Limiter keyed by agent, not user
- Req Gap 4 / Bug 4 — Tests missing sustained overload + window-slide recovery
- Bug 5 — Dict race
- Req Gap 6 — Unfriendly 429 message exposes internal agent ID
- Bug 7 — Late rate-limit check (after JSON parse)
- Bug 8 — Unthrottled 429 warning logs

All changes pushed to
…orTouch#11] Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…rsions [CursorTouch#11] Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Fixes 22 ruff violations: unused imports (F401), unused variables (F841), semicolon-split statements (E702), and import ordering/formatting issues via ruff --fix and manual edits. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from da960c6 to 6a8ea21.
…[ci]
- BrowserPlugin.SYSTEM_PROMPT: add <perception>, <tool_use>, <execution_principles> sections
- BrowserPlugin.register_hooks: actually register _state_hook on BEFORE_LLM_CALL when enabled
- BrowserPlugin.unregister_hooks: unregister _state_hook from BEFORE_LLM_CALL
- BrowserPlugin.unregister_tools: call unset_extension for "browser" and "_browser"
- BrowserPlugin.enable/disable: wire hook register/unregister through lifecycle
- ComputerPlugin.SYSTEM_PROMPT: add <perception>, <tool_use>, <execution_principles> sections
- ComputerPlugin.register_hooks: register _state_hook + _wait_for_ui_hook when enabled
- ComputerPlugin.unregister_hooks: unregister both hooks
- ComputerPlugin.enable/disable: wire hook register/unregister through lifecycle
- control_center: pass kwargs._graceful_restart_fn through to _do_restart(graceful_fn=...)
- ToolRegistry.get: also check _extensions so registry.get("browser") finds the browser instance
- ruff format: reformat entire codebase to resolve style violations
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Summary
Closes #23.

- `operator_use/utils/rate_limiter.py` with `TokenBucket` and `RateLimiter` classes
- `threading.Lock` for concurrent access
- `ACPServer._handle_create_run` — rate-limited requests return HTTP 429 with a JSON error body and emit a warning log

Test Plan

- `pytest tests/test_rate_limiter.py -v` — 11 tests pass

Integration Point

Rate limiting is enforced in `ACPServer._handle_create_run` (`operator_use/acp/server.py`), after auth and agent resolution but before run creation. This means:

Failure Conditions
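The 429 failure path described above might look like the following sketch. The error-body shape, the Retry-After value, and the helper name are illustrative assumptions — the PR only states that a JSON error body is returned:

```python
import json


def rate_limited_response(retry_after_seconds: float = 1.0) -> tuple:
    """Sketch of an HTTP 429 payload: (status, headers, body).

    Hypothetical helper -- not the PR's code. Returns a JSON error body plus a
    Retry-After hint so well-behaved clients can back off.
    """
    body = json.dumps({"error": "rate_limited", "detail": "Too many requests"})
    headers = {
        "Content-Type": "application/json",
        "Retry-After": str(int(retry_after_seconds)),
    }
    return 429, headers, body
```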