Coverage for src/cstlcore/ydocs/router.py: 59%

202 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1import base64 

2import uuid 

3from hashlib import sha256 

4 

5import httpx 

6from fastapi import APIRouter, Depends, HTTPException, Query, Request 

7from pydantic import BaseModel 

8from sqlalchemy.exc import IntegrityError 

9from sqlmodel import Session, select 

10from rapidfuzz import fuzz 

11 

12from cstlcore.auth.dependencies import get_current_user 

13from cstlcore.collections.models import Collection, CollectionWithYdocsPublic 

14from cstlcore.constellations.dependencies import get_existing_constellation 

15from cstlcore.constellations.models import Constellation 

16from cstlcore.database.dependencies import get_session 

17from cstlcore.memberships.dependencies import ( 

18 require_read_access_collections, 

19 require_write_access_collections, 

20) 

21from cstlcore.settings import settings 

22from cstlcore.users.models import User 

23from cstlcore.ydocs.models import ( 

24 YDoc, 

25 YDocContentPublic, 

26 YDocContentUpdate, 

27 YDocCreate, 

28 YDocPublic, 

29 YDocUpdate, 

30) 

31from cstlcore.ydocs.services import ydoc_exists 

32from cstlcore.ydocs.decode_ydoc import decode_ydoc 

33 

# Router on which every YDoc endpoint declared in this module is registered.
router = APIRouter()

35 

36 

class YDocWordCount(BaseModel):
    """Word count data for a single YDoc."""

    # Identifier of the YDoc the count belongs to.
    id: uuid.UUID
    # Name of the YDoc.
    name: str
    # Number of whitespace-separated tokens in the decoded YDoc content,
    # as computed by the constellation word-count endpoint.
    word_count: int

42 

43 

class Neo4jNodeWordCount(BaseModel):
    """Word count data for a single Neo4j node."""

    # Value of the node's "node_uuid" attribute as returned by the graph API.
    node_uuid: str
    # Value of the node's "title" attribute; the word-count endpoint falls
    # back to "Untitled" when the attribute is absent.
    title: str
    # Value of the node's "word_count" attribute; the word-count endpoint
    # only reports nodes whose count is positive.
    word_count: int

49 

50 

class CollectionWordCount(BaseModel):
    """Word count data for a collection."""

    # Identifier of the collection.
    id: uuid.UUID
    # Name of the collection.
    name: str
    # Sum of the word counts of the YDocs listed in ``ydocs``.
    word_count: int
    # Number of YDocs that contributed to ``word_count`` (non-folder YDocs
    # whose decoded content is non-empty).
    ydoc_count: int
    # Per-YDoc breakdown for the collection.
    ydocs: list[YDocWordCount]

58 

59 

class WordCountResponse(BaseModel):
    """Response model for word count endpoint."""

    # Words counted in YDocs plus words reported for Neo4j nodes.
    total_word_count: int
    # Number of YDocs that contributed words, across all collections.
    total_ydoc_count: int
    # Number of collections found in the constellation.
    total_collection_count: int
    # Number of Neo4j nodes included in ``neo4j_nodes``.
    total_neo4j_node_count: int
    # Sum of the word counts of the nodes in ``neo4j_nodes``.
    total_neo4j_word_count: int
    # Per-collection breakdown.
    collections: list[CollectionWordCount]
    # Per-node breakdown; empty when no Neo4j data could be retrieved.
    neo4j_nodes: list[Neo4jNodeWordCount]

69 

70 

@router.get("/collections/{collection_id}/ydocs", response_model=list[YDocPublic])
async def get_all_ydocs(
    collection: Collection = Depends(require_read_access_collections),
):
    """Return the ``files`` of the collection resolved by the read-access dependency."""
    return collection.files

77 

78 

