Coverage for src/cstlcore/maps/router.py: 80%

71 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1import shutil 

2import uuid 

3 

4from fastapi import APIRouter, Depends, File, HTTPException 

5from fastapi.responses import FileResponse 

6from loguru import logger 

7from sqlmodel import Session, select 

8from cstlcore.memberships.dependencies import require_read_access 

9from cstlcore.constellations.models import Constellation 

10from cstlcore.collections.models import Collection 

11from cstlcore.database.dependencies import get_session 

12from cstlcore.maps.models import Map, MapCreate, MapPublic, MapUpdate 

13from cstlcore.maps.services import verify_and_store_map 

14from cstlcore.memberships.dependencies import ( 

15 require_read_access_maps, 

16 require_write_access_collections, 

17 require_write_access_maps, 

18) 

19from cstlcore.settings import settings 

20 

21router = APIRouter() 

22 

23 

@router.get(
    "/maps/{map_id}/tiles/{zoom}/{x}/{y}.jpg",
    dependencies=[Depends(require_read_access_maps)],
)
async def get_map_tile_jpg(map_id: uuid.UUID, zoom: int, x: int, y: int):
    """Serve a single JPEG tile of a map from the map directory on disk.

    The tile is looked up at ``<map_directory>/<map_id>/<zoom>/<x>/<y>.jpg``.

    Args:
        map_id: Identifier of the map; used as the top-level directory name.
        zoom: Zoom level directory of the tile.
        x: Column directory of the tile.
        y: Tile file name (without extension).

    Returns:
        A ``FileResponse`` with media type ``image/jpeg``.

    Raises:
        HTTPException: 404 when no file exists at the computed path.
    """
    map_root = settings.fs.map_directory / str(map_id)
    tile_path = map_root / str(zoom) / str(x) / f"{y}.jpg"
    logger.debug(f"Fetching tile from path: {tile_path}")

    if tile_path.exists():
        return FileResponse(tile_path, media_type="image/jpeg")

    raise HTTPException(status_code=404, detail="Tile not found")

36 

37 

@router.get(
    "/maps/{map_id}/tiles/{zoom}/{x}/{y}.png",
    dependencies=[Depends(require_read_access_maps)],
)
async def get_map_tile_png(map_id: uuid.UUID, zoom: int, x: int, y: int):
    """Serve a single PNG tile of a map from the map directory on disk.

    The tile is looked up at ``<map_directory>/<map_id>/<zoom>/<x>/<y>.png``.

    Args:
        map_id: Identifier of the map; used as the top-level directory name.
        zoom: Zoom level directory of the tile.
        x: Column directory of the tile.
        y: Tile file name (without extension).

    Returns:
        A ``FileResponse`` with media type ``image/png``.

    Raises:
        HTTPException: 404 when no file exists at the computed path.
    """
    map_root = settings.fs.map_directory / str(map_id)
    tile_path = map_root / str(zoom) / str(x) / f"{y}.png"
    logger.debug(f"Fetching tile from path: {tile_path}")

    if tile_path.exists():
        return FileResponse(tile_path, media_type="image/png")

    raise HTTPException(status_code=404, detail="Tile not found")

50 

51 

@router.post(
    "/collections/{collection_id}/maps", response_model=MapPublic, status_code=201
)
async def create_map(
    _map: MapCreate = File(),
    collection: Collection = Depends(require_write_access_collections),
    session: Session = Depends(get_session),
):
    """Create a map inside a collection and store its uploaded file.

    Args:
        _map: Submitted map data, including the uploaded ``file``.
        collection: Target collection, provided by the
            ``require_write_access_collections`` dependency.
        session: Database session used to persist the new row.

    Returns:
        The persisted ``Map`` (serialized through ``MapPublic``).

    Raises:
        HTTPException: 400 when ``verify_and_store_map`` raises ``ValueError``;
            the error text is forwarded as the response detail.
    """
    # The uploaded file is handed to verify_and_store_map separately, so it is
    # excluded from the data used to build the database row.
    data = _map.model_dump(exclude={"file"})
    db_map = Map.model_validate(data, update={"collection_id": collection.id})

    try:
        verify_and_store_map(_map.file, db_map.id)
    except ValueError as e:
        logger.error(f"Error storing map: {e}")
        # Chain the original error so the root cause stays attached to the
        # HTTP error in tracebacks.
        raise HTTPException(status_code=400, detail=f"{e}") from e

    # NOTE(review): the file is stored before the row is committed; if the
    # commit below fails, whatever verify_and_store_map wrote presumably stays
    # on disk without a matching row — confirm whether cleanup is needed.
    session.add(db_map)
    session.commit()
    session.refresh(db_map)

    return db_map

