diff --git a/adapters/ccxt_shim/breeze_ccxt.py b/adapters/ccxt_shim/breeze_ccxt.py index 132fd85bf..27ae77ea9 100644 --- a/adapters/ccxt_shim/breeze_ccxt.py +++ b/adapters/ccxt_shim/breeze_ccxt.py @@ -5,7 +5,6 @@ import os import tempfile import threading import time -from collections import deque from datetime import datetime from pathlib import Path from typing import Any @@ -29,39 +28,13 @@ from adapters.ccxt_shim.security_master import ( from adapters.ccxt_shim.market_hours import MarketHoursGuard from adapters.ccxt_shim.risk_guard import RiskGuard from adapters.ccxt_shim.order_router import OrderRouter +from adapters.ccxt_shim.rate_limiter import RateLimiter from freqtrade.exceptions import OperationalException logger = logging.getLogger(__name__) -class InternalRateLimiter: - def __init__(self, rpm: int = 100, rpd: int = 5000): - self.rpm = rpm - self.rpd = rpd - self.history = deque() - self.daily_count = 0 - self.last_reset = time.time() - - def check_and_record(self): - now = time.time() - if now - self.last_reset > 86400: - self.daily_count = 0 - self.last_reset = now - if self.daily_count >= self.rpd: - raise OperationalException("Daily rate limit exceeded (5000)") - while self.history and now - self.history[0] > 60: - self.history.popleft() - if len(self.history) >= self.rpm: - sleep_time = 60 - (now - self.history[0]) - if sleep_time > 0: - logger.warning(f"Rate limit hit. Sleeping for {sleep_time:.2f}s") - time.sleep(sleep_time) - now = time.time() - self.history.append(now) - self.daily_count += 1 - - class BreezeCCXT(ccxt.Exchange): _mock_mode_logged = False _mock_ohlcv_lock = threading.Lock() @@ -90,8 +63,8 @@ class BreezeCCXT(ccxt.Exchange): self.risk_guard = RiskGuard({}) # Defaults to enabled=True, max=10 # Rate Limiting - rl_config = self.options.get("rateLimit", 100) - self.rate_limiter = InternalRateLimiter(rpm=rl_config) + # P17: Switched to centralized RateLimiter with Env support + self.rate_limiter = RateLimiter() self._security_master_cache: dict[str, Any] | None = None self.market_hours = MarketHoursGuard() self.order_router = OrderRouter(lambda: self.markets) @@ -306,7 +279,7 @@ class BreezeCCXT(ccxt.Exchange): def fetch_markets(self, params: dict | None = None): if self.rate_limiter: - self.rate_limiter.check_and_record() + self.rate_limiter.allow("fetch_markets") master = self._load_security_master() whitelist = self.config.get("pair_whitelist", []) if not whitelist: @@ -423,12 +396,13 @@ class BreezeCCXT(ccxt.Exchange): } def fetch_ticker(self, symbol: str, params: dict | None = None): + self.rate_limiter.allow("fetch_ticker") + if self._is_mock_mode(): return self._generate_mock_ticker(symbol) if not self.breeze: raise OperationalException("Breeze session not initialized.") - self.rate_limiter.check_and_record() s_params = self._parse_symbol(symbol) try: res = self.breeze.get_quotes(**s_params) @@ -483,12 +457,12 @@ class BreezeCCXT(ccxt.Exchange): limit: int | None = None, params: dict | None = None, ): + self.rate_limiter.allow("fetch_ohlcv") if self._is_mock_mode(): return self._generate_mock_ohlcv(symbol, timeframe, since, limit) if not self.breeze: raise OperationalException("Breeze session not initialized.") - self.rate_limiter.check_and_record() s_params = self._parse_symbol(symbol) interval = self.timeframes.get(timeframe) if not interval: @@ -553,6 +527,7 @@ class BreezeCCXT(ccxt.Exchange): self, symbol, order_type, side, amount, price=None, params: dict | None = None ): logger.info(f"BreezeCCXT.create_order (Sync) called for {symbol} {side}") + self.rate_limiter.allow("create_order") self.market_hours.assert_can_create_order(side, symbol) # P15 Risk Guard Check @@ -623,6 +598,7 @@ class BreezeCCXT(ccxt.Exchange): raise OperationalException("create_order not supported in real mode yet.") def cancel_order(self, order_id, symbol=None, params: dict | None = None): + self.rate_limiter.allow("cancel_order") self.market_hours.assert_can_cancel_order(order_id, str(symbol)) if self._is_mock_mode(): diff --git a/adapters/ccxt_shim/rate_limiter.py b/adapters/ccxt_shim/rate_limiter.py new file mode 100644 index 000000000..f472b5f27 --- /dev/null +++ b/adapters/ccxt_shim/rate_limiter.py @@ -0,0 +1,117 @@ +import logging +import os +import time +from typing import Any + +from freqtrade.exceptions import OperationalException + +logger = logging.getLogger(__name__) + + +class RateLimiter: + """ + Token Bucket Rate Limiter for BreezeCCXT shim. + Enforces API limits locally to prevent 429s or to provide deterministic + blocking for testing. + """ + + def __init__(self): + # Configuration via Environment Variables (Shim standard) + self.enabled = os.environ.get("FT_RATE_LIMIT_DISABLE", "0") != "1" + + # Limit Configuration + # Default: 100 requests per minute + self.per_minute = int(os.environ.get("FT_RATE_LIMIT_PER_MINUTE", "100")) + + # Mode: 'sleep' (production default) or 'block' (testing/gates) + self.mode = os.environ.get("FT_RATE_LIMIT_MODE", "sleep").lower() + + # Token Bucket State + self.capacity = self.per_minute + self.tokens = float(self.capacity) + self.last_refill = time.time() + + # Refill rate: tokens per second + # If per_minute=60, rate=1.0 token/sec + self.refill_rate = self.per_minute / 60.0 + + if self.enabled: + logger.info( + f"RateLimiter initialized: {self.per_minute}/min, " + f"Rate: {self.refill_rate:.4f} tps, Mode: {self.mode}" + ) + else: + logger.warning("RateLimiter is DISABLED via FT_RATE_LIMIT_DISABLE") + + def _refill(self): + now = time.time() + elapsed = now - self.last_refill + + if elapsed > 0: + added = elapsed * self.refill_rate + self.tokens = min(self.capacity, self.tokens + added) + self.last_refill = now + + def allow(self, op: str, cost: int = 1) -> None: + """ + Check if operation is allowed. + Consumes tokens if available. + If not available: + - mode='sleep': Sleeps until tokens available + - mode='block': Raises OperationalException + """ + if not self.enabled: + return + + self._refill() + + if self.tokens >= cost: + self.tokens -= cost + self._log_usage(op, cost) + return + + # Not enough tokens + if self.mode == "block": + self._raise_block(op, cost) + else: + self._sleep_until_allowed(op, cost) + + def _raise_block(self, op: str, cost: int): + # Stable token for acceptance gates + logger.warning(f"RATE_LIMIT_BLOCK: op={op} cost={cost} remaining={self.tokens:.2f}") + raise OperationalException(f"rate_limit_block: {op}") + + def _sleep_until_allowed(self, op: str, cost: int): + needed = cost - self.tokens + # Time required to refill 'needed' tokens + sleep_time = needed / self.refill_rate + + # Safety bound (prevent infinite sleeps if config is bad) + if sleep_time > 60: + logger.warning(f"RateLimiter request to sleep {sleep_time:.2f}s clamped to 60s") + sleep_time = 60 + + logger.info( + f"RATE_LIMIT_SLEEP: op={op} cost={cost} remaining={self.tokens:.2f} sleep={sleep_time:.3f}s" + ) + time.sleep(sleep_time) + + # After sleep, refill and consume + self._refill() + self.tokens -= cost + self._log_usage(op, cost) + + def _log_usage(self, op: str, cost: int): + # Debug level to avoid spam, unless approaching limit? + # For P17 verification, might want distinct logs. + # Keeping it DEBUG for high volume, INFO for block/sleep. + logger.debug(f"RATE_LIMIT: op={op} cost={cost} remaining={self.tokens:.2f}") + + def stats(self) -> dict[str, Any]: + self._refill() + return { + "enabled": self.enabled, + "mode": self.mode, + "tokens": self.tokens, + "capacity": self.capacity, + } diff --git a/tests/exchange/test_icicibreeze_rate_limit_applied.py b/tests/exchange/test_icicibreeze_rate_limit_applied.py new file mode 100644 index 000000000..8b2955349 --- /dev/null +++ b/tests/exchange/test_icicibreeze_rate_limit_applied.py @@ -0,0 +1,75 @@ +import pytest +import os +from unittest import mock +from freqtrade.exceptions import OperationalException +from adapters.ccxt_shim.breeze_ccxt import BreezeCCXT + + +@pytest.fixture +def rate_limited_exchange(): + """ + Fixture for BreezeCCXT with strict rate limiting enabled via environment. + """ + with mock.patch.dict( + os.environ, + { + "BREEZE_MOCK": "1", + "FT_RATE_LIMIT_PER_MINUTE": "10", + "FT_RATE_LIMIT_MODE": "block", + "FT_RATE_LIMIT_DISABLE": "0", + }, + ): + exposure = {"risk_guard": {"enabled": False}} # Disable risk guard to isolate rate limit + exchange = BreezeCCXT(config=exposure) + yield exchange + + +def test_fetch_ticker_rate_limit(rate_limited_exchange): + symbol = "RELIANCE/INR" + + # First 10 calls should pass + for i in range(10): + res = rate_limited_exchange.fetch_ticker(symbol) + assert res["symbol"] == symbol + + # 11th call should block + with pytest.raises(OperationalException, match="rate_limit_block: fetch_ticker"): + rate_limited_exchange.fetch_ticker(symbol) + + +def test_fetch_markets_rate_limit(rate_limited_exchange): + # Depending on how tests run, tokens might be shared if instance persists or class state + # Fixture creates new instance each time, which creates new RateLimiter, so fresh bucket. + + # 10 calls pass + for i in range(10): + rate_limited_exchange.fetch_markets() + + # 11th blocks + with pytest.raises(OperationalException, match="rate_limit_block: fetch_markets"): + rate_limited_exchange.fetch_markets() + + +def test_create_order_rate_limit(rate_limited_exchange): + symbol = "RELIANCE/INR" + # To pass order creation checks, we need to ensure market hours/risk guard don't block first. + # risk_guard disabled in fixture. + # We mock market hours to allow. + + with mock.patch("adapters.ccxt_shim.market_hours.MarketHoursGuard.assert_can_create_order"): + # P15 risk check calls fetch_ticker, which consumes a token. + # We mock fetch_ticker to avoid double consumption and isolate create_order check. + # We need to mock it on the INSTANCE, not the class, because the instance RateLimiter is what we rely on? + # Use mock.patch.object + with mock.patch.object( + rate_limited_exchange, + "fetch_ticker", + return_value={"bid": 2490, "ask": 2510, "last": 2500}, + ): + # 10 calls pass + for i in range(10): + rate_limited_exchange.create_order(symbol, "limit", "buy", 1, 2500) + + # 11th blocks + with pytest.raises(OperationalException, match="rate_limit_block: create_order"): + rate_limited_exchange.create_order(symbol, "limit", "buy", 1, 2500) diff --git a/tests/test_rate_limiter_block_mode.py b/tests/test_rate_limiter_block_mode.py new file mode 100644 index 000000000..23bfba0a4 --- /dev/null +++ b/tests/test_rate_limiter_block_mode.py @@ -0,0 +1,38 @@ +import pytest +import os +from unittest import mock +from adapters.ccxt_shim.rate_limiter import RateLimiter +from freqtrade.exceptions import OperationalException + + +@pytest.fixture +def rate_limiter_block(): + with mock.patch.dict( + os.environ, + { + "FT_RATE_LIMIT_PER_MINUTE": "60", # 1 token per second + "FT_RATE_LIMIT_MODE": "block", + "FT_RATE_LIMIT_DISABLE": "0", + }, + ): + print(f"DEBUG: Created RateLimiter with env: {os.environ.get('FT_RATE_LIMIT_MODE')}") + yield RateLimiter() + + +def test_block_mode_allow(rate_limiter_block): + # Should start full + assert rate_limiter_block.tokens == 60.0 + + # Allow 1 + rate_limiter_block.allow("test_op") + assert rate_limiter_block.tokens == 59.0 + + +def test_block_mode_exceed_limit(rate_limiter_block): + # Drain 60 tokens + for _ in range(60): + rate_limiter_block.allow("test_drain") + + # 61st call should raise + with pytest.raises(OperationalException, match="rate_limit_block: test_fail"): + rate_limiter_block.allow("test_fail") diff --git a/tests/test_rate_limiter_sleep_mode.py b/tests/test_rate_limiter_sleep_mode.py new file mode 100644 index 000000000..3983a9fbd --- /dev/null +++ b/tests/test_rate_limiter_sleep_mode.py @@ -0,0 +1,35 @@ +import pytest +import os +import time +from unittest import mock +from adapters.ccxt_shim.rate_limiter import RateLimiter + + +@pytest.fixture +def rate_limiter_sleep(): + with mock.patch.dict( + os.environ, + { + "FT_RATE_LIMIT_PER_MINUTE": "60", # 1 token per second + "FT_RATE_LIMIT_MODE": "sleep", + "FT_RATE_LIMIT_DISABLE": "0", + }, + ): + yield RateLimiter() + + +def test_sleep_mode_delays(rate_limiter_sleep): + # Drain 60 tokens + for _ in range(60): + rate_limiter_sleep.allow("test_drain") + + start_time = time.time() + + # 61st call should sleep for approx 1s (to get 1 token) + # We cheat/check token logic: refill rate is 1/sec. + # Cost 1. Needed 1. Time = 1s. + rate_limiter_sleep.allow("test_sleep") + + elapsed = time.time() - start_time + assert elapsed >= 0.9 # Allow slight timing jitter + assert elapsed < 2.0 # Shouldn't sleep excessively