# Files
# tokenresearch/app/device_manager.py
# 2026-06-03 11:04:16 +08:00
#
# 94 lines
# 3.3 KiB
# Python

"""摄像头管理模块:加载设备列表、切换当前设备并解析播放地址。"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from app.capacity_api import CapacityApiClient
@dataclass(frozen=True)
class Device:
    """Immutable record describing one configured camera.

    Instances are created by ``DeviceManager._load_devices``. The class
    intentionally keeps a per-instance ``__dict__`` (no ``slots``) because
    ``DeviceManager.get_snapshot`` serialises devices from it.
    """

    # Human-readable label for the camera.
    name: str
    # Device number; DeviceManager passes it to the API client to look up
    # the stream URL.
    device_num: str
class DeviceManager:
    """Hold the configured camera list, the current selection and its URL.

    The selection state (``current_device_num``, ``current_url``,
    ``timings``, ``updated_at``, ``version``) is read and written under
    ``self.lock``. The API call that resolves a stream URL is made outside
    the lock, and ``version`` is used afterwards to detect that the
    selection changed while the call was in flight.
    """

    def __init__(self, path: str, api_client: CapacityApiClient, fallback_url: str = ""):
        """Load the device list and select the first device, if any.

        Args:
            path: Location of the device list file (see ``_load_devices``).
            api_client: Client whose ``get_stream_url_details(device_num)``
                is called to resolve stream URLs.
            fallback_url: URL used as the initial ``current_url`` and
                returned by ``resolve_stream_url`` when no device is
                configured.
        """
        self.devices = self._load_devices(path)
        self.api_client = api_client
        self.fallback_url = fallback_url
        self.lock = threading.Lock()
        # An empty string means "no device selected".
        self.current_device_num = self.devices[0].device_num if self.devices else ""
        self.current_url = fallback_url
        self.timings: dict[str, float] = {}
        self.updated_at = 0.0
        # Incremented on every selection change; lets resolve_stream_url
        # recognise results that belong to an outdated selection.
        self.version = 0

    def set_current_device(self, device_num: str) -> int:
        """Select ``device_num`` as the current device.

        The cached URL and timings are cleared because they belonged to the
        previously selected device.

        Args:
            device_num: Device number; must match a loaded device.

        Returns:
            The new selection version.

        Raises:
            ValueError: If ``device_num`` is not among the loaded devices.
        """
        if not any(device.device_num == device_num for device in self.devices):
            raise ValueError("设备不在 devicelist.env 中")
        with self.lock:
            self.current_device_num = device_num
            self.current_url = ""
            self.timings = {}
            self.updated_at = time.time()
            self.version += 1
            return self.version

    def resolve_stream_url(self) -> str:
        """Resolve and cache the stream URL of the current device.

        Returns:
            The URL reported by the API client for the current device. If
            no device is selected, ``fallback_url`` is returned instead. If
            the selection changed while the API call was running, the
            result is discarded and the currently cached URL is returned,
            which is ``""`` when the new selection has not been resolved
            yet.

        Raises:
            RuntimeError: If no device is selected and there is no
                fallback URL.
        """
        with self.lock:
            device_num = self.current_device_num
            version = self.version
        if not device_num:
            if self.fallback_url:
                return self.fallback_url
            raise RuntimeError("devicelist.env 中没有可用设备号")
        # Called without holding the lock so a slow request does not block
        # set_current_device / get_snapshot.
        result = self.api_client.get_stream_url_details(device_num)
        with self.lock:
            # Do not let a slow response for the previous camera overwrite
            # the selection the user has just switched to.
            if version != self.version or device_num != self.current_device_num:
                return self.current_url
            self.current_url = result.url
            self.timings = dict(result.timings)
            self.updated_at = time.time()
            return result.url

    def get_snapshot(self) -> dict[str, Any]:
        """Return a point-in-time view of the devices and the selection.

        Every container in the result is a fresh copy, so callers may
        modify the snapshot without affecting the manager or the frozen
        ``Device`` records.
        """
        with self.lock:
            return {
                # Copy each record: handing out the instance ``__dict__``
                # itself would let callers mutate a frozen Device.
                "devices": [dict(vars(device)) for device in self.devices],
                "current_device_num": self.current_device_num,
                "current_url": self.current_url,
                "source_timings": dict(self.timings),
                "source_updated_at": self.updated_at,
            }

    @staticmethod
    def _load_devices(path: str) -> list[Device]:
        """Parse the device list file at ``path``.

        Blank lines and lines starting with ``#`` are ignored. A line is
        either a bare device number or ``name=num[,num...]``. A line that
        yields exactly one device number keeps its own name (``摄像头`` for
        a bare number); when a line lists several numbers each device is
        named ``摄像头 N``, where N is its 1-based position in the overall
        result.

        Returns:
            The parsed devices in file order, or an empty list when the
            file does not exist.
        """
        devices: list[Device] = []
        file_path = Path(path)
        if not file_path.exists():
            return devices
        # "utf-8-sig" drops a leading byte-order mark; with plain "utf-8"
        # the BOM would stay glued to the first line and break comment
        # detection or corrupt the first name / device number.
        for line in file_path.read_text(encoding="utf-8-sig").splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            if "=" in stripped:
                label, _, value = stripped.partition("=")
                values = [item.strip() for item in value.split(",") if item.strip()]
                display_name = label.strip()
            else:
                values = [stripped]
                display_name = "摄像头"
            for device_num in values:
                name = display_name if len(values) == 1 else f"摄像头 {len(devices) + 1}"
                devices.append(Device(name, device_num))
        return devices