Source code for pyth_pandas.mixins._prices

"""Price-fetching endpoints (``/latest_price``, ``/price``, ``/reduce_price``)."""

from __future__ import annotations

from typing import cast

import pandas as pd
from pandera.typing import DataFrame

from pyth_pandas.schemas import ParsedFeedSchema
from pyth_pandas.types import JsonUpdate
from pyth_pandas.utils import to_unix_timestamp_us

# Type aliases for documentation
_Channel = str  # "real_time" | "fixed_rate@50ms" | "fixed_rate@200ms" | "fixed_rate@1000ms"
_Format = str  # "evm" | "solana" | "leEcdsa" | "leUnsigned"
_Property = str  # "price" | "confidence" | "exponent" | ... (see PriceFeedProperty)


[docs] class PricesMixin: """Price fetch endpoints that return one row per requested feed. Each ``fetch_*`` method returns a ``DataFrame`` of parsed feeds. The accompanying on-chain payloads (EVM, Solana, …) and the update timestamp are attached as ``df.attrs``. For the raw ``JsonUpdate`` response, use the matching ``*_raw`` variant. """ def _build_price_body( self, *, properties: list[_Property], formats: list[_Format], channel: _Channel, price_feed_ids: list[int] | None, symbols: list[str] | None, parsed: bool, json_binary_encoding: str | None, timestamp: int | None = None, ) -> dict: if (price_feed_ids is None) == (symbols is None): raise ValueError("Specify exactly one of price_feed_ids or symbols.") body: dict = { "properties": list(properties), "formats": list(formats), "channel": channel, "parsed": parsed, "priceFeedIds": price_feed_ids, "symbols": symbols, } if json_binary_encoding is not None: body["jsonBinaryEncoding"] = json_binary_encoding if timestamp is not None: body["timestamp"] = timestamp return body def _update_to_dataframe(self, payload: JsonUpdate) -> DataFrame[ParsedFeedSchema]: parsed = payload.get("parsed") if isinstance(payload, dict) else None feeds = parsed.get("priceFeeds", []) if parsed else [] df = pd.DataFrame(feeds) df = self.preprocess_dataframe(df) # type: ignore[attr-defined] # `exponent` is a (usually negative) int in the API; coerce to float so # that `10 ** df["exponent"]` works without the "Integers to negative # integer powers are not allowed" numpy error. if "exponent" in df.columns: df["exponent"] = df["exponent"].astype("Float64") df.attrs["timestampUs"] = parsed.get("timestampUs") if parsed else None for fmt in ("evm", "solana", "leEcdsa", "leUnsigned"): if isinstance(payload, dict) and payload.get(fmt) is not None: df.attrs[fmt] = payload[fmt] return cast(DataFrame[ParsedFeedSchema], df) # ── /latest_price ───────────────────────────────────────────────────
[docs] def fetch_latest_prices_raw( self, *, properties: list[_Property], formats: list[_Format], channel: _Channel = "real_time", price_feed_ids: list[int] | None = None, symbols: list[str] | None = None, parsed: bool = True, json_binary_encoding: str | None = "hex", ) -> JsonUpdate: """Fetch the latest available update for the requested feeds. Args: properties: Feed properties to include (``price``, ``confidence``, ``exponent``, ...). formats: On-chain payload formats to include (``evm``, ``solana``, ``leEcdsa``, ``leUnsigned``). channel: Update channel — ``real_time`` or ``fixed_rate@*``. price_feed_ids: Numeric feed IDs. Mutually exclusive with ``symbols``. symbols: Feed symbols (e.g. ``["Crypto.BTC/USD"]``). parsed: Include the ``parsed`` block in the response. json_binary_encoding: ``"hex"`` or ``"base64"`` for binary payloads. Returns: The raw ``JsonUpdate`` dict. Raises: PythAPIError: For any non-2xx response from the upstream API. """ body = self._build_price_body( properties=properties, formats=formats, channel=channel, price_feed_ids=price_feed_ids, symbols=symbols, parsed=parsed, json_binary_encoding=json_binary_encoding, ) data = self._request_authed( # type: ignore[attr-defined] path="latest_price", method="POST", data=body ) return cast(JsonUpdate, data)
[docs] def fetch_latest_prices( self, *, properties: list[_Property], formats: list[_Format] | None = None, channel: _Channel = "real_time", price_feed_ids: list[int] | None = None, symbols: list[str] | None = None, json_binary_encoding: str | None = "hex", ) -> DataFrame[ParsedFeedSchema]: """Fetch the latest update and return it as a parsed-feed DataFrame. Same arguments as :meth:`fetch_latest_prices_raw`. The returned DataFrame has one row per feed; the on-chain payloads and ``timestampUs`` are attached on ``df.attrs``. Args: properties: Feed properties to include. formats: On-chain payload formats. Defaults to ``[]`` (parsed only). channel: Update channel. price_feed_ids: Numeric feed IDs (mutually exclusive with ``symbols``). symbols: Feed symbols (e.g. ``["Crypto.BTC/USD"]``). json_binary_encoding: ``"hex"`` or ``"base64"``. Returns: DataFrame conforming to :class:`~pyth_pandas.schemas.ParsedFeedSchema`. """ payload = self.fetch_latest_prices_raw( properties=properties, formats=formats or [], channel=channel, price_feed_ids=price_feed_ids, symbols=symbols, parsed=True, json_binary_encoding=json_binary_encoding, ) return self._update_to_dataframe(payload)
# ── /price (timestamped) ────────────────────────────────────────────
[docs] def fetch_prices_raw( self, *, timestamp: int | float | str | pd.Timestamp, properties: list[_Property], formats: list[_Format], channel: _Channel = "real_time", price_feed_ids: list[int] | None = None, symbols: list[str] | None = None, parsed: bool = True, json_binary_encoding: str | None = "hex", ) -> JsonUpdate: """Fetch updates issued at a specific point in time. Args: timestamp: Microsecond-resolution Unix timestamp. Accepts an int (interpreted as µs if >= 10**14, else seconds), a ``pd.Timestamp``, or an ISO-8601 string. properties: Feed properties to include. formats: On-chain payload formats. channel: Update channel. price_feed_ids: Numeric feed IDs. symbols: Feed symbols. parsed: Include parsed block. json_binary_encoding: Binary payload encoding. Returns: The raw ``JsonUpdate`` dict. """ ts_us = to_unix_timestamp_us(timestamp) body = self._build_price_body( properties=properties, formats=formats, channel=channel, price_feed_ids=price_feed_ids, symbols=symbols, parsed=parsed, json_binary_encoding=json_binary_encoding, timestamp=ts_us, ) data = self._request_authed(path="price", method="POST", data=body) # type: ignore[attr-defined] return cast(JsonUpdate, data)
[docs] def fetch_prices( self, *, timestamp: int | float | str | pd.Timestamp, properties: list[_Property], formats: list[_Format] | None = None, channel: _Channel = "real_time", price_feed_ids: list[int] | None = None, symbols: list[str] | None = None, json_binary_encoding: str | None = "hex", ) -> DataFrame[ParsedFeedSchema]: """DataFrame variant of :meth:`fetch_prices_raw`. Returns: DataFrame conforming to :class:`~pyth_pandas.schemas.ParsedFeedSchema`. """ payload = self.fetch_prices_raw( timestamp=timestamp, properties=properties, formats=formats or [], channel=channel, price_feed_ids=price_feed_ids, symbols=symbols, parsed=True, json_binary_encoding=json_binary_encoding, ) return self._update_to_dataframe(payload)
# ── /reduce_price ───────────────────────────────────────────────────
[docs] def reduce_price_raw( self, *, payload: JsonUpdate, price_feed_ids: list[int], ) -> JsonUpdate: """Reduce an existing on-chain payload to only the listed feeds. Args: payload: A ``JsonUpdate`` previously received from ``fetch_latest_prices_raw``, ``fetch_prices_raw``, or the WebSocket stream. price_feed_ids: Subset of feed IDs to retain. Returns: A new ``JsonUpdate`` containing only the requested feeds. """ body = {"payload": payload, "priceFeedIds": list(price_feed_ids)} data = self._request_authed( # type: ignore[attr-defined] path="reduce_price", method="POST", data=body ) return cast(JsonUpdate, data)
[docs] def reduce_price( self, *, payload: JsonUpdate, price_feed_ids: list[int], ) -> DataFrame[ParsedFeedSchema]: """DataFrame variant of :meth:`reduce_price_raw`.""" out = self.reduce_price_raw(payload=payload, price_feed_ids=price_feed_ids) return self._update_to_dataframe(out)