pull/12760/head
vijay sharma 3 weeks ago
parent f1b58b3b0d
commit 5b9eeb9e32

@ -5,7 +5,6 @@ import os
import tempfile
import threading
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Any
@ -29,39 +28,13 @@ from adapters.ccxt_shim.security_master import (
from adapters.ccxt_shim.market_hours import MarketHoursGuard
from adapters.ccxt_shim.risk_guard import RiskGuard
from adapters.ccxt_shim.order_router import OrderRouter
from adapters.ccxt_shim.rate_limiter import RateLimiter
from freqtrade.exceptions import OperationalException
logger = logging.getLogger(__name__)
class InternalRateLimiter:
def __init__(self, rpm: int = 100, rpd: int = 5000):
self.rpm = rpm
self.rpd = rpd
self.history = deque()
self.daily_count = 0
self.last_reset = time.time()
def check_and_record(self):
now = time.time()
if now - self.last_reset > 86400:
self.daily_count = 0
self.last_reset = now
if self.daily_count >= self.rpd:
raise OperationalException("Daily rate limit exceeded (5000)")
while self.history and now - self.history[0] > 60:
self.history.popleft()
if len(self.history) >= self.rpm:
sleep_time = 60 - (now - self.history[0])
if sleep_time > 0:
logger.warning(f"Rate limit hit. Sleeping for {sleep_time:.2f}s")
time.sleep(sleep_time)
now = time.time()
self.history.append(now)
self.daily_count += 1
class BreezeCCXT(ccxt.Exchange):
_mock_mode_logged = False
_mock_ohlcv_lock = threading.Lock()
@ -90,8 +63,8 @@ class BreezeCCXT(ccxt.Exchange):
self.risk_guard = RiskGuard({}) # Defaults to enabled=True, max=10
# Rate Limiting
rl_config = self.options.get("rateLimit", 100)
self.rate_limiter = InternalRateLimiter(rpm=rl_config)
# P17: Switched to centralized RateLimiter with Env support
self.rate_limiter = RateLimiter()
self._security_master_cache: dict[str, Any] | None = None
self.market_hours = MarketHoursGuard()
self.order_router = OrderRouter(lambda: self.markets)
@ -306,7 +279,7 @@ class BreezeCCXT(ccxt.Exchange):
def fetch_markets(self, params: dict | None = None):
if self.rate_limiter:
self.rate_limiter.check_and_record()
self.rate_limiter.allow("fetch_markets")
master = self._load_security_master()
whitelist = self.config.get("pair_whitelist", [])
if not whitelist:
@ -423,12 +396,13 @@ class BreezeCCXT(ccxt.Exchange):
}
def fetch_ticker(self, symbol: str, params: dict | None = None):
self.rate_limiter.allow("fetch_ticker")
if self._is_mock_mode():
return self._generate_mock_ticker(symbol)
if not self.breeze:
raise OperationalException("Breeze session not initialized.")
self.rate_limiter.check_and_record()
s_params = self._parse_symbol(symbol)
try:
res = self.breeze.get_quotes(**s_params)
@ -483,12 +457,12 @@ class BreezeCCXT(ccxt.Exchange):
limit: int | None = None,
params: dict | None = None,
):
self.rate_limiter.allow("fetch_ohlcv")
if self._is_mock_mode():
return self._generate_mock_ohlcv(symbol, timeframe, since, limit)
if not self.breeze:
raise OperationalException("Breeze session not initialized.")
self.rate_limiter.check_and_record()
s_params = self._parse_symbol(symbol)
interval = self.timeframes.get(timeframe)
if not interval:
@ -553,6 +527,7 @@ class BreezeCCXT(ccxt.Exchange):
self, symbol, order_type, side, amount, price=None, params: dict | None = None
):
logger.info(f"BreezeCCXT.create_order (Sync) called for {symbol} {side}")
self.rate_limiter.allow("create_order")
self.market_hours.assert_can_create_order(side, symbol)
# P15 Risk Guard Check
@ -623,6 +598,7 @@ class BreezeCCXT(ccxt.Exchange):
raise OperationalException("create_order not supported in real mode yet.")
def cancel_order(self, order_id, symbol=None, params: dict | None = None):
self.rate_limiter.allow("cancel_order")
self.market_hours.assert_can_cancel_order(order_id, str(symbol))
if self._is_mock_mode():