@router.post(
    "/collections/{collection_id}/ydocs",
    response_model=YDocPublic,
    status_code=201,
)
async def create_ydoc(
    ydoc: YDocCreate,
    collection: Collection = Depends(require_write_access_collections),
    current_user: User = Depends(get_current_user),
    session: Session = Depends(get_session),
):
    """Create a YDoc in the collection, owned by the current user.

    Raises:
        HTTPException: 400 when the payload carries no string ``name``;
            409 when ``ydoc_exists`` reports a clash for the same name and
            parent, or when the commit fails with an ``IntegrityError``.
    """
    duplicate_detail = "YDoc with same name already exists in this parent"

    payload = ydoc.model_dump()
    requested_name = payload.get("name")
    if not isinstance(requested_name, str):
        raise HTTPException(status_code=400, detail="name is required")

    # Cheap pre-check; the commit below still guards against a concurrent insert.
    if ydoc_exists(session, collection.id, requested_name, payload.get("parent_id")):
        raise HTTPException(status_code=409, detail=duplicate_detail)

    ownership = {"owner_id": current_user.id, "collection_id": collection.id}
    new_ydoc = YDoc.model_validate(payload, update=ownership)
    session.add(new_ydoc)
    try:
        session.commit()
        session.refresh(new_ydoc)
    except IntegrityError:
        session.rollback()
        raise HTTPException(status_code=409, detail=duplicate_detail)
    return new_ydoc

116 

117 

@router.get("/collections/{collection_id}/ydocs/search", response_model=list[YDocPublic])
async def search_ydocs_in_collection(
    search_query: str = Query(..., description="The search query"),
    limit: int = Query(default=100, ge=0, description="The maximum number of items to return"),
    page: int = Query(default=1, ge=1, description="The page number to return"),
    collection: Collection = Depends(require_read_access_collections),
    session: Session = Depends(get_session),
):
    """Fuzzy-search the YDocs of a collection by name and by decoded content.

    Each YDoc is scored with ``fuzz.partial_ratio`` against both its name and
    its decoded content (case-insensitively); the higher of the two scores is
    kept and YDocs scoring below 80 are dropped.

    Args:
        search_query: Text to look for.
        limit: Page size; ``0`` yields an empty page.
        page: 1-based page number.
        collection: Collection resolved by the read-access dependency.
        session: Database session used to load each YDoc's content.

    Returns:
        The requested page of matching YDocs, best score first. A page past
        the end of the results is an empty list.
    """
    min_score = 80
    # Lower-case the query once instead of twice per document.
    needle = search_query.lower()

    scored = []
    for ydoc in collection.files:
        name_score = fuzz.partial_ratio(needle, ydoc.name.lower())

        # NOTE(review): this re-selects a row for every item of
        # ``collection.files``; if those items are already YDoc rows the
        # query is redundant -- confirm against the Collection model.
        stored = session.exec(
            select(YDoc).where(YDoc.id == ydoc.id, YDoc.collection_id == collection.id)
        ).first()
        # ``or ""`` guards against a falsy decode result, which the word-count
        # endpoint in this module also treats as "no content".
        text = (decode_ydoc(stored.content) or "") if stored and stored.content else ""
        content_score = fuzz.partial_ratio(needle, text.lower())

        best_score = max(name_score, content_score)
        if best_score >= min_score:
            scored.append((ydoc, best_score))

    # list.sort is stable, so equally scored YDocs keep their original order.
    scored.sort(key=lambda pair: pair[1], reverse=True)

    # Slicing past the end already yields an empty list, so no bounds check is needed.
    start_index = (page - 1) * limit
    return [ydoc for ydoc, _ in scored[start_index:start_index + limit]]

162 

163 

@router.get("/collections/{collection_id}/ydocs/{ydoc_id}", response_model=YDocPublic)
async def get_ydoc(
    ydoc_id: uuid.UUID,
    collection: Collection = Depends(require_read_access_collections),
    session: Session = Depends(get_session),
):
    """Return the YDoc with ``ydoc_id`` from the collection.

    Raises:
        HTTPException: 404 when no such YDoc exists in this collection.
    """
    statement = select(YDoc).where(
        YDoc.id == ydoc_id,
        YDoc.collection_id == collection.id,
    )
    found = session.exec(statement).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="YDoc not found")

