Source code for ccxt_pandas.wrappers.async_ccxt_pandas_multi_account

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

import ccxt.pro as ccxt
import pandas as pd
from pandas import DataFrame

from ccxt_pandas.wrappers.async_ccxt_pandas_exchange import AsyncCCXTPandasExchange
from ccxt_pandas.wrappers.method_mappings import orders_methods
from ccxt_pandas.wrappers.models import ExchangeClientConfig


@dataclass
class AsyncCCXTPandasMultiAccount:
    """Fan one method call out to several asynchronous CCXT exchange clients.

    Any public attribute that is not defined on this class is resolved by
    ``__getattr__`` into a dispatcher. Calling the dispatcher invokes the
    same-named method on the underlying clients and returns everything those
    calls produced as one flat list, so the caller can run the results
    concurrently (e.g. gather them).

    Attributes:
        accounts: Account name -> config mapping. Only read when ``clients``
            is not supplied. Each config is subscripted with ``"exchange"``
            (attribute name looked up on the ccxt module) and
            ``"sandboxMode"``; every key other than ``"account"``,
            ``"exchange"`` and ``"sandboxMode"`` is forwarded to the exchange
            constructor.
        clients: Account name -> async client mapping. Built from
            ``accounts`` when omitted.
    """

    # Quoted so that "| None" is accepted regardless of the Python version;
    # both fields really do default to None.
    accounts: "dict[str, ExchangeClientConfig] | None" = None
    clients: "dict[str, AsyncCCXTPandasExchange] | None" = None

    def __post_init__(self) -> None:
        """Create one client per configured account unless clients were given.

        Raises:
            KeyError: If an account config lacks ``"exchange"`` or
                ``"sandboxMode"``. The sandbox flag is intentionally not
                defaulted so a config can never fall back to live mode
                silently.
            AttributeError: If ``"exchange"`` does not name an attribute of
                the ccxt module.
        """
        if self.clients is not None:
            return

        self.clients = {}
        # Keys consumed by this class instead of being passed on to ccxt.
        reserved_keys = {"account", "exchange", "sandboxMode"}

        # ``accounts`` defaults to None; treat that as "no accounts" so the
        # no-argument constructor yields an empty manager instead of crashing.
        for account_name, account in (self.accounts or {}).items():
            exchange_class = getattr(ccxt, account["exchange"])
            exchange = exchange_class(
                {k: v for k, v in account.items() if k not in reserved_keys}
            )
            exchange.set_sandbox_mode(account["sandboxMode"])
            self.clients[account_name] = AsyncCCXTPandasExchange(
                exchange=exchange,
                exchange_name=exchange.id,
                account_name=account_name,
            )

    def __getattr__(self, method_name: str) -> Callable[..., list[Any]]:
        """Return a dispatcher that calls ``method_name`` on the clients.

        Only invoked when normal attribute lookup fails.

        For names listed in ``orders_methods`` the dispatcher requires an
        ``orders`` keyword argument, splits it with ``groupby("account")``
        and sends each group (minus the ``account``/``exchange`` columns) to
        the client registered under that account name. For every other name
        the call is forwarded unchanged to all clients.

        Args:
            method_name: Name of the client method to dispatch to.

        Returns:
            A callable accepting arbitrary positional and keyword arguments
            and returning a flat list of whatever the client calls returned
            (list results are concatenated, other results appended).

        Raises:
            AttributeError: For underscore-prefixed names, so protocol probes
                such as ``__deepcopy__``, ``__getstate__`` or ``_repr_html_``
                are not mistaken for client methods.
        """
        if method_name.startswith("_"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {method_name!r}"
            )

        def wrapper(*args: Any, **kwargs: Any) -> list[Any]:
            if method_name in orders_methods:
                # KeyError here means ``orders`` was not passed by keyword.
                orders = kwargs.pop("orders")
                # Resolve every target client before dispatching anything, so
                # an unknown account name fails before any call is made.
                calls = [
                    (
                        self.clients[account],
                        {
                            **kwargs,
                            "orders": account_orders.drop(
                                columns=["account", "exchange"], errors="ignore"
                            ),
                        },
                    )
                    for account, account_orders in orders.groupby("account")
                ]
            else:
                calls = [(client, kwargs) for client in self.clients.values()]

            tasks: list[Any] = []
            for client, call_kwargs in calls:
                result = getattr(client, method_name)(*args, **call_kwargs)
                # A client method may return one task or a list of them;
                # always hand back a single flat list.
                if isinstance(result, list):
                    tasks.extend(result)
                else:
                    tasks.append(result)
            return tasks

        return wrapper

    def close(self) -> list:
        """Call ``close()`` on every client and return the results in order.

        NOTE(review): the results are presumably awaitables the caller must
        await or gather — confirm against ``AsyncCCXTPandasExchange.close``.
        """
        return [client.close() for client in self.clients.values()]