@ -0,0 +1,117 @@
import logging
import os
import time
from typing import Any
from freqtrade.exceptions import OperationalException
logger = logging.getLogger(__name__)
class RateLimiter:
"""
Token Bucket Rate Limiter for BreezeCCXT shim.
Enforces API limits locally to prevent 429s or to provide deterministic
blocking for testing.
"""
def __init__(self):
# Configuration via Environment Variables (Shim standard)
self.enabled = os.environ.get("FT_RATE_LIMIT_DISABLE", "0") != "1"
# Limit Configuration
# Default: 100 requests per minute
self.per_minute = int(os.environ.get("FT_RATE_LIMIT_PER_MINUTE", "100"))
# Mode: 'sleep' (production default) or 'block' (testing/gates)
self.mode = os.environ.get("FT_RATE_LIMIT_MODE", "sleep").lower()
# Token Bucket State
self.capacity = self.per_minute
self.tokens = float(self.capacity)
self.last_refill = time.time()
# Refill rate: tokens per second
# If per_minute=60, rate=1.0 token/sec
self.refill_rate = self.per_minute / 60.0
if self.enabled:
logger.info(
f"RateLimiter initialized: {self.per_minute}/min, "
f"Rate: {self.refill_rate:.4f} tps, Mode: {self.mode}"
)
else:
logger.warning("RateLimiter is DISABLED via FT_RATE_LIMIT_DISABLE")
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
if elapsed > 0:
added = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + added)
self.last_refill = now
def allow(self, op: str, cost: int = 1) -> None:
"""
Check if operation is allowed.
Consumes tokens if available.
If not available:
- mode='sleep': Sleeps until tokens available
- mode='block': Raises OperationalException
"""
if not self.enabled:
return
self._refill()
if self.tokens >= cost:
self.tokens -= cost
self._log_usage(op, cost)
return
# Not enough tokens
if self.mode == "block":
self._raise_block(op, cost)
else:
self._sleep_until_allowed(op, cost)
def _raise_block(self, op: str, cost: int):
# Stable token for acceptance gates
logger.warning(f"RATE_LIMIT_BLOCK: op={op} cost={cost} remaining={self.tokens:.2f}")
raise OperationalException(f"rate_limit_block: {op}")
def _sleep_until_allowed(self, op: str, cost: int):
needed = cost - self.tokens
# Time required to refill 'needed' tokens
sleep_time = needed / self.refill_rate
# Safety bound (prevent infinite sleeps if config is bad)
if sleep_time > 60:
logger.warning(f"RateLimiter request to sleep {sleep_time:.2f}s clamped to 60s")
sleep_time = 60
logger.info(
f"RATE_LIMIT_SLEEP: op={op} cost={cost} remaining={self.tokens:.2f} sleep={sleep_time:.3f}s"
)
time.sleep(sleep_time)
# After sleep, refill and consume
self._refill()
self.tokens -= cost
self._log_usage(op, cost)
def _log_usage(self, op: str, cost: int):
# Debug level to avoid spam, unless approaching limit?
# For P17 verification, might want distinct logs.
# Keeping it DEBUG for high volume, INFO for block/sleep.
logger.debug(f"RATE_LIMIT: op={op} cost={cost} remaining={self.tokens:.2f}")
def stats(self) -> dict[str, Any]:
self._refill()
return {
"enabled": self.enabled,
"mode": self.mode,
"tokens": self.tokens,
"capacity": self.capacity,
}

