# Source code for pyth_pandas.client

"""PythPandas — core HTTP client dataclass with DataFrame preprocessing.

The client targets the Pyth Pro Router (Pyth Lazer) REST API. Endpoint
methods live in mixins; this class provides transport, authentication,
DataFrame preprocessing, and shared infrastructure.
"""

from __future__ import annotations

import os
from dataclasses import dataclass, field
from typing import ClassVar, Self

import httpx
import pandas as pd

from pyth_pandas.exceptions import PythAPIError, PythAuthError, PythRateLimitError
from pyth_pandas.mixins import GovernanceMixin, PricesMixin
from pyth_pandas.utils import (
    DEFAULT_BOOL_COLUMNS,
    DEFAULT_DROP_COLUMNS,
    DEFAULT_INT_DATETIME_COLUMNS,
    DEFAULT_JSON_COLUMNS,
    DEFAULT_MS_INT_DATETIME_COLUMNS,
    DEFAULT_NUMERIC_COLUMNS,
    DEFAULT_STR_DATETIME_COLUMNS,
    DEFAULT_US_INT_DATETIME_COLUMNS,
    expand_column_lists,
    filter_params,
)
from pyth_pandas.utils import preprocess_dataframe as _preprocess_dataframe
from pyth_pandas.utils import preprocess_dict as _preprocess_dict


