Coverage for app/routers/sse.py: 91%

57 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-02-19 12:46 +0000

1from fastapi import APIRouter, Request 

2from fastapi import Depends, Path 

3from fastapi.security import OAuth2PasswordBearer 

4from app.utils.check_token import check_constellation_access 

5from app.utils.response_format import generate_response, generate_error_response 

6from sse_starlette.sse import EventSourceResponse 

7import asyncio 

8import json 

9from typing import Dict, Set 

10from loguru import logger 

11 

12router = APIRouter() 

13 

14# Définir le schéma OAuth2 pour récupérer le JWT 

15oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 

16 

17# Keep track of connected clients per constellation ID 

18clients: Dict[str, Set[asyncio.Queue]] = {} 

19 

20async def event_stream(constellation_uuid: str): 

21 queue = asyncio.Queue() 

22 if constellation_uuid not in clients: 

23 clients[constellation_uuid] = set() 

24 clients[constellation_uuid].add(queue) 

25 

26 try: 

27 while True: 

28 data = await queue.get() 

29 yield {"data": json.dumps(data)} 

30 except: 

31 pass 

32 finally: 

33 # Clean up on disconnect 

34 clients[constellation_uuid].remove(queue) 

35 if not clients[constellation_uuid]: 

36 del clients[constellation_uuid] 

37 

38@router.get("/sse/updates/{constellation_uuid}") 

39async def sse_updates(constellation_uuid: str = Path(..., description="The UUID of the constellation to notify"), 

40 token: str = Depends(oauth2_scheme)): 

41 test = check_constellation_access(token, constellation_uuid, "READ") 

42 if test != True: 

43 return test 

44 

45 return EventSourceResponse(event_stream(constellation_uuid)) 

46 

47# Notify only clients in the specified constellation 

48async def notify_clients(constellation_uuid: str, event_data): 

49 constellation_clients = clients.get(constellation_uuid) 

50 if not constellation_clients: 

51 return 

52 

53 for client in constellation_clients.copy(): 

54 try: 

55 await client.put(event_data) 

56 except: 

57 logger.error(f"Failed to send data to client in constellation {constellation_uuid}. Removing client.") 

58 constellation_clients.remove(client) # Remove failed client 

59 if not constellation_clients: 

60 del clients[constellation_uuid] 

61 

62@router.post("/sse/notify/{constellation_uuid}") 

63async def sse_notify(request: Request, 

64 constellation_uuid: str = Path(..., description="The UUID of the constellation to notify"), 

65 token: str = Depends(oauth2_scheme)): 

66 test = check_constellation_access(token, constellation_uuid, "READ") 

67 if test != True: 

68 return test 

69 

70 try: 

71 data = await request.json() 

72 except json.JSONDecodeError: 

73 logger.error(f"Invalid JSON data received for constellation {constellation_uuid}") 

74 return generate_error_response( 

75 status_code=400, 

76 error_code="INVALID_JSON", 

77 error_message="Invalid JSON data", 

78 message="Invalid JSON data" 

79 ) 

80 asyncio.create_task(notify_clients(constellation_uuid, data)) 

81 return generate_response( 

82 status_code=200, 

83 data=None, 

84 message="Notification sent to clients" 

85 )