Coverage for app/routers/sse.py: 91%
57 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-02-19 12:46 +0000
1from fastapi import APIRouter, Request
2from fastapi import Depends, Path
3from fastapi.security import OAuth2PasswordBearer
4from app.utils.check_token import check_constellation_access
5from app.utils.response_format import generate_response, generate_error_response
6from sse_starlette.sse import EventSourceResponse
7import asyncio
8import json
9from typing import Dict, Set
10from loguru import logger
12router = APIRouter()
14# Définir le schéma OAuth2 pour récupérer le JWT
15oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
17# Keep track of connected clients per constellation ID
18clients: Dict[str, Set[asyncio.Queue]] = {}
20async def event_stream(constellation_uuid: str):
21 queue = asyncio.Queue()
22 if constellation_uuid not in clients:
23 clients[constellation_uuid] = set()
24 clients[constellation_uuid].add(queue)
26 try:
27 while True:
28 data = await queue.get()
29 yield {"data": json.dumps(data)}
30 except:
31 pass
32 finally:
33 # Clean up on disconnect
34 clients[constellation_uuid].remove(queue)
35 if not clients[constellation_uuid]:
36 del clients[constellation_uuid]
38@router.get("/sse/updates/{constellation_uuid}")
39async def sse_updates(constellation_uuid: str = Path(..., description="The UUID of the constellation to notify"),
40 token: str = Depends(oauth2_scheme)):
41 test = check_constellation_access(token, constellation_uuid, "READ")
42 if test != True:
43 return test
45 return EventSourceResponse(event_stream(constellation_uuid))
47# Notify only clients in the specified constellation
48async def notify_clients(constellation_uuid: str, event_data):
49 constellation_clients = clients.get(constellation_uuid)
50 if not constellation_clients:
51 return
53 for client in constellation_clients.copy():
54 try:
55 await client.put(event_data)
56 except:
57 logger.error(f"Failed to send data to client in constellation {constellation_uuid}. Removing client.")
58 constellation_clients.remove(client) # Remove failed client
59 if not constellation_clients:
60 del clients[constellation_uuid]
62@router.post("/sse/notify/{constellation_uuid}")
63async def sse_notify(request: Request,
64 constellation_uuid: str = Path(..., description="The UUID of the constellation to notify"),
65 token: str = Depends(oauth2_scheme)):
66 test = check_constellation_access(token, constellation_uuid, "READ")
67 if test != True:
68 return test
70 try:
71 data = await request.json()
72 except json.JSONDecodeError:
73 logger.error(f"Invalid JSON data received for constellation {constellation_uuid}")
74 return generate_error_response(
75 status_code=400,
76 error_code="INVALID_JSON",
77 error_message="Invalid JSON data",
78 message="Invalid JSON data"
79 )
80 asyncio.create_task(notify_clients(constellation_uuid, data))
81 return generate_response(
82 status_code=200,
83 data=None,
84 message="Notification sent to clients"
85 )