Coverage for src/cstlcore/sse/sse.py: 0%
27 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
1import uuid
2from typing import AsyncGenerator
4import httpx
5from fastapi import BackgroundTasks, Depends
6from loguru import logger
7from pydantic import BaseModel, Field
9from cstlcore.auth.dependencies import oauth2_scheme
10from cstlcore.settings import settings
13class SSEData(BaseModel):
14 constellation_id: uuid.UUID | None = None
15 payload: dict = Field(default_factory=dict)
18def _notify(token: str, sse_data: SSEData) -> None:
19 try:
20 uri = f"{settings.services.sse}/sse/notify/{sse_data.constellation_id}"
21 headers = {
22 "Content-Type": "application/json",
23 "Authorization": f"Bearer {token}",
24 }
25 with httpx.Client() as client:
26 res = client.post(
27 uri,
28 json=sse_data.payload,
29 headers=headers,
30 )
31 if res.status_code != 200:
32 logger.error(f"SSE Response Error: [{res.status_code}] {res.text}")
34 except httpx.RequestError as e:
35 logger.error(f"SSE Request Error: {e}")
38async def notify_sse(
39 background_tasks: BackgroundTasks,
40 token: str = Depends(oauth2_scheme),
41) -> AsyncGenerator[SSEData, None]:
42 """
43 Dependency to handle Server-Sent Events (SSE) notifications.
44 This function yields an SSEData instance that can be used to collect
45 constellation and payload data.
46 """
47 sse_data = SSEData()
48 yield sse_data
49 if settings.services.sse and sse_data.constellation_id and sse_data.payload:
50 background_tasks.add_task(_notify, token, sse_data)
51 else:
52 logger.debug(
53 "SSE notification skipped: missing config, constellation_id, or payload."
54 )