Source code for pyth_pandas.async_client

"""Async wrapper for PythPandas.

Wraps the synchronous client via composition, running each method in a
``ThreadPoolExecutor`` for non-blocking behavior in asyncio contexts.
All public methods from ``PythPandas`` are auto-generated as async
wrappers — new mixin methods become available without changes here.

Usage:
    >>> async with AsyncPythPandas() as client:
    ...     df = await client.fetch_latest_prices(symbols=["Crypto.BTC/USD"], properties=["price"])
"""

from __future__ import annotations

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import Self

from pyth_pandas.client import PythPandas

# Methods that are sync-only, properties, or internal — do not wrap.
_SKIP = frozenset(
    {
        "close",
        "preprocess_dataframe",
        "preprocess_dict",
    }
)


def _make_async_wrapper(method_name: str):
    """Build an async method that delegates to ``self._sync.<method_name>``.

    The returned coroutine function looks up the same-named attribute on
    the instance's ``_sync`` object at call time and runs it on the
    instance's ``_executor`` via the running event loop.

    Args:
        method_name: Name of the attribute to call on ``self._sync``.

    Returns:
        A coroutine function whose ``__name__`` is ``method_name``, whose
        ``__qualname__`` is ``AsyncPythPandas.<method_name>``, and whose
        ``return`` annotation mirrors the ``PythPandas`` attribute's when
        that attribute declares one.
    """

    async def wrapper(self, *args, **kwargs):
        running_loop = asyncio.get_running_loop()
        # Resolved per call so the bound method of this instance's sync client is used.
        bound_call = functools.partial(getattr(self._sync, method_name), *args, **kwargs)
        return await running_loop.run_in_executor(self._executor, bound_call)

    wrapper.__name__ = method_name
    wrapper.__qualname__ = f"AsyncPythPandas.{method_name}"

    # Carry over only the return annotation; the wrapper's parameters are generic.
    sync_method = getattr(PythPandas, method_name, None)
    if sync_method is None:
        return wrapper
    sync_annotations = getattr(sync_method, "__annotations__", {})
    if "return" in sync_annotations:
        wrapper.__annotations__ = {"return": sync_annotations["return"]}
    return wrapper


[docs] class AsyncPythPandas: """Async version of :class:`~pyth_pandas.PythPandas`. Accepts the same constructor arguments. Internally creates a synchronous ``PythPandas`` instance and runs its methods in a thread pool. """ def __init__(self, *, max_workers: int = 10, **kwargs): self._sync = PythPandas(**kwargs) self._executor = ThreadPoolExecutor(max_workers=max_workers) @property def base_url(self) -> str: return self._sync.base_url @property def api_key(self) -> str | None: return self._sync.api_key
[docs] async def close(self) -> None: self._sync.close() self._executor.shutdown(wait=False)
async def __aenter__(self) -> Self: return self async def __aexit__(self, *_: object) -> None: await self.close() def __repr__(self) -> str: return f"Async{self._sync!r}"
def _populate_async_methods() -> None:
    """Attach an async wrapper to ``AsyncPythPandas`` for each public callable of ``PythPandas``.

    Names that start with an underscore, names listed in ``_SKIP``,
    non-callable attributes, and names ``AsyncPythPandas`` already has
    are left alone. Each generated wrapper takes the ``__doc__`` of the
    ``PythPandas`` attribute it mirrors.
    """
    candidate_names = (
        name for name in dir(PythPandas) if not name.startswith("_") and name not in _SKIP
    )
    for method_name in candidate_names:
        sync_attr = getattr(PythPandas, method_name, None)
        # callable() rather than inspect.isfunction so that
        # cachetools.cachedmethod descriptors are picked up too.
        if callable(sync_attr) and not hasattr(AsyncPythPandas, method_name):
            async_method = _make_async_wrapper(method_name)
            async_method.__doc__ = getattr(sync_attr, "__doc__", None)
            setattr(AsyncPythPandas, method_name, async_method)


_populate_async_methods()