Coverage for src/cstlcore/sse/sse.py: 0%

27 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1import uuid 

2from typing import AsyncGenerator 

3 

4import httpx 

5from fastapi import BackgroundTasks, Depends 

6from loguru import logger 

7from pydantic import BaseModel, Field 

8 

9from cstlcore.auth.dependencies import oauth2_scheme 

10from cstlcore.settings import settings 

11 

12 

13class SSEData(BaseModel): 

14 constellation_id: uuid.UUID | None = None 

15 payload: dict = Field(default_factory=dict) 

16 

17 

18def _notify(token: str, sse_data: SSEData) -> None: 

19 try: 

20 uri = f"{settings.services.sse}/sse/notify/{sse_data.constellation_id}" 

21 headers = { 

22 "Content-Type": "application/json", 

23 "Authorization": f"Bearer {token}", 

24 } 

25 with httpx.Client() as client: 

26 res = client.post( 

27 uri, 

28 json=sse_data.payload, 

29 headers=headers, 

30 ) 

31 if res.status_code != 200: 

32 logger.error(f"SSE Response Error: [{res.status_code}] {res.text}") 

33 

34 except httpx.RequestError as e: 

35 logger.error(f"SSE Request Error: {e}") 

36 

37 

38async def notify_sse( 

39 background_tasks: BackgroundTasks, 

40 token: str = Depends(oauth2_scheme), 

41) -> AsyncGenerator[SSEData, None]: 

42 """ 

43 Dependency to handle Server-Sent Events (SSE) notifications. 

44 This function yields an SSEData instance that can be used to collect 

45 constellation and payload data. 

46 """ 

47 sse_data = SSEData() 

48 yield sse_data 

49 if settings.services.sse and sse_data.constellation_id and sse_data.payload: 

50 background_tasks.add_task(_notify, token, sse_data) 

51 else: 

52 logger.debug( 

53 "SSE notification skipped: missing config, constellation_id, or payload." 

54 )