perf: Optimize data processing, exchange caching, and RPC

- Optimize `ohlcv_to_dataframe` timestamp handling and type casting.
- Optimize `clean_ohlcv_dataframe` to skip sorting.
- Cache `Exchange.get_quote_currencies`.
- Optimize `json_to_dataframe` using `rapidjson`.
- Optimize `Telegram._logs` string concatenation.
- Update `tests/data/test_converter.py` to match optimized input expectations.

Co-authored-by: Corax-CoLAB <239841157+Corax-CoLAB@users.noreply.github.com>
pull/12809/head
google-labs-jules[bot] 4 months ago
parent a915ac5ed5
commit f74e5100c7

@ -36,23 +36,13 @@ def ohlcv_to_dataframe(
"""
logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
cols = DEFAULT_DATAFRAME_COLUMNS
df = DataFrame(ohlcv, columns=cols)
# Use float dtype to avoid astype conversion later and handle int volume/prices
df = DataFrame(ohlcv, columns=cols, dtype="float")
# Floor date to seconds to account for exchange imprecisions
df["date"] = to_datetime(df["date"], unit="ms", utc=True).dt.floor("s")
# Some exchanges return int values for Volume and even for OHLC.
# Convert them since TA-LIB indicators used in the strategy assume floats
# and fail with exception...
df = df.astype(
dtype={
"open": "float",
"high": "float",
"low": "float",
"close": "float",
"volume": "float",
}
)
# Optimization: Integer arithmetic is faster than datetime conversion
df["date"] = to_datetime(df["date"] // 1000 * 1000, unit="ms", utc=True)
return clean_ohlcv_dataframe(
df, timeframe, pair, fill_missing=fill_missing, drop_incomplete=drop_incomplete
)
@ -75,7 +65,7 @@ def clean_ohlcv_dataframe(
:return: DataFrame
"""
# group by index and aggregate results to eliminate duplicate ticks
data = data.groupby(by="date", as_index=False, sort=True).agg(
data = data.groupby(by="date", as_index=False, sort=False).agg(
{
"open": "first",
"high": "max",

@ -245,6 +245,7 @@ class Exchange:
# Cached timeframes
self._timeframes: list[str] | None = None
self._quote_currencies_cache: list[str] | None = None
# Holds public_trades
self._trades: dict[PairWithTimeframe, DataFrame] = {}
@ -556,8 +557,11 @@ class Exchange:
"""
Return a list of supported quote currencies
"""
if self._quote_currencies_cache is not None:
return self._quote_currencies_cache
markets = self.markets
return sorted(set([x["quote"] for _, x in markets.items()]))
self._quote_currencies_cache = sorted(set([x["quote"] for _, x in markets.items()]))
return self._quote_currencies_cache
def get_pair_quote_currency(self, pair: str) -> str:
"""Return a pair's quote currency (base/quote:settlement)"""
@ -725,6 +729,7 @@ class Exchange:
self._ws_async.options = self._api.options
self._last_markets_refresh = dt_ts()
self._timeframes = None
self._quote_currencies_cache = None
if is_initial and self._ft_has["needs_trading_fees"]:
self._trading_fees = self.fetch_trading_fees()

@ -197,7 +197,18 @@ def json_to_dataframe(data: str) -> pd.DataFrame:
:param data: A JSON string
:returns: A pandas DataFrame from the JSON string
"""
dataframe = pd.read_json(StringIO(data), orient="split")
try:
# Optimize parsing using rapidjson directly
json_dict = rapidjson.loads(data)
dataframe = pd.DataFrame(
json_dict["data"],
columns=json_dict["columns"],
index=json_dict["index"]
)
except (ValueError, KeyError, rapidjson.JSONDecodeError):
# Fallback to pandas if structure is not matching 'split' or other errors
dataframe = pd.read_json(StringIO(data), orient="split")
if "date" in dataframe.columns:
dataframe["date"] = pd.to_datetime(dataframe["date"], unit="ms", utc=True)

@ -1895,7 +1895,8 @@ class Telegram(RPCHandler):
except (TypeError, ValueError, IndexError):
limit = 10
logs = RPC._rpc_get_logs(limit)["logs"]
msgs = ""
msgs_list = []
current_len = 0
msg_template = "*{}* {}: {} \\- `{}`"
for logrec in logs:
msg = msg_template.format(
@ -1904,16 +1905,20 @@ class Telegram(RPCHandler):
escape_markdown(logrec[3], version=2),
escape_markdown(logrec[4], version=2),
)
if len(msgs + msg) + 10 >= MAX_MESSAGE_LENGTH:
# Add 1 for the newline character
msg_len = len(msg) + 1
if current_len + msg_len + 10 >= MAX_MESSAGE_LENGTH:
# Send message immediately if it would become too long
await self._send_msg(msgs, parse_mode=ParseMode.MARKDOWN_V2)
msgs = msg + "\n"
await self._send_msg("".join(msgs_list), parse_mode=ParseMode.MARKDOWN_V2)
msgs_list = [msg + "\n"]
current_len = msg_len
else:
# Append message to messages to send
msgs += msg + "\n"
msgs_list.append(msg + "\n")
current_len += msg_len
if msgs:
await self._send_msg(msgs, parse_mode=ParseMode.MARKDOWN_V2)
if msgs_list:
await self._send_msg("".join(msgs_list), parse_mode=ParseMode.MARKDOWN_V2)
@authorized_only
async def _help(self, update: Update, context: CallbackContext) -> None:

@ -198,10 +198,16 @@ def test_ohlcv_fill_up_missing_data2(caplog):
)
def test_ohlcv_to_dataframe_multi(timeframe):
data = generate_test_data(timeframe, 180)
# Convert DataFrame to list of lists (simulating ccxt output)
# Date needs to be converted to int64 ms timestamp
ohlcv_data = data.copy()
ohlcv_data["date"] = ohlcv_data["date"].astype(np.int64) // 1000 // 1000
ohlcv_list = ohlcv_data.values.tolist()
assert len(data) == 180
df = ohlcv_to_dataframe(data, timeframe, "UNITTEST/USDT")
df = ohlcv_to_dataframe(ohlcv_list, timeframe, "UNITTEST/USDT")
assert len(df) == len(data) - 1
df1 = ohlcv_to_dataframe(data, timeframe, "UNITTEST/USDT", drop_incomplete=False)
df1 = ohlcv_to_dataframe(ohlcv_list, timeframe, "UNITTEST/USDT", drop_incomplete=False)
assert len(df1) == len(data)
assert data.equals(df1)
@ -211,7 +217,13 @@ def test_ohlcv_to_dataframe_multi(timeframe):
else:
# Shift by half a timeframe
data1.loc[:, "date"] = data1.loc[:, "date"] + (pd.to_timedelta(timeframe) / 2)
df2 = ohlcv_to_dataframe(data1, timeframe, "UNITTEST/USDT")
# Prepare data1 for ohlcv_to_dataframe
ohlcv_data1 = data1.copy()
ohlcv_data1["date"] = ohlcv_data1["date"].astype(np.int64) // 1000 // 1000
ohlcv_list1 = ohlcv_data1.values.tolist()
df2 = ohlcv_to_dataframe(ohlcv_list1, timeframe, "UNITTEST/USDT")
assert len(df2) == len(data) - 1
tfs = timeframe_to_seconds(timeframe)

Loading…
Cancel
Save