74 

75 

@router.get("/maps/{map_id}", response_model=MapPublic)
async def get_map(map: Map = Depends(require_read_access_maps)):
    """Return the map provided by the ``require_read_access_maps`` dependency.

    The handler adds no logic of its own: the dependency supplies the ``Map``
    and the response is serialized through ``MapPublic``.
    """
    return map

81 

82 

@router.get("/maps/{map_id}/layer/base", response_model=dict)
async def get_map_base_layer(map: Map = Depends(require_read_access_maps)):
    """Return the ``base_layer`` attribute of the requested map.

    The ``Map`` instance is supplied by the ``require_read_access_maps``
    dependency; the route declares ``dict`` as its response model.
    """
    base_layer = map.base_layer
    return base_layer

88 

89 

@router.delete(
    "/maps/{map_id}",
    status_code=204,
    # Deleting is a mutating operation: require the same write access that
    # update_map requires, instead of leaving the route unguarded.
    dependencies=[Depends(require_write_access_maps)],
)
async def delete_map(
    map_id: uuid.UUID,
    session: Session = Depends(get_session),
):
    """Delete a map row and remove its directory from the map directory.

    Args:
        map_id: Identifier of the map to delete.
        session: Database session used to look up and delete the row.

    Raises:
        HTTPException: 404 when no map with ``map_id`` exists.
    """
    db_map = session.get(Map, map_id)
    if not db_map:
        raise HTTPException(status_code=404, detail="Map not found")

    session.delete(db_map)
    session.commit()

    # Build the path from map_id (the primary key used for the lookup) rather
    # than reading attributes from an instance that was just deleted and
    # committed.
    map_path = settings.fs.map_directory / str(map_id)

    if map_path.exists():
        shutil.rmtree(map_path)

106 

107 

@router.patch("/maps/{map_id}", response_model=MapPublic)
async def update_map(
    _map: MapUpdate,
    map: Map = Depends(require_write_access_maps),
    session: Session = Depends(get_session),
):
    """Apply a partial update to a map and return the refreshed row.

    Args:
        _map: Update payload; only fields explicitly set by the client are
            applied.
        map: Map to modify, provided by the ``require_write_access_maps``
            dependency.
        session: Database session used to persist the change.

    Returns:
        The updated ``Map`` (serialized through ``MapPublic``).
    """
    # exclude_unset keeps fields the client did not send from being overwritten.
    changes = _map.model_dump(exclude_unset=True)
    map.sqlmodel_update(changes)

    session.add(map)
    session.commit()
    session.refresh(map)
    return map

126 

@router.get("/constellations/{constellation_id}/maps", response_model=list[MapPublic])
async def get_all_maps_in_constellation(
    constellation: Constellation = Depends(require_read_access),
    session: Session = Depends(get_session),
):
    """List every map whose collection belongs to the given constellation.

    Args:
        constellation: Constellation provided by the ``require_read_access``
            dependency.
        session: Database session used to run the query.

    Returns:
        All ``Map`` rows joined to a ``Collection`` whose ``constellation_id``
        matches ``constellation.id``.
    """
    in_constellation = Collection.constellation_id == constellation.id
    query = select(Map).join(Collection).where(in_constellation)
    return session.exec(query).all()

141 return maps