Files
tokenresearch/app/capacity_api.py
2026-06-03 11:04:16 +08:00

126 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""能力开放接口客户端:获取 accessToken、签名并请求摄像头 RTSP 地址。"""
from __future__ import annotations
import hashlib
import json
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
@dataclass
class Token:
    """Access token cached by the client, together with its local expiry time."""

    # Token string taken from the token endpoint's `accessToken` field.
    access_token: str
    # Absolute expiry as a `time.time()` epoch timestamp, in seconds.
    expires_at: float
@dataclass
class StreamUrlResult:
    """Resolved stream URL plus the timing measurements collected while fetching it."""

    # Stream address extracted from the API response.
    url: str
    # Per-step measurements keyed by step name (e.g. "token_ms", "sign_ms").
    timings: dict[str, float]
class CapacityApiClient:
    """Client for the capability-open API.

    Obtains and caches an access token, signs each request, and asks the
    API for a camera's RTSP stream address.
    """

    # Refresh the cached token this many seconds before its recorded expiry.
    _TOKEN_REFRESH_MARGIN_SECONDS = 300
    # Lifetime assumed when the token response has no usable `expiresIn`
    # (604800 s = 7 days).
    _DEFAULT_TOKEN_TTL_SECONDS = 604800

    def __init__(
        self,
        base_url: str,
        app_id: str,
        app_secret: str,
        account: str,
        method: str,
        timeout: int = 20,
    ):
        """Store the connection settings; no request is made here.

        Args:
            base_url: API root; a trailing "/" is stripped.
            app_id: Application id sent as `appId`.
            app_secret: Secret used for the token request and for signing.
            account: Account placed into the business parameters.
            method: API method name sent as `method` on stream-URL requests.
            timeout: Timeout in seconds passed to `urllib.request.urlopen`.
        """
        self.base_url = base_url.rstrip("/")
        self.app_id = app_id
        self.app_secret = app_secret
        self.account = account
        self.method = method
        self.timeout = timeout
        # Populated lazily by `_get_access_token`.
        self.token: Token | None = None

    def get_stream_url(self, device_num: str) -> str:
        """Return only the stream URL for `device_num`."""
        return self.get_stream_url_details(device_num).url

    def get_stream_url_details(self, device_num: str) -> StreamUrlResult:
        """Request the stream URL for `device_num` and record step timings.

        The returned timings contain `token_ms`, `sign_ms` and
        `stream_url_ms` (milliseconds, rounded to 2 decimals), plus
        `token_cache` = 1 when the cached token was reused.

        Raises:
            RuntimeError: The API reported a non-"0" `errorCode`, or the
                response carried neither `rtspUrl` nor `rtspUri`.
        """
        timings: dict[str, float] = {}

        started = time.monotonic()
        access_token = self._get_access_token(timings)
        timings["token_ms"] = self._elapsed_ms(started)

        started = time.monotonic()
        business_params = {
            "account": self.account,
            "deviceNum": device_num,
            "isSubStream": 0,
            "networkType": 1,
            "urlType": 1,
        }
        # The API docs require the business parameters to be serialized as a
        # whole into the `params` JSON string before they take part in signing.
        params = {
            "accessToken": access_token,
            "appId": self.app_id,
            "method": self.method,
            "params": json.dumps(business_params, ensure_ascii=False, separators=(",", ":")),
            "timestamp": self._timestamp(),
            "v": "1.0.0",
        }
        # The signature is computed before "sign" itself is added to the dict.
        params["sign"] = self._sign(params)
        timings["sign_ms"] = self._elapsed_ms(started)

        started = time.monotonic()
        data = self._get_json(f"{self.base_url}/rest", params)
        timings["stream_url_ms"] = self._elapsed_ms(started)

        if data.get("errorCode") != "0":
            raise RuntimeError(data.get("errorMsg") or f"播放地址接口返回错误 {data.get('errorCode')}")
        payload = data.get("data") or {}
        # Either field name is accepted for the stream address.
        stream_url = payload.get("rtspUrl") or payload.get("rtspUri")
        if not stream_url:
            raise RuntimeError("播放地址接口未返回 RTSP 地址")
        return StreamUrlResult(stream_url, timings)

    def _get_access_token(self, timings: dict[str, float] | None = None) -> str:
        """Return a valid access token, fetching a new one when needed.

        The cached token is reused while more than the refresh margin remains
        before its recorded expiry; in that case `timings["token_cache"]` is
        set to 1 when a timings dict is supplied.

        Raises:
            RuntimeError: The token endpoint reported a non-"0" `errorCode`
                or returned no `accessToken`.
        """
        cached = self.token
        if cached and time.time() < cached.expires_at - self._TOKEN_REFRESH_MARGIN_SECONDS:
            if timings is not None:
                timings["token_cache"] = 1
            return cached.access_token

        data = self._get_json(
            f"{self.base_url}/oauth/token",
            {
                "grantType": "client_credential",
                "appId": self.app_id,
                "appSecret": self.app_secret,
            },
        )
        if data.get("errorCode") != "0":
            raise RuntimeError(data.get("errorMsg") or f"获取 accessToken 失败 {data.get('errorCode')}")
        payload = data.get("data") or {}
        access_token = payload.get("accessToken")
        if not access_token:
            raise RuntimeError("token 接口未返回 accessToken")
        lifetime = int(payload.get("expiresIn") or self._DEFAULT_TOKEN_TTL_SECONDS)
        self.token = Token(
            access_token=access_token,
            expires_at=time.time() + lifetime,
        )
        return access_token

    def _get_json(self, url: str, params: dict[str, Any]) -> dict[str, Any]:
        """GET `url` with `params` as the query string and decode the JSON body.

        Raises:
            RuntimeError: The body decoded to something other than a JSON
                object; callers rely on `.get`, so fail here with a clear
                message instead of an AttributeError later.
        """
        query = urllib.parse.urlencode(params)
        with urllib.request.urlopen(f"{url}?{query}", timeout=self.timeout) as response:
            body = response.read().decode("utf-8")
        data = json.loads(body)
        if not isinstance(data, dict):
            raise RuntimeError(f"接口返回内容不是 JSON 对象: {type(data).__name__}")
        return data

    def _sign(self, params: dict[str, Any]) -> str:
        """Return the uppercase hex MD5 signature for `params`."""
        # Signing rule: appSecret + key/value pairs concatenated in sorted
        # (ASCII) key order + appSecret.
        pairs = "".join(f"{key}{params[key]}" for key in sorted(params))
        raw = f"{self.app_secret}{pairs}{self.app_secret}"
        return hashlib.md5(raw.encode("utf-8")).hexdigest().upper()

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return milliseconds since the `time.monotonic()` value `started`, rounded to 2 decimals."""
        return round((time.monotonic() - started) * 1000, 2)

    @staticmethod
    def _timestamp() -> str:
        """Return the current time at a fixed UTC+8 offset as "YYYY-MM-DD HH:MM:SS"."""
        return datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S")