diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index 6372147c2..909fd0c74 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -6,6 +6,8 @@ import logging import time import typing from collections import OrderedDict +from datetime import datetime +from typing import Dict, Tuple import numpy as np import pandas as pd @@ -17,7 +19,7 @@ from freqtrade.exceptions import DependencyException logger = logging.getLogger(__name__) # Global cache dictionary -cached_grouped_trades: OrderedDict[pd.Timestamp, pd.DataFrame] = OrderedDict() +cached_grouped_trades_per_pair: Dict[str, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]] = {} def _init_dataframe_with_trades_columns(dataframe: pd.DataFrame): @@ -61,7 +63,9 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): df.drop(columns=["datetime"], inplace=True) -def populate_dataframe_with_trades(config, dataframe, trades): +def populate_dataframe_with_trades( + pair: str, config, dataframe: pd.DataFrame, trades: pd.DataFrame +): """ Populates a dataframe with trades :param dataframe: Dataframe to populate @@ -76,6 +80,9 @@ def populate_dataframe_with_trades(config, dataframe, trades): _init_dataframe_with_trades_columns(dataframe) try: + cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame] = ( + cached_grouped_trades_per_pair.get(pair, OrderedDict()) + ) start_time = time.time() # calculate ohlcv candle start and end _calculate_ohlcv_candle_start_and_end(trades, timeframe) @@ -114,7 +121,9 @@ def populate_dataframe_with_trades(config, dataframe, trades): trades_series.loc[indices] = [trades_grouped_df] # Use caching mechanism if (candle_start, candle_next) in cached_grouped_trades: - cache_entry = cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] + cache_entry = cached_grouped_trades[ + (typing.cast(datetime, candle_start), candle_next) + ] # dataframe.loc[is_between] = cache_entry # doesn't take, so we need workaround: # Create a dictionary of the column values to be assigned update_dict = {c: cache_entry[c].iat[0] for c in cache_entry.columns} @@ -170,9 +179,9 @@ def populate_dataframe_with_trades(config, dataframe, trades): dataframe.loc[indices, "total_trades"] = len(trades_grouped_df) # Cache the result - cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = dataframe.loc[ - is_between - ].copy() + cached_grouped_trades[(typing.cast(datetime, candle_start), candle_next)] = ( + dataframe.loc[is_between].copy() + ) # Maintain cache size if len(cached_grouped_trades) > cache_size: @@ -187,6 +196,10 @@ def populate_dataframe_with_trades(config, dataframe, trades): dataframe["imbalances"] = imbalances_series dataframe["stacked_imbalances_bid"] = stacked_imbalances_bid_series dataframe["stacked_imbalances_ask"] = stacked_imbalances_ask_series + # dereference old cache + if pair in cached_grouped_trades_per_pair: + del cached_grouped_trades_per_pair[pair] + cached_grouped_trades_per_pair[pair] = cached_grouped_trades except Exception as e: logger.exception("Error populating dataframe with trades") @@ -195,7 +208,9 @@ def populate_dataframe_with_trades(config, dataframe, trades): return dataframe -def trades_to_volumeprofile_with_total_delta_bid_ask(trades: pd.DataFrame, scale: float) -> pd.DataFrame: +def trades_to_volumeprofile_with_total_delta_bid_ask( + trades: pd.DataFrame, scale: float +) -> pd.DataFrame: """ :param trades: dataframe :param scale: scale aka bin size e.g. 0.5 diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 43e507768..528a3647e 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -1601,7 +1601,7 @@ class IStrategy(ABC, HyperStrategyMixin): config = self.config config["timeframe"] = self.timeframe # TODO: slice trades to size of dataframe for faster backtesting - dataframe = populate_dataframe_with_trades(config, dataframe, trades) + dataframe = populate_dataframe_with_trades(metadata["pair"], config, dataframe, trades) logger.debug("Populated dataframe with trades.") diff --git a/tests/data/test_converter_public_trades.py b/tests/data/test_converter_public_trades.py index 1cb43bc9a..a9d1d2698 100644 --- a/tests/data/test_converter_public_trades.py +++ b/tests/data/test_converter_public_trades.py @@ -54,7 +54,7 @@ def reset_cache(request): import freqtrade.data.converter.orderflow as orderflow global orderflow # noqa F811 - orderflow.cached_grouped_trades = OrderedDict() + orderflow.cached_grouped_trades_per_pair = {} yield @@ -110,7 +110,7 @@ def test_public_trades_mock_populate_dataframe_with_trades__check_orderflow( }, } # Apply the function to populate the data frame with order flow data - df = populate_dataframe_with_trades(config, dataframe, trades) + df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades) # Extract results from the first row of the DataFrame results = df.iloc[0] t = results["trades"] @@ -223,7 +223,7 @@ def test_public_trades_trades_mock_populate_dataframe_with_trades__check_trades( } # Populate the DataFrame with trades and order flow data - df = populate_dataframe_with_trades(config, dataframe, trades) + df = populate_dataframe_with_trades("BTC/UDST", config, dataframe, trades) # --- DataFrame and Trade Data Validation --- @@ -389,7 +389,9 @@ def test_public_trades_config_max_trades( }, } - df = populate_dataframe_with_trades(default_conf | orderflow_config, dataframe, trades) + df = populate_dataframe_with_trades( + "BTC/UDST", default_conf | orderflow_config, dataframe, trades + ) assert df.delta.count() == 1