from __future__ import annotations import hashlib import json import time import urllib.parse import urllib.request from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any @dataclass class Token: access_token: str expires_at: float @dataclass class StreamUrlResult: url: str timings: dict[str, float] class CapacityApiClient: def __init__( self, base_url: str, app_id: str, app_secret: str, account: str, method: str, timeout: int = 20, ): self.base_url = base_url.rstrip("/") self.app_id = app_id self.app_secret = app_secret self.account = account self.method = method self.timeout = timeout self.token: Token | None = None def get_stream_url(self, device_num: str) -> str: return self.get_stream_url_details(device_num).url def get_stream_url_details(self, device_num: str) -> StreamUrlResult: timings: dict[str, float] = {} started = time.monotonic() access_token = self._get_access_token(timings) timings["token_ms"] = round((time.monotonic() - started) * 1000, 2) started = time.monotonic() business_params = { "account": self.account, "deviceNum": device_num, "isSubStream": 0, "networkType": 1, "urlType": 1, } # 接口文档要求业务参数整体放进 params JSON 字符串后再参与签名。 params = { "accessToken": access_token, "appId": self.app_id, "method": self.method, "params": json.dumps(business_params, ensure_ascii=False, separators=(",", ":")), "timestamp": self._timestamp(), "v": "1.0.0", } params["sign"] = self._sign(params) timings["sign_ms"] = round((time.monotonic() - started) * 1000, 2) started = time.monotonic() data = self._get_json(f"{self.base_url}/rest", params) timings["stream_url_ms"] = round((time.monotonic() - started) * 1000, 2) if data.get("errorCode") != "0": raise RuntimeError(data.get("errorMsg") or f"播放地址接口返回错误 {data.get('errorCode')}") payload = data.get("data") or {} stream_url = payload.get("rtspUrl") or payload.get("rtspUri") if not stream_url: raise RuntimeError("播放地址接口未返回 RTSP 地址") return StreamUrlResult(stream_url, timings) def _get_access_token(self, timings: dict[str, float] | None = None) -> str: if self.token and time.time() < self.token.expires_at - 300: if timings is not None: timings["token_cache"] = 1 return self.token.access_token data = self._get_json( f"{self.base_url}/oauth/token", { "grantType": "client_credential", "appId": self.app_id, "appSecret": self.app_secret, }, ) if data.get("errorCode") != "0": raise RuntimeError(data.get("errorMsg") or f"获取 accessToken 失败 {data.get('errorCode')}") payload = data.get("data") or {} access_token = payload.get("accessToken") if not access_token: raise RuntimeError("token 接口未返回 accessToken") self.token = Token( access_token=access_token, expires_at=time.time() + int(payload.get("expiresIn") or 604800), ) return access_token def _get_json(self, url: str, params: dict[str, Any]) -> dict[str, Any]: query = urllib.parse.urlencode(params) with urllib.request.urlopen(f"{url}?{query}", timeout=self.timeout) as response: body = response.read().decode("utf-8") return json.loads(body) def _sign(self, params: dict[str, Any]) -> str: # 签名规则:appSecret + 按 ASCII key 排序后的 key/value + appSecret。 raw = self.app_secret + "".join(f"{key}{params[key]}" for key in sorted(params)) + self.app_secret return hashlib.md5(raw.encode("utf-8")).hexdigest().upper() @staticmethod def _timestamp() -> str: return datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S")