179 

180 

@router.get(
    "/collections/{collection_id}/ydocs/{ydoc_id}/content",
    response_model=YDocContentPublic,
)
async def get_ydoc_content(
    ydoc_id: uuid.UUID,
    collection: Collection = Depends(require_read_access_collections),
    session: Session = Depends(get_session),
):
    """Return a YDoc's content as base64 together with its size and checksum.

    All three fields are ``None`` when the YDoc has no content.

    Raises:
        HTTPException: 404 when no such YDoc exists in this collection.
    """
    statement = select(YDoc).where(
        YDoc.id == ydoc_id, YDoc.collection_id == collection.id
    )
    found = session.exec(statement).first()
    if not found:
        raise HTTPException(status_code=404, detail="YDoc not found")

    raw_content = found.content
    if raw_content is None:
        return {"content": None, "size": None, "checksum": None}

    encoded = base64.b64encode(raw_content).decode("utf-8")
    return {
        "content": encoded,
        "size": found.size,
        "checksum": found.checksum,
    }

202 

203 

@router.patch("/collections/{collection_id}/ydocs/{ydoc_id}", response_model=YDocPublic)
async def update_ydoc(
    ydoc_id: uuid.UUID,
    ydoc: YDocUpdate,
    collection: Collection = Depends(require_write_access_collections),
    session: Session = Depends(get_session),
):
    """Apply the fields set in the payload to an existing YDoc.

    Raises:
        HTTPException: 404 when the YDoc is not in this collection; 409 when
            the resulting name/parent pair is already taken by another YDoc,
            or when the commit fails with an ``IntegrityError``.
    """
    duplicate_detail = "YDoc with same name already exists in this parent"

    statement = select(YDoc).where(
        YDoc.id == ydoc_id, YDoc.collection_id == collection.id
    )
    existing = session.exec(statement).first()
    if not existing:
        raise HTTPException(status_code=404, detail="YDoc not found")

    # Only fields the client actually sent are applied.
    changes = ydoc.model_dump(exclude_unset=True)

    # Check uniqueness against the values the YDoc will have after the update.
    target_name = changes.get("name", existing.name)
    target_parent = changes.get("parent_id", existing.parent_id)
    name_taken = ydoc_exists(
        session, collection.id, target_name, target_parent, exclude_id=existing.id
    )
    if name_taken:
        raise HTTPException(status_code=409, detail=duplicate_detail)

    existing.sqlmodel_update(changes)
    session.add(existing)
    try:
        session.commit()
        session.refresh(existing)
    except IntegrityError:
        session.rollback()
        raise HTTPException(status_code=409, detail=duplicate_detail)

    return existing

239 

240 

