import inspect
import time
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from functools import wraps
from typing import Any, Literal

import ccxt
import pandas as pd
from cachetools.func import ttl_cache

from ccxt_pandas.utils.ccxt_pandas_exchange_typed import CCXTPandasExchangeTyped
from ccxt_pandas.utils.pandas_utils import (
    FunctionHandler,
    concat_results,
    merge_markets_with_balances,
    preprocess_order,
    preprocess_order_dataframe,
    timestamp_to_int,
)
from ccxt_pandas.utils.utils import exchange_has_method
from ccxt_pandas.wrappers.base_processor import BaseProcessor
from ccxt_pandas.wrappers.exchange_parsers import get_parser
from ccxt_pandas.wrappers.method_mappings import (
    modified_methods,
    single_order_methods,
    symbol_order_methods,
)
[docs]
@dataclass
class CCXTPandasExchange(CCXTPandasExchangeTyped):
    """
    CCXTPandasExchange is a wrapper for the CCXT library that integrates with Pandas
    to provide streamlined data processing for cryptocurrency exchanges. It enables users
    to seamlessly create orders, fetch market data, and process exchange responses as
    Pandas DataFrames.

    Supports advanced features including:
    - Multi-symbol operations: pass a list of symbols to fetch data for all at once
    - Date range pagination: use from_date/to_date to paginate through historical data
    - Caching: use cache=True to incrementally build a local cache of fetched data
    - Order DataFrame batching: use _from_dataframe methods to batch order operations
    - Error handling modes: "raise", "warn", or "ignore" errors from exchange calls

    Attributes:
        exchange (ccxt.Exchange): An instance of the CCXT exchange client.
        exchange_name (str | None): The name of the exchange to interact with.
            Defaults to ``exchange.id`` when not given.
        account_name (str | None): The account name, if required for tracking.
        dropna_fields (bool): Determines whether empty (NaN) columns are removed
            from DataFrame outputs.
        attach_trades_to_orders (bool): Determines whether trades are attached to
            orders when processing orders.
        max_order_cost (float): Maximum cost value for any single order.
        max_number_of_orders (int): Maximum number of orders to process in a
            single operation.
        markets_cache_time (int): Cache duration (in seconds) for markets data
            returned by ``load_cached_markets``.
        errors (str): Error handling mode: "raise", "warn", or "ignore".
        cost_out_of_range (str): Behavior when cost exceeds ranges: "warn" or "clip".
        amount_out_of_range (str): Behavior when amount exceeds ranges: "warn" or "clip".
        price_out_of_range (str): Behavior when price exceeds ranges: "warn" or "clip".
        validate_schemas (bool): Forwarded to the internal BaseProcessor
            (presumably enables schema validation of outputs - confirm there).
        strict_validation (bool): Forwarded to the internal BaseProcessor
            (presumably makes validation failures raise - confirm there).
    """

    exchange: ccxt.Exchange = field(default_factory=ccxt.binance)
    exchange_name: str | None = None
    account_name: str | None = None
    dropna_fields: bool = True
    attach_trades_to_orders: bool = False
    max_order_cost: float = 10_000
    max_number_of_orders: int = 1_000
    markets_cache_time: int = 3600
    errors: Literal["ignore", "raise", "warn"] = "raise"
    cost_out_of_range: Literal["warn", "clip"] = "warn"
    amount_out_of_range: Literal["warn", "clip"] = "warn"
    price_out_of_range: Literal["warn", "clip"] = "warn"
    validate_schemas: bool = False
    strict_validation: bool = False
    _ccxt_processor: BaseProcessor = field(default_factory=BaseProcessor, init=False, repr=False)
    _function_handler: FunctionHandler = field(
        default_factory=FunctionHandler, init=False, repr=False
    )
    # Memoized exchange-method signatures: method name -> inspect parameters.
    _signature_cache: dict = field(default_factory=dict, init=False, repr=False)
    # TTL cache for load_cached_markets: params key -> (expires_at, markets).
    # compare=False keeps dataclass equality independent of cache state.
    _markets_cache: dict = field(
        default_factory=dict, init=False, repr=False, compare=False
    )

    def __post_init__(self):
        """Fill in ``exchange_name`` and build the processor and function handler."""
        if self.exchange_name is None:
            self.exchange_name = self.exchange.id
        self._ccxt_processor = BaseProcessor(
            exchange_name=self.exchange_name,
            account_name=self.account_name,
            dropna_fields=self.dropna_fields,
            attach_trades_to_orders=self.attach_trades_to_orders,
            cost_out_of_range=self.cost_out_of_range,
            amount_out_of_range=self.amount_out_of_range,
            price_out_of_range=self.price_out_of_range,
            validate_schemas=self.validate_schemas,
            strict_validation=self.strict_validation,
        )
        self._function_handler = FunctionHandler(errors=self.errors)

    def _analyze_method_signature(self, name: str) -> Mapping[str, inspect.Parameter]:
        """
        Return the parameters of the exchange method ``name``, memoized per name.

        The method is looked up on the bound exchange, so ``self`` is not part
        of the returned mapping.
        """
        if name not in self._signature_cache:
            # Bypass our own __getattribute__ to read the raw exchange object.
            exchange = super().__getattribute__("exchange")
            func = getattr(exchange, name)
            self._signature_cache[name] = inspect.signature(func).parameters
        return self._signature_cache[name]

    def _make_base_call(self, method_name: str) -> Callable:
        """
        Build a callable around the raw exchange method ``method_name``.

        Before calling CCXT:
        - ``single_order_methods``: ``amount``/``price`` are recomputed by
          ``preprocess_order`` using the cached markets and configured limits,
          and ``cost`` is consumed here (never forwarded to CCXT).
        - ``symbol_order_methods``: the ``orders`` DataFrame is reduced to a list
          of ``{"id": ..., "symbol": ...}`` records.
        - ``since`` (if given) is normalized with ``timestamp_to_int``.

        The raw result is then passed through ``BaseProcessor.preprocess_outputs``.
        ``symbol``/``type`` (single-order methods) and ``orders`` (symbol-order
        methods) must be passed as keyword arguments.
        """
        original_method = getattr(self.exchange, method_name)

        @wraps(original_method)
        def base_call(*args, **kwargs) -> dict | pd.DataFrame:
            if method_name in single_order_methods:
                kwargs["amount"], kwargs["price"] = preprocess_order(
                    exchange=self.exchange,
                    symbol=kwargs["symbol"],
                    order_type=kwargs["type"],
                    amount=kwargs.get("amount"),
                    price=kwargs.get("price"),
                    cost=kwargs.get("cost"),
                    markets=self.load_cached_markets(),
                    max_cost=self.max_order_cost,
                    cost_out_of_range=self.cost_out_of_range,
                    amount_out_of_range=self.amount_out_of_range,
                    price_out_of_range=self.price_out_of_range,
                )
                # ``cost`` is only an input to preprocess_order; CCXT order
                # methods do not take it.
                kwargs.pop("cost", None)
            elif method_name in symbol_order_methods:
                kwargs["orders"] = kwargs["orders"][["id", "symbol"]].to_dict("records")
            if "since" in kwargs:
                kwargs["since"] = timestamp_to_int(kwargs["since"])
            result = original_method(*args, **kwargs)
            return self._ccxt_processor.preprocess_outputs(
                method_name=method_name, result=result, symbol=kwargs.get("symbol")
            )

        return base_call

    def __getattribute__(self, method_name: str) -> Any:
        """
        Route attribute access so CCXT methods return pandas-processed results.

        1. ``method_name`` not in ``modified_methods``: if it is a callable on the
           wrapped exchange that this class does not define itself, and a parser
           is registered for the exchange, return a wrapper that calls the raw
           exchange method and feeds the result to that parser. Otherwise do a
           normal attribute lookup.
        2. ``method_name`` in ``modified_methods``: return a wrapper accepting the
           CCXT method's own arguments (positionally or by keyword) plus:
           - ``symbol``/``code`` as a single value or a list/tuple/set; values are
             fetched per symbol through the FunctionHandler.
           - ``from_date``/``to_date`` to paginate methods that accept ``since``.
           - ``cache=True`` (multi-symbol and date-range paths only) to merge new
             data into an instance attribute named after the method without its
             ``fetch_`` prefix (e.g. ``fetch_ohlcv`` -> ``self.ohlcv``); a copy of
             the updated frame is returned.
           Calls with no symbols and no date range go through
           ``FunctionHandler.try_function`` (built with the ``errors`` mode).
        """
        if method_name not in modified_methods:
            # Intercept implicit (undocumented) exchange methods that are not in
            # modified_methods and not defined on the wrapper class itself.
            # These are camelCase methods generated by CCXT (e.g. dapiDataGetTakerBuySellVol).
            # If an exchange-specific parser is registered, wrap the call so the raw
            # response is automatically converted to a typed DataFrame.
            exchange = super().__getattribute__("exchange")
            exchange_method = getattr(exchange, method_name, None)
            if (
                exchange_method is not None
                and callable(exchange_method)
                and not hasattr(type(self), method_name)
            ):
                processor = super().__getattribute__("_ccxt_processor")
                parser = get_parser(processor.exchange_name)
                if parser is not None:

                    @wraps(exchange_method)
                    def implicit_wrapper(*args, **kwargs):
                        result = exchange_method(*args, **kwargs)
                        return parser(processor, result, method_name=method_name)

                    return implicit_wrapper
            return super().__getattribute__(method_name)

        param_names = self._analyze_method_signature(method_name)
        base_call = self._make_base_call(method_name)
        supports_symbol = "symbol" in param_names
        supports_code = "code" in param_names
        supports_since = "since" in param_names
        # Names that positional arguments bind to, in declaration order.
        positional_names = [
            name
            for name, parameter in param_names.items()
            if parameter.kind
            in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
        ]
        # Instance attribute holding cached results (e.g. fetch_ohlcv -> "ohlcv").
        cache_attr = method_name.replace("fetch_", "")

        def read_cache() -> pd.DataFrame:
            """Return the currently cached frame, or an empty one if none exists."""
            try:
                return object.__getattribute__(self, cache_attr)
            except AttributeError:
                return pd.DataFrame()

        @wraps(base_call)
        def wrapper(*args, **kwargs) -> pd.DataFrame | None | Any:
            # Every downstream call is keyword-based, so positional arguments
            # must be bound to their parameter names here or they would be
            # silently dropped.
            if len(args) > len(positional_names):
                raise TypeError(
                    f"{method_name}() takes {len(positional_names)} positional "
                    f"argument(s) but {len(args)} were given"
                )
            for name, value in zip(positional_names, args):
                if name in kwargs:
                    raise TypeError(
                        f"{method_name}() got multiple values for argument '{name}'"
                    )
                kwargs[name] = value

            cache = kwargs.pop("cache", False)
            # NOTE: the inner pop always runs, so ``code`` is discarded when both
            # ``symbol`` and ``code`` are given.
            symbols = kwargs.pop("symbol", kwargs.pop("code", []))
            if not isinstance(symbols, (list, tuple, set)):
                symbols = [symbols]
            from_date = kwargs.pop("from_date", None)
            to_date = kwargs.pop("to_date", None)

            if symbols and (supports_symbol or supports_code):
                symbol_column: Literal["code", "symbol"] = "code" if supports_code else "symbol"
                if cache:
                    updated = self._function_handler.load_multi_symbol_dataset_into_cache(
                        function=base_call,
                        data=read_cache(),
                        symbols=symbols,
                        symbol_column=symbol_column,
                        from_date=from_date,
                        to_date=to_date,
                        **kwargs,
                    )
                    setattr(self, cache_attr, updated)
                    return updated.copy()
                if supports_since and from_date:
                    return self._function_handler.load_full_multi_symbol_dataset(
                        function=base_call,
                        symbols=symbols,
                        symbol_column=symbol_column,
                        from_date=from_date,
                        to_date=to_date,
                        **kwargs,
                    )
                return self._function_handler.load_multi_symbol_dataset(
                    function=base_call,
                    symbols=symbols,
                    symbol_column=symbol_column,
                    **kwargs,
                )

            if supports_since and from_date:
                if cache:
                    updated = self._function_handler.load_dataset_into_cache(
                        function=base_call,
                        data=read_cache(),
                        symbols=symbols,
                        from_date=from_date,
                        to_date=to_date,
                        **kwargs,
                    )
                    setattr(self, cache_attr, updated)
                    return updated.copy()
                return self._function_handler.load_full_dataset(
                    function=base_call,
                    symbols=symbols,
                    from_date=from_date,
                    to_date=to_date,
                    **kwargs,
                )

            return self._function_handler.try_function(function=base_call, **kwargs)

        return wrapper

    def _loop_orders_from_dataframe(
        self, method_name: str, orders: pd.DataFrame, /, **kwargs
    ) -> pd.DataFrame:
        """Run the single-order ``method_name`` over ``orders`` via loop_through_orders."""
        base_call = self._make_base_call(method_name)
        return self._function_handler.loop_through_orders(
            function=base_call,
            orders=orders,
            max_number_of_orders=self.max_number_of_orders,
            **kwargs,
        )

    def _batch_orders_from_dataframe(
        self, method_name: str, orders: pd.DataFrame, chunk_size: int, /, **kwargs
    ) -> pd.DataFrame:
        """
        Preprocess ``orders`` and send them to the batch method ``method_name``
        in chunks of at most ``chunk_size`` orders, concatenating the results.

        Raises:
            ValueError: if ``chunk_size`` is smaller than 1 (a negative value would
                otherwise send nothing at all without any error).
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")
        orders = preprocess_order_dataframe(
            orders=orders,
            markets=self.load_cached_markets(),
            max_orders=self.max_number_of_orders,
            max_cost=self.max_order_cost,
            cost_out_of_range=self.cost_out_of_range,
            amount_out_of_range=self.amount_out_of_range,
            price_out_of_range=self.price_out_of_range,
        )
        order_dicts = self._ccxt_processor.orders_to_dict(
            orders=orders,
            exchange=self.exchange,
        )
        base_call = self._make_base_call(method_name)
        results = []
        for start in range(0, len(order_dicts), chunk_size):
            result = self._function_handler.try_function(
                function=base_call,
                orders=order_dicts[start : start + chunk_size],
                **kwargs,
            )
            results.append(result)
        return concat_results(results)

    def create_order_from_dataframe(self, orders: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """Create one order per row of ``orders`` using ``create_order``."""
        return self._loop_orders_from_dataframe("create_order", orders, **kwargs)

    def edit_order_from_dataframe(self, orders: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """Edit one order per row of ``orders`` using ``edit_order``."""
        return self._loop_orders_from_dataframe("edit_order", orders, **kwargs)

    def cancel_order_from_dataframe(self, orders: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """Cancel one order per row of ``orders`` using ``cancel_order``."""
        return self._loop_orders_from_dataframe("cancel_order", orders, **kwargs)

    def create_orders_from_dataframe(
        self, orders: pd.DataFrame, chunk_size: int = 5, **kwargs
    ) -> pd.DataFrame:
        """Create ``orders`` with the batch ``create_orders`` method, ``chunk_size`` at a time."""
        return self._batch_orders_from_dataframe("create_orders", orders, chunk_size, **kwargs)

    def edit_orders_from_dataframe(
        self, orders: pd.DataFrame, chunk_size: int = 5, **kwargs
    ) -> pd.DataFrame:
        """Edit ``orders`` with the batch ``edit_orders`` method, ``chunk_size`` at a time."""
        return self._batch_orders_from_dataframe("edit_orders", orders, chunk_size, **kwargs)

    def cancel_orders_from_dataframe(self, orders: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """
        Cancel ``orders`` with ``cancel_orders`` via
        ``FunctionHandler.call_per_group_concat`` (presumably one call per
        symbol group - confirm in FunctionHandler).
        """
        base_call = self._make_base_call("cancel_orders")
        return self._function_handler.call_per_group_concat(
            function=base_call,
            orders=orders,
            **kwargs,
        )

    def load_cached_markets(self, params: dict | None = None) -> pd.DataFrame:
        """
        Return markets, reloading from the exchange at most once every
        ``markets_cache_time`` seconds for a given ``params``.

        Args:
            params: Extra CCXT params forwarded to ``load_markets``; distinct
                params are cached separately.

        Returns:
            The (shared, cached) processed result of ``load_markets(reload=True)``.
        """
        # The cache must outlive this call: decorating a fresh inner function
        # with ttl_cache on every call (as before) never produced a cache hit.
        params = params or {}
        key = repr(sorted(params.items(), key=lambda item: str(item[0])))
        now = time.monotonic()
        cached = self._markets_cache.get(key)
        if cached is not None and cached[0] > now:
            return cached[1]
        markets = self.load_markets(reload=True, params=params)
        self._markets_cache[key] = (now + self.markets_cache_time, markets)
        return markets

    def fetch_markets_with_balances(
        self,
        markets_params: dict | None = None,
        balance_params: dict | None = None,
        reload_markets: bool = False,
    ) -> pd.DataFrame:
        """Load markets and balances and merge them with ``merge_markets_with_balances``."""
        return merge_markets_with_balances(
            markets=self.load_markets(params=markets_params or {}, reload=reload_markets),
            balance=self.fetch_balance(params=balance_params or {}),
        )

    def has_method(self, method_name: str) -> bool:
        """Return whether the wrapped exchange supports ``method_name``."""
        return exchange_has_method(self.exchange, method_name)