@ -0,0 +1,75 @@
import pytest
import os
from unittest import mock
from freqtrade.exceptions import OperationalException
from adapters.ccxt_shim.breeze_ccxt import BreezeCCXT
@pytest.fixture
def rate_limited_exchange():
"""
Fixture for BreezeCCXT with strict rate limiting enabled via environment.
"""
with mock.patch.dict(
os.environ,
{
"BREEZE_MOCK": "1",
"FT_RATE_LIMIT_PER_MINUTE": "10",
"FT_RATE_LIMIT_MODE": "block",
"FT_RATE_LIMIT_DISABLE": "0",
},
):
exposure = {"risk_guard": {"enabled": False}} # Disable risk guard to isolate rate limit
exchange = BreezeCCXT(config=exposure)
yield exchange
def test_fetch_ticker_rate_limit(rate_limited_exchange):
symbol = "RELIANCE/INR"
# First 10 calls should pass
for i in range(10):
res = rate_limited_exchange.fetch_ticker(symbol)
assert res["symbol"] == symbol
# 11th call should block
with pytest.raises(OperationalException, match="rate_limit_block: fetch_ticker"):
rate_limited_exchange.fetch_ticker(symbol)
def test_fetch_markets_rate_limit(rate_limited_exchange):
# Depending on how tests run, tokens might be shared if instance persists or class state
# Fixture creates new instance each time, which creates new RateLimiter, so fresh bucket.
# 10 calls pass
for i in range(10):
rate_limited_exchange.fetch_markets()
# 11th blocks
with pytest.raises(OperationalException, match="rate_limit_block: fetch_markets"):
rate_limited_exchange.fetch_markets()
def test_create_order_rate_limit(rate_limited_exchange):
symbol = "RELIANCE/INR"
# To pass order creation checks, we need to ensure market hours/risk guard don't block first.
# risk_guard disabled in fixture.
# We mock market hours to allow.
with mock.patch("adapters.ccxt_shim.market_hours.MarketHoursGuard.assert_can_create_order"):
# P15 risk check calls fetch_ticker, which consumes a token.
# We mock fetch_ticker to avoid double consumption and isolate create_order check.
# We need to mock it on the INSTANCE, not the class, because the instance RateLimiter is what we rely on?
# Use mock.patch.object
with mock.patch.object(
rate_limited_exchange,
"fetch_ticker",
return_value={"bid": 2490, "ask": 2510, "last": 2500},
):
# 10 calls pass
for i in range(10):
rate_limited_exchange.create_order(symbol, "limit", "buy", 1, 2500)
# 11th blocks
with pytest.raises(OperationalException, match="rate_limit_block: create_order"):
rate_limited_exchange.create_order(symbol, "limit", "buy", 1, 2500)

@ -0,0 +1,38 @@
import pytest
import os
from unittest import mock
from adapters.ccxt_shim.rate_limiter import RateLimiter
from freqtrade.exceptions import OperationalException
@pytest.fixture
def rate_limiter_block():
with mock.patch.dict(
os.environ,
{
"FT_RATE_LIMIT_PER_MINUTE": "60", # 1 token per second
"FT_RATE_LIMIT_MODE": "block",
"FT_RATE_LIMIT_DISABLE": "0",
},
):
print(f"DEBUG: Created RateLimiter with env: {os.environ.get('FT_RATE_LIMIT_MODE')}")
yield RateLimiter()
def test_block_mode_allow(rate_limiter_block):
# Should start full
assert rate_limiter_block.tokens == 60.0
# Allow 1
rate_limiter_block.allow("test_op")
assert rate_limiter_block.tokens == 59.0
def test_block_mode_exceed_limit(rate_limiter_block):
# Drain 60 tokens
for _ in range(60):
rate_limiter_block.allow("test_drain")
# 61st call should raise
with pytest.raises(OperationalException, match="rate_limit_block: test_fail"):
rate_limiter_block.allow("test_fail")

@ -0,0 +1,35 @@
import pytest
import os
import time
from unittest import mock
from adapters.ccxt_shim.rate_limiter import RateLimiter
@pytest.fixture
def rate_limiter_sleep():
with mock.patch.dict(
os.environ,
{
"FT_RATE_LIMIT_PER_MINUTE": "60", # 1 token per second
"FT_RATE_LIMIT_MODE": "sleep",
"FT_RATE_LIMIT_DISABLE": "0",
},
):
yield RateLimiter()
def test_sleep_mode_delays(rate_limiter_sleep):
# Drain 60 tokens
for _ in range(60):
rate_limiter_sleep.allow("test_drain")
start_time = time.time()
# 61st call should sleep for approx 1s (to get 1 token)
# We cheat/check token logic: refill rate is 1/sec.
# Cost 1. Needed 1. Time = 1s.
rate_limiter_sleep.allow("test_sleep")
elapsed = time.time() - start_time
assert elapsed >= 0.9 # Allow slight timing jitter
assert elapsed < 2.0 # Shouldn't sleep excessively
Loading…
Cancel
Save