Coverage for app/utils/send_sse_notification.py: 73%

11 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-02-19 12:47 +0000

1import requests 

2from app.config.config import SSE_HOSTNAME 

3from app.utils.typing import JSONObject, JSONValue 

4 

5def send_constellation_notification(constellation_uuid: str, data: JSONObject | JSONValue, message: str, data_type: str, token: str | None = None) -> bool: 

6 try: 

7 response = requests.post( 

8 f"{SSE_HOSTNAME}/sse/notify/{constellation_uuid}", 

9 headers={ "Content-Type": "application/json", "Authorization": f"Bearer {token}" } if token else { "Content-Type": "application/json" }, 

10 json={ 

11 "success": True, 

12 "data_type": data_type, 

13 "data": data, 

14 "message": message 

15 }, 

16 ) 

17 if response.status_code != 200: 

18 return False 

19 except requests.exceptions.RequestException: 

20 return False 

21 return True