"""FastAPI 入口模块:提供页面、视频流、状态、设备切换和 WebSocket 接口。""" from __future__ import annotations import asyncio from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from app.capacity_api import CapacityApiClient from app.config import load_settings, mask_url from app.detector import DetrVehicleDetector from app.device_manager import DeviceManager from app.stream_worker import StreamWorker settings = load_settings() api_client = CapacityApiClient( base_url=settings.api_base_url, app_id=settings.app_id, app_secret=settings.app_secret, account=settings.device_account, method=settings.stream_method, ) detector = DetrVehicleDetector( model_name=settings.detr_model, confidence=settings.confidence, vehicle_labels=settings.vehicle_labels, ) device_manager = DeviceManager( path=settings.device_list_path, api_client=api_client, fallback_url=settings.stream_url, ) def resolve_stream_url() -> str: return device_manager.resolve_stream_url() worker = StreamWorker( stream_url=resolve_stream_url, detector=detector, frame_skip=settings.frame_skip, jpeg_quality=settings.jpeg_quality, resize_width=settings.resize_width, ) video_grid_devices = device_manager.get_video_grid_devices() video_grid_workers = { device.device_num: StreamWorker( stream_url=lambda device_num=device.device_num: device_manager.resolve_stream_url_for(device_num), detector=detector, frame_skip=settings.frame_skip, jpeg_quality=settings.jpeg_quality, resize_width=settings.resize_width, ) for device in video_grid_devices } app = FastAPI(title="DETR 动态打标") app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") def display_model_name(model_name: str) -> str: return model_name.rsplit("/", 1)[-1] @app.on_event("startup") def startup() -> None: worker.start() for grid_worker in video_grid_workers.values(): grid_worker.start() @app.on_event("shutdown") def shutdown() -> None: worker.stop() for grid_worker in video_grid_workers.values(): grid_worker.stop() @app.get("/", response_class=HTMLResponse) def index(request: Request) -> HTMLResponse: return templates.TemplateResponse( "index.html", { "request": request, "model": display_model_name(settings.detr_model), "device": detector.device_name, "stream_url": f"设备号:{device_manager.get_snapshot()['current_device_num']}", "video_grid_devices": video_grid_devices, }, ) @app.get("/tokenizer", response_class=HTMLResponse) def tokenizer(request: Request) -> HTMLResponse: return templates.TemplateResponse( "tokenizer.html", { "request": request, "model": display_model_name(settings.detr_model), "device": detector.device_name, }, ) @app.get("/tokenizer/state") def tokenizer_state() -> JSONResponse: snapshot = worker.get_snapshot() frame = worker.get_frame_rgb() if frame is None: return JSONResponse( { "ready": False, "frame_id": snapshot["frame_id"], "connected": snapshot["connected"], "error": snapshot["error"] or "等待视频帧", } ) data = detector.inspect_tokens(frame) data.update( { "ready": True, "frame_id": snapshot["frame_id"], "updated_at": snapshot["updated_at"], "connected": snapshot["connected"], "error": snapshot["error"], } ) return JSONResponse(data) @app.get("/video") def video() -> StreamingResponse: return stream_video(worker) @app.get("/video/{device_num}") def video_device(device_num: str) -> StreamingResponse: grid_worker = video_grid_workers.get(device_num) if grid_worker is None: raise HTTPException(status_code=404, detail="设备不在视频网格中") return stream_video(grid_worker) def stream_video(stream_worker: StreamWorker) -> StreamingResponse: async def generate(): while True: frame = stream_worker.get_jpeg() if frame is None: await asyncio.sleep(0.1) continue yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + frame + b"\r\n" await asyncio.sleep(0.03) return StreamingResponse( generate(), media_type="multipart/x-mixed-replace; boundary=frame", ) @app.get("/detections") def detections() -> JSONResponse: snapshot = worker.get_snapshot() return JSONResponse( { "frame_id": snapshot["frame_id"], "updated_at": snapshot["updated_at"], "detections": snapshot["detections"], } ) @app.get("/status") def status() -> JSONResponse: snapshot = worker.get_snapshot() device_snapshot = device_manager.get_snapshot() timings = dict(device_snapshot["source_timings"]) # 合并取流地址和 OpenCV 读流耗时,前端按同一个 timings 对象展示。 timings.update(snapshot["timings"]) return JSONResponse( { "running": snapshot["running"], "connected": snapshot["connected"], "frame_id": snapshot["frame_id"], "updated_at": snapshot["updated_at"], "fps": snapshot["fps"], "error": snapshot["error"], "source": mask_url(device_snapshot["current_url"]) if device_snapshot["current_url"] else "等待获取播放地址", "model": display_model_name(settings.detr_model), "device": detector.device_name, "frame_skip": settings.frame_skip, "confidence": settings.confidence, "devices": device_snapshot["devices"], "current_device_num": device_snapshot["current_device_num"], "timings": timings, } ) @app.post("/devices/{device_num}") def switch_device(device_num: str) -> JSONResponse: try: version = device_manager.set_current_device(device_num) except ValueError as exc: print(f"[device-switch] invalid device={device_num}", flush=True) raise HTTPException(status_code=404, detail=str(exc)) from exc worker.reconnect() print(f"[device-switch] accepted device={device_num} version={version}", flush=True) return JSONResponse({"current_device_num": device_num, "version": version}) @app.websocket("/ws/detections") async def websocket_detections(websocket: WebSocket) -> None: await websocket.accept() try: while True: data = worker.get_snapshot() device_snapshot = device_manager.get_snapshot() data.update( { "devices": device_snapshot["devices"], "current_device_num": device_snapshot["current_device_num"], "source": mask_url(device_snapshot["current_url"]) if device_snapshot["current_url"] else "等待获取播放地址", "timings": {**device_snapshot["source_timings"], **data["timings"]}, } ) await websocket.send_json(data) await asyncio.sleep(0.5) except WebSocketDisconnect: return