@router.patch(
    "/collections/{collection_id}/ydocs/{ydoc_id}/content",
    response_model=YDocContentPublic,
)
async def update_ydoc_content(
    ydoc_id: uuid.UUID,
    data: YDocContentUpdate,
    collection: Collection = Depends(require_write_access_collections),
    session: Session = Depends(get_session),
):
    """Replace or clear the binary content of a non-folder YDoc.

    ``data.content`` is expected to be a base64 string. A falsy value (missing
    or empty) clears the stored content, checksum and size. Otherwise the
    decoded bytes are stored, with a SHA-256 hex digest as checksum and the
    byte length as size.

    Returns:
        The same payload shape as ``GET .../content``: base64 content, size
        and checksum, all ``None`` when the content was cleared.

    Raises:
        HTTPException: 404 when the YDoc is not in this collection; 400 when
            the YDoc is a folder or the content cannot be base64-decoded.
    """
    db_ydoc = session.exec(
        select(YDoc).where(YDoc.id == ydoc_id, YDoc.collection_id == collection.id)
    ).first()
    if not db_ydoc:
        raise HTTPException(status_code=404, detail="YDoc not found")
    # Prevent writing content to folder nodes
    if db_ydoc.is_folder:
        raise HTTPException(
            status_code=400, detail="Cannot set content on a folder YDoc"
        )

    if not data.content:
        db_ydoc.content = None
        db_ydoc.checksum = None
        db_ydoc.size = None
    else:
        # Only decoding failures map to 400: binascii.Error is a ValueError
        # subclass, and a non-ASCII string or wrong input type raises
        # ValueError / TypeError. Anything else is a genuine server error.
        # NOTE(review): without validate=True, b64decode silently discards
        # characters outside the base64 alphabet -- confirm whether stricter
        # validation is wanted before changing it.
        try:
            decoded = base64.b64decode(data.content)
        except (ValueError, TypeError) as exc:
            raise HTTPException(status_code=400, detail="Invalid base64 content") from exc
        db_ydoc.content = decoded
        # compute checksum/size from the actual bytes stored
        db_ydoc.checksum = sha256(decoded).hexdigest()
        db_ydoc.size = len(decoded)

    session.add(db_ydoc)
    session.commit()
    session.refresh(db_ydoc)

    # Return the content metadata consistent with GET /content
    if db_ydoc.content is None:
        return {"content": None, "size": None, "checksum": None}
    return {
        "content": base64.b64encode(db_ydoc.content).decode("utf-8"),
        "size": db_ydoc.size,
        "checksum": db_ydoc.checksum,
    }

287 

288 

@router.delete("/collections/{collection_id}/ydocs/{ydoc_id}")
async def delete_ydoc(
    ydoc_id: uuid.UUID,
    collection: Collection = Depends(require_write_access_collections),
    session: Session = Depends(get_session),
):
    """Delete a YDoc from the collection and report the deleted id.

    Raises:
        HTTPException: 404 when no such YDoc exists in this collection.
    """
    statement = select(YDoc).where(
        YDoc.id == ydoc_id, YDoc.collection_id == collection.id
    )
    doomed = session.exec(statement).first()
    if not doomed:
        raise HTTPException(status_code=404, detail="YDoc not found")

    session.delete(doomed)
    session.commit()

    deleted_ids = [ydoc_id]
    return {"ok": True, "ids": deleted_ids}

305 

306 

@router.get(
    "/constellations/{constellation_id}/ydocs",
    response_model=list[CollectionWithYdocsPublic],
)
async def get_ydocs_by_constellation(
    constellation: Constellation = Depends(get_existing_constellation),
    session: Session = Depends(get_session),
):
    """Return every collection of a constellation. Used by RAG.

    The response model is ``CollectionWithYdocsPublic``, so the YDocs are
    delivered grouped by collection.

    TODO: Review endpoint security and access control.
    Purpose is to be used by other internal services, not directly by users.
    """
    statement = select(Collection).where(
        Collection.constellation_id == constellation.id
    )
    return session.exec(statement).all()

323 

324 

def _count_collection_words(session: Session, collection: Collection) -> CollectionWordCount:
    """Count the words of every non-folder YDoc with content in one collection."""
    ydocs = session.exec(
        select(YDoc).where(YDoc.collection_id == collection.id)
    ).all()

    ydoc_word_counts = []
    for ydoc in ydocs:
        # Only count words in non-folder YDocs with content
        if ydoc.is_folder or not ydoc.content:
            continue
        content_decoded = decode_ydoc(ydoc.content)
        if not content_decoded:
            continue
        ydoc_word_counts.append(YDocWordCount(
            id=ydoc.id,
            name=ydoc.name,
            word_count=len(content_decoded.split()),
        ))

    return CollectionWordCount(
        id=collection.id,
        name=collection.name,
        word_count=sum(item.word_count for item in ydoc_word_counts),
        ydoc_count=len(ydoc_word_counts),
        ydocs=ydoc_word_counts,
    )


