Source code for ccxt_pandas.wrappers.async_ccxt_pandas_multi_exchange
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
import ccxt.pro as ccxt_pro
from ccxt_pandas.wrappers.async_ccxt_pandas_exchange import AsyncCCXTPandasExchange
[docs]
@dataclass
class AsyncCCXTPandasMultiExchange:
"""
Manages multiple CCXT Pro-based asynchronous exchanges, returning task lists
for unified concurrent execution across exchanges.
Attributes:
exchange_names (tuple): Exchange identifiers to initialize.
exchanges (dict[str, AsyncCCXTPandasExchange]): Exchange ID → async client mapping.
"""
exchange_names: tuple = ()
exchanges: dict[str, AsyncCCXTPandasExchange] | None = None
def __post_init__(self):
if self.exchanges is None:
self.exchanges = {}
for exchange_id in self.exchange_names:
exchange_class = getattr(ccxt_pro, exchange_id)
exchange = exchange_class()
self.exchanges[exchange_id] = AsyncCCXTPandasExchange(
exchange=exchange, exchange_name=exchange_id
)
def __getattr__(self, method_name) -> Callable[[tuple[Any, ...], dict[str, Any]], list]:
def wrapper_function(*args, **kwargs) -> list:
tasks = []
for name, exchange in self.exchanges.items():
method = getattr(exchange, method_name)
task = method(*args, **kwargs)
if isinstance(task, list):
tasks += task
else:
tasks.append(task)
return tasks
return wrapper_function
[docs]
def close(self) -> list:
return [exchange.close() for exchange in self.exchanges.values()]