@dataclass
class PythPandas(PricesMixin, GovernanceMixin):
    """HTTP client for the Pyth Pro Router that returns pandas DataFrames.

    Endpoint methods live in the mixins. This class provides the HTTP
    transport, bearer-token authentication, DataFrame preprocessing, and
    error-mapping infrastructure. Use as a context manager so the
    underlying connection pool is closed deterministically.

    Args:
        base_url: Pyth Pro Router base URL. Pyth runs multiple endpoints
            (numbered 0/1/2) for redundancy; supply whichever you have
            been onboarded onto. A trailing ``/`` is appended if missing.
        timeout: Per-request timeout in seconds.
        api_key: Bearer access token. Falls back to ``PYTH_API_KEY`` from
            the environment (after any ``load_dotenv()`` call).
        use_tqdm: Whether progress bars are enabled for any future
            paginated helpers.
        tqdm_description: Description text associated with progress bars.
            Not read by any method defined directly on this class.
        numeric_columns: Column configuration forwarded to the
            preprocessing helpers as ``numeric_columns``.
        str_datetime_columns: Forwarded as ``str_datetime_columns``.
        int_datetime_columns: Forwarded as ``int_datetime_columns``.
        ms_int_datetime_columns: Forwarded as ``ms_int_datetime_columns``.
        us_int_datetime_columns: Forwarded as ``us_int_datetime_columns``.
        bool_columns: Forwarded as ``bool_columns``.
        drop_columns: Forwarded as ``drop_columns``.
        json_columns: Forwarded as ``json_columns``.

    Each ``*_columns`` value is passed through ``expand_column_lists``
    once, at construction time; the expanded result is what the
    preprocessing helpers receive. Reassigning one of these attributes
    after construction therefore does not change preprocessing.

    Example:
        >>> from pyth_pandas import PythPandas
        >>> with PythPandas() as client:
        ...     df = client.fetch_latest_prices(
        ...         symbols=["Crypto.BTC/USD"],
        ...         properties=["price", "confidence", "exponent"],
        ...     )
    """

    #: Upstream API version this client targets (from the OpenAPI spec).
    API_VERSION: ClassVar[str] = "1.0.0"
    #: Upstream API versions this client is known to be compatible with.
    SUPPORTED_API_VERSIONS: ClassVar[tuple[str, ...]] = ("1.0.0",)

    base_url: str = "https://pyth-lazer.dourolabs.app/v1/"
    timeout: int = field(default=30, repr=False)
    # Kept out of the generated repr so the token does not leak into logs.
    api_key: str | None = field(default=None, repr=False)
    use_tqdm: bool = field(default=True, repr=True)
    tqdm_description: str = field(default="", repr=False)

    # Column-coercion defaults (override per-instance to extend)
    numeric_columns: tuple = field(default=DEFAULT_NUMERIC_COLUMNS, repr=False)
    str_datetime_columns: tuple = field(default=DEFAULT_STR_DATETIME_COLUMNS, repr=False)
    int_datetime_columns: tuple = field(default=DEFAULT_INT_DATETIME_COLUMNS, repr=False)
    ms_int_datetime_columns: tuple = field(default=DEFAULT_MS_INT_DATETIME_COLUMNS, repr=False)
    us_int_datetime_columns: tuple = field(default=DEFAULT_US_INT_DATETIME_COLUMNS, repr=False)
    bool_columns: tuple = field(default=DEFAULT_BOOL_COLUMNS, repr=False)
    drop_columns: tuple = field(default=DEFAULT_DROP_COLUMNS, repr=False)
    json_columns: tuple = field(default=DEFAULT_JSON_COLUMNS, repr=False)

    # ── Lifecycle ───────────────────────────────────────────────────────

    def __post_init__(self) -> None:
        """Resolve the token, normalise the base URL, and build runtime state."""
        if self.api_key is None:
            self.api_key = os.getenv("PYTH_API_KEY")
        # _request() concatenates base_url with a path stripped of leading
        # slashes, so the base must always end in exactly one separator.
        if not self.base_url.endswith("/"):
            self.base_url = self.base_url + "/"
        self._client = httpx.Client(timeout=self.timeout)

        # Expand the column configuration once so the preprocessing
        # helpers do not redo it on every call.
        self._numeric_columns = expand_column_lists(self.numeric_columns)
        self._str_datetime_columns = expand_column_lists(self.str_datetime_columns)
        self._int_datetime_columns = expand_column_lists(self.int_datetime_columns)
        self._ms_int_datetime_columns = expand_column_lists(self.ms_int_datetime_columns)
        self._us_int_datetime_columns = expand_column_lists(self.us_int_datetime_columns)
        self._bool_columns = expand_column_lists(self.bool_columns)
        self._drop_columns = expand_column_lists(self.drop_columns)
        self._json_columns = expand_column_lists(self.json_columns)

    def close(self) -> None:
        """Close the underlying HTTP connection pool."""
        self._client.close()

    def __enter__(self) -> Self:
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *_: object) -> None:
        """Close the client on leaving the ``with`` block; exceptions propagate."""
        self.close()

    # ── Preprocessing helpers (bound to instance config) ────────────────

    def preprocess_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the client's column-coercion config to ``df``."""
        return _preprocess_dataframe(
            df,
            numeric_columns=self._numeric_columns,
            str_datetime_columns=self._str_datetime_columns,
            int_datetime_columns=self._int_datetime_columns,
            ms_int_datetime_columns=self._ms_int_datetime_columns,
            us_int_datetime_columns=self._us_int_datetime_columns,
            bool_columns=self._bool_columns,
            drop_columns=self._drop_columns,
            json_columns=self._json_columns,
        )

    def preprocess_dict(self, data: dict) -> dict:
        """Apply the client's column-coercion config to a single dict."""
        return _preprocess_dict(
            data,
            numeric_columns=self._numeric_columns,
            str_datetime_columns=self._str_datetime_columns,
            int_datetime_columns=self._int_datetime_columns,
            ms_int_datetime_columns=self._ms_int_datetime_columns,
            us_int_datetime_columns=self._us_int_datetime_columns,
            bool_columns=self._bool_columns,
            drop_columns=self._drop_columns,
            json_columns=self._json_columns,
        )

    # ── Auth guards ─────────────────────────────────────────────────────

    def _require_auth(self) -> None:
        """Raise ``PythAuthError`` if no access token is configured.

        An empty-string token counts as missing.
        """
        if not self.api_key:
            raise PythAuthError(
                detail=(
                    "Pyth Pro access token not set. Provide api_key or set "
                    "PYTH_API_KEY in the environment."
                )
            )

    def _auth_headers(self) -> dict:
        """Return the bearer ``Authorization`` header for the current token."""
        return {"Authorization": f"Bearer {self.api_key}"}

    # ── Request helpers ─────────────────────────────────────────────────

    def _request(
        self,
        *,
        path: str,
        method: str = "GET",
        params: dict | None = None,
        data: dict | list | None = None,
        headers: dict | None = None,
    ) -> dict | list:
        """Send a request and return the parsed JSON body.

        No credentials are added here; the request is unauthenticated
        unless the caller supplies them via ``headers`` (as
        ``_request_authed`` does).

        Args:
            path: Endpoint path, joined onto ``base_url``; leading
                slashes are stripped first.
            method: HTTP method name.
            params: Query parameters, passed through ``filter_params``.
            data: Payload sent as the JSON request body.
            headers: Extra request headers.

        Returns:
            Whatever ``_handle_response`` produces for the response.
        """
        url = self.base_url + path.lstrip("/")
        response = self._client.request(
            method,
            url,
            params=filter_params(params),
            json=data,
            headers=headers,
        )
        return self._handle_response(response)

    def _request_authed(
        self,
        *,
        path: str,
        method: str = "GET",
        params: dict | None = None,
        data: dict | list | None = None,
    ) -> dict | list:
        """Send an authenticated request, raising before network if the token is missing.

        Raises:
            PythAuthError: If no access token is configured.
        """
        self._require_auth()
        return self._request(
            path=path,
            method=method,
            params=params,
            data=data,
            headers=self._auth_headers(),
        )

    # ── Error handling ──────────────────────────────────────────────────

    def _handle_response(self, response: httpx.Response) -> dict | list:
        """Map error statuses to package exceptions, else return parsed JSON.

        Args:
            response: The completed ``httpx`` response.

        Returns:
            The decoded JSON body, or ``{}`` when the body is empty.

        Raises:
            PythAuthError: On HTTP 401 or 403.
            PythRateLimitError: On HTTP 429.
            PythAPIError: On any other status ``httpx`` reports as an error.
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            status = exc.response.status_code
            url = str(exc.request.url)
            # Prefer the structured error body; fall back to raw text when
            # it is not valid JSON. Decode failures are ValueError
            # subclasses, so unrelated bugs are not swallowed here.
            try:
                detail: object = exc.response.json()
            except ValueError:
                detail = exc.response.text
            if status in (401, 403):
                raise PythAuthError(status, url, detail) from exc
            if status == 429:
                raise PythRateLimitError(status, url, detail) from exc
            raise PythAPIError(status, url, detail) from exc

        # Bodyless success responses have nothing to decode.
        if not response.content:
            return {}
        return response.json()