async def _fetch_neo4j_word_counts(request: Request, constellation_id) -> list[Neo4jNodeWordCount]:
    """Best-effort fetch of per-node word counts from the graph API.

    Never raises: any failure is logged and whatever was collected so far
    (possibly nothing) is returned.
    """
    import logging

    logger = logging.getLogger(__name__)
    nodes: list[Neo4jNodeWordCount] = []

    try:
        graph_api_url = str(settings.services.graph_api).rstrip("/")
        # Forward the caller's credentials to the graph API when present.
        auth_header = request.headers.get("Authorization", "")
        headers = {"Authorization": auth_header} if auth_header else {}

        async with httpx.AsyncClient(timeout=30.0) as client:
            nodes_response = await client.get(
                f"{graph_api_url}/constellation/{constellation_id}/nodes",
                headers=headers,
                params={"in_filter": ["node_uuid", "title", "word_count"]},
            )

        if nodes_response.status_code != 200:
            logger.warning(
                "Graph API returned status %s for constellation %s; "
                "skipping Neo4j word counts",
                nodes_response.status_code,
                constellation_id,
            )
            return nodes

        nodes_data = nodes_response.json()
        if not (nodes_data.get("success") and nodes_data.get("data")):
            return nodes

        for node in nodes_data["data"]:
            # Tolerate explicit nulls so one sparse node does not abort the rest.
            attributes = node.get("attributes") or {}
            node_uuid = attributes.get("node_uuid")
            word_count = attributes.get("word_count") or 0
            if node_uuid is None or word_count <= 0:
                continue

            title = attributes.get("title")
            if title is None:
                title = "Untitled"

            nodes.append(Neo4jNodeWordCount(
                node_uuid=node_uuid,
                title=title,
                word_count=word_count,
            ))
    except Exception:
        # If Neo4j API is unavailable, continue without Neo4j data -- but
        # leave a trace instead of failing silently.
        logger.warning(
            "Could not retrieve Neo4j word counts for constellation %s",
            constellation_id,
            exc_info=True,
        )

    return nodes


@router.get(
    "/constellations/{constellation_id}/ydocs/wordcount",
    response_model=WordCountResponse,
)
async def get_constellation_word_count(
    request: Request,
    constellation: Constellation = Depends(get_existing_constellation),
    session: Session = Depends(get_session),
):
    """Get total word count across all YDocs in a constellation and Neo4j nodes.

    YDoc words are counted by splitting each decoded document on whitespace.
    Neo4j counts are taken from the graph API and are best-effort: when that
    call fails the response simply carries no Neo4j data.

    Args:
        request: Incoming request; its ``Authorization`` header is forwarded
            to the graph API.
        constellation: Constellation resolved by the dependency.
        session: Database session used to load collections and YDocs.

    Returns:
        A ``WordCountResponse`` with overall totals plus per-collection and
        per-node breakdowns. ``total_word_count`` includes the Neo4j words.
    """
    # Get all collections in the constellation
    collections = session.exec(
        select(Collection).where(Collection.constellation_id == constellation.id)
    ).all()

    collection_word_counts = [
        _count_collection_words(session, collection) for collection in collections
    ]
    ydoc_word_count = sum(item.word_count for item in collection_word_counts)
    ydoc_count = sum(item.ydoc_count for item in collection_word_counts)

    neo4j_nodes = await _fetch_neo4j_word_counts(request, constellation.id)
    neo4j_word_count = sum(node.word_count for node in neo4j_nodes)

    return WordCountResponse(
        total_word_count=ydoc_word_count + neo4j_word_count,
        total_ydoc_count=ydoc_count,
        total_collection_count=len(collections),
        total_neo4j_node_count=len(neo4j_nodes),
        total_neo4j_word_count=neo4j_word_count,
        collections=collection_word_counts,
        neo4j_nodes=neo4j_nodes,
    )

428 

429 

@router.get("/ydocs/owned", response_model=list[YDocPublic])
async def get_owned_ydocs(
    current_user: User = Depends(get_current_user),
):
    """Return the ``owned_files`` of the authenticated user."""
    owned = current_user.owned_files
    return owned