Coverage for app/routers/node.py: 93%

134 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-02-19 12:47 +0000

1from fastapi import APIRouter 

2from fastapi import Depends, Path, Query, Body 

3from fastapi.security import OAuth2PasswordBearer 

4from neo4j import GraphDatabase, basic_auth 

5from app.config.config import NEO4J_HOST, NEO4J_PORT, NEO4J_USER, NEO4J_PASSWORD, RESERVED_NODE_ATTRIBUTE_NAMES, RESERVED_NODE_LABEL_NAMES 

6from uuid import uuid4 

7from loguru import logger 

8 

9from app.routers.nodes import label, attribute 

10from app.utils.check_token import check_constellation_access, constellation_check_error 

11from app.utils.send_sse_notification import send_constellation_notification 

12from app.utils.response_format import generate_response, generate_error_response 

13from app.utils.decode_ydoc import decode_attributes_inplace, DecodeType, decode_ydoc 

14from app.utils.typing import Optional, List, JSONValue, serialize_value 

15 

16router = APIRouter() 

17 

18# Définir le schéma OAuth2 pour récupérer le JWT 

19oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 

20 

21# Define the Neo4j driver 

22driver = GraphDatabase.driver( 

23 f"neo4j://{NEO4J_HOST}:{NEO4J_PORT}", 

24 auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD)) 

25 

26router.include_router(label.router) 

27router.include_router(attribute.router) 

28 

29# Get all nodes 

30@router.get("/constellation/{constellation_uuid}/nodes", 

31 summary="Get all nodes", 

32 description="Get all nodes in the constellation.", 

33 response_description="All nodes in the constellation", 

34 responses={ 

35 200: { 

36 "description": "All nodes successfully returned", 

37 "content": { 

38 "application/json": { 

39 "example": { 

40 "success": True, 

41 "data": [ 

42 { 

43 "attributes": { 

44 "node_uuid": 1, 

45 "title": "example_title", 

46 "content": "example_content" 

47 }, 

48 "labels": [ 

49 "Node" 

50 ] 

51 } 

52 ], 

53 "message": "All nodes successfully returned", 

54 "error": None 

55 } 

56 } 

57 } 

58 }, 

59 **constellation_check_error, 

60 }, 

61 tags=["Node Base"] 

62) 

63async def read_nodes(constellation_uuid: str = Path(..., description="The UUID of the constellation to get the nodes from"), 

64 decode: Optional[DecodeType] = Query(default=None, description="Whether to decode the attributes or not. Options: 'xml', 'plain_text'"), 

65 in_filter: Optional[List[str]] = Query(default=None, description="Filter to include only specific attributes in the final results"), 

66 out_filter: Optional[List[str]] = Query(default=None, description="Filter to exclude specific attributes from the final results"), 

67 token: str = Depends(oauth2_scheme)): 

68 test = check_constellation_access(token, constellation_uuid, "READ") 

69 if test is not None: 

70 return test 

71 

72 if in_filter is None: 

73 in_filter = [] 

74 if out_filter is None: 

75 out_filter = [] 

76 

77 with driver.session() as session: 

78 result = session.run(""" 

79 MATCH (n {constellation_uuid: $constellation_uuid}) 

80 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

81 RETURN properties(n) AS attributes, labels(n) as labels 

82 """, 

83 constellation_uuid=constellation_uuid, 

84 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES 

85 ) 

86 final_result: JSONValue = [{"attributes": { 

87 key: serialize_value(value) 

88 for key, value in record["attributes"].items() 

89 if (len(in_filter) == 0 or key in in_filter) and (len(out_filter) == 0 or key not in out_filter) 

90 }, "labels": record["labels"]} for record in result if record["attributes"]] 

91 driver.close() 

92 

93 final_result = decode_attributes_inplace(final_result, decode) 

94 

95 return generate_response( 

96 status_code=200, 

97 data=final_result, 

98 message="All nodes successfully returned" 

99 ) 

100 

101# Get a specific node 

102@router.get("/constellation/{constellation_uuid}/node/{node_uuid}", 

103 summary="Get a node by UUID", 

104 description="Retrieve a node's details by its UUID.", 

105 response_description="Node details", 

106 responses={ 

107 200: { 

108 "description": "Node retrieved successfully", 

109 "content": { 

110 "application/json": { 

111 "example": { 

112 "success": True, 

113 "data": [ 

114 { 

115 "attributes": { 

116 "node_uuid": 1, 

117 "title": "example_title", 

118 "content": "example_content" 

119 }, 

120 "labels": [ 

121 "Node" 

122 ] 

123 } 

124 ], 

125 "message": "Node retrieved successfully", 

126 "error": None 

127 } 

128 } 

129 } 

130 }, 

131 **constellation_check_error, 

132 }, 

133 tags=["Node Base"] 

134) 

135async def read_node(constellation_uuid: str = Path(..., description="The UUID of the constellation to get the node from"), 

136 decode: Optional[DecodeType] = Query(default=None, description="Whether to decode the attributes or not. Options: 'xml', 'plain_text'"), 

137 in_filter: Optional[List[str]] = Query(default=None, description="Filter to include only specific attributes in the final results"), 

138 out_filter: Optional[List[str]] = Query(default=None, description="Filter to exclude specific attributes from the final results"), 

139 node_uuid: str = Path(..., description="The UUID of the node to get"), 

140 token: str = Depends(oauth2_scheme)): 

141 test = check_constellation_access(token, constellation_uuid, "READ") 

142 if test is not None: 

143 return test 

144 

145 if in_filter is None: 

146 in_filter = [] 

147 if out_filter is None: 

148 out_filter = [] 

149 

150 with driver.session() as session: 

151 result = session.run(""" 

152 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

153 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

154 RETURN properties(n) AS attributes, labels(n) as labels 

155 """, 

156 constellation_uuid=constellation_uuid, 

157 node_uuid=node_uuid, 

158 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES 

159 ) 

160 final_result: JSONValue = [{"attributes": { 

161 key: serialize_value(value) 

162 for key, value in record["attributes"].items() 

163 if (len(in_filter) == 0 or key in in_filter) and (len(out_filter) == 0 or key not in out_filter) 

164 }, "labels": record["labels"]} for record in result] 

165 driver.close() 

166 

167 if len(final_result) == 0: 

168 return generate_error_response( 

169 status_code=404, 

170 error_code="NODE_NOT_FOUND", 

171 error_message="Node with UUID {} not found".format(node_uuid), 

172 message="Node with UUID {} not found".format(node_uuid) 

173 ) 

174 

175 final_result = decode_attributes_inplace(final_result, decode) 

176 

177 return generate_response( 

178 status_code=200, 

179 data=final_result, 

180 message="Node retrieved successfully" 

181 ) 

182 

183# Create a new node 

184@router.post("/constellation/{constellation_uuid}/node", 

185 summary="Create a new node", 

186 description="Create a new node with the title and content given in parameter.", 

187 response_description="Node created", 

188 responses={ 

189 200: { 

190 "description": "Node created successfully", 

191 "content": { 

192 "application/json": { 

193 "example": { 

194 "success": True, 

195 "data": [ 

196 { 

197 "attributes": { 

198 "node_uuid": 1, 

199 "title": "example_title", 

200 "content": "example_content" 

201 }, 

202 "labels": [ 

203 "Node" 

204 ] 

205 } 

206 ], 

207 "message": "Node created successfully", 

208 "error": None 

209 } 

210 } 

211 } 

212 }, 

213 **constellation_check_error, 

214 }, 

215 tags=["Node Base"] 

216) 

217async def create_node(constellation_uuid: str = Path(..., description="The UUID of the constellation to create the node in"), 

218 labels: List[str] = Body(default=[], description="Labels to assign to the node."), 

219 attributes: dict[str, str] = Body(..., description="The attributes of the node to create. The node_uuid and constellation_uuid will be automatically generated and added to the node."), 

220 decode: Optional[DecodeType] = Query(default=None, description="Whether to decode the attributes or not. Options: 'xml', 'plain_text'"), 

221 in_filter: Optional[List[str]] = Query(default=None, description="Filter to include only specific attributes in the final results"), 

222 out_filter: Optional[List[str]] = Query(default=None, description="Filter to exclude specific attributes from the final results"), 

223 token: str = Depends(oauth2_scheme)): 

224 test = check_constellation_access(token, constellation_uuid, "WRITE") 

225 if test is not None: 

226 return test 

227 

228 if in_filter is None: 

229 in_filter = [] 

230 if out_filter is None: 

231 out_filter = [] 

232 

233 # Remove the reserved attribute names 

234 for key in RESERVED_NODE_ATTRIBUTE_NAMES: 

235 attributes.pop(key, None) 

236 

237 # Remove the reserved label names 

238 for reserved_label in RESERVED_NODE_LABEL_NAMES: 

239 if reserved_label in labels: 

240 labels.remove(reserved_label) 

241 

242 with driver.session() as session: 

243 wordCount = 0 

244 for key in attributes.keys(): 

245 if key not in RESERVED_NODE_ATTRIBUTE_NAMES: 

246 wordCount += len(decode_ydoc(attributes[key], True).split()) 

247 

248 result = session.run(""" 

249 CREATE (n) 

250 SET n:$($labels) 

251 SET n += $attributes 

252 SET n.created_at = datetime() 

253 SET n.updated_at = datetime() 

254 SET n.constellation_uuid = $constellation_uuid, n.node_uuid = $node_uuid 

255 SET n.word_count = $word_count 

256 SET n.label_count = size(labels(n)) 

257 SET n.link_count = 0 

258 RETURN properties(n) AS attributes, labels(n) as labels 

259 """, 

260 labels=labels, 

261 attributes=attributes, 

262 constellation_uuid=constellation_uuid, 

263 node_uuid=str(uuid4()), 

264 word_count=wordCount, 

265 ) 

266 final_result: JSONValue = [{"attributes": { 

267 key: serialize_value(value) 

268 for key, value in record["attributes"].items() 

269 if (len(in_filter) == 0 or key in in_filter) and (len(out_filter) == 0 or key not in out_filter) 

270 }, "labels": record["labels"]} for record in result] 

271 driver.close() 

272 

273 final_result = decode_attributes_inplace(final_result, decode) 

274 

275 # Send a notification to the SSE server 

276 notification_result = send_constellation_notification( 

277 constellation_uuid=constellation_uuid, 

278 data=final_result, 

279 message="Node created", 

280 data_type="NODE_CREATED", 

281 token=token 

282 ) 

283 if not notification_result: 

284 logger.warning("Create Node: Error sending notification to SSE server") 

285 

286 return generate_response( 

287 status_code=200, 

288 data=final_result, 

289 message="Node created successfully" 

290 ) 

291 

292# Update a node 

293@router.patch("/constellation/{constellation_uuid}/node/{node_uuid}", 

294 summary="Update a node", 

295 description="Update the node specified by its UUID with the title and content given in parameter.", 

296 response_description="Node updated", 

297 responses={ 

298 200: { 

299 "description": "Node updated successfully", 

300 "content": { 

301 "application/json": { 

302 "example": { 

303 "success": True, 

304 "data": [ 

305 { 

306 "attributes": { 

307 "node_uuid": 1, 

308 "title": "example_title", 

309 "content": "example_content" 

310 }, 

311 "labels": [ 

312 "Node" 

313 ] 

314 } 

315 ], 

316 "message": "Node updated successfully", 

317 "error": None 

318 } 

319 } 

320 } 

321 }, 

322 **constellation_check_error, 

323 }, 

324 tags=["Node Base"] 

325) 

326async def update_node(constellation_uuid: str = Path(..., description="The UUID of the constellation to update the node in"), 

327 node_uuid: str = Path(..., description="The UUID of the node to update"), 

328 labels: List[str] = Body(default=[], description="Labels to assign to the node."), 

329 attributes: dict[str, str] = Body(..., description="The attributes of the node to update. The node_uuid and constellation_uuid can't be changed."), 

330 decode: Optional[DecodeType] = Query(default=None, description="Whether to decode the attributes or not. Options: 'xml', 'plain_text'"), 

331 in_filter: Optional[List[str]] = Query(default=None, description="Filter to include only specific attributes in the final results"), 

332 out_filter: Optional[List[str]] = Query(default=None, description="Filter to exclude specific attributes from the final results"), 

333 token: str = Depends(oauth2_scheme)): 

334 test = check_constellation_access(token, constellation_uuid, "WRITE") 

335 if test is not None: 

336 return test 

337 

338 if in_filter is None: 

339 in_filter = [] 

340 if out_filter is None: 

341 out_filter = [] 

342 

343 # Remove the reserved attribute names 

344 for key in RESERVED_NODE_ATTRIBUTE_NAMES: 

345 attributes.pop(key, None) 

346 

347 # Remove the reserved label names 

348 for reserved_label in RESERVED_NODE_LABEL_NAMES: 

349 if reserved_label in labels: 

350 labels.remove(reserved_label) 

351 

352 with driver.session() as session: 

353 # Check if the node exists 

354 uuid_search = session.run(""" 

355 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

356 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

357 RETURN n, coalesce(n.word_count, -1) AS currentWordCount, properties(n) AS currrentAttributes 

358 """, 

359 constellation_uuid=constellation_uuid, 

360 node_uuid=node_uuid, 

361 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES 

362 ) 

363 records = [record for record in uuid_search] 

364 if len([record["n"] for record in records]) == 0: 

365 return generate_error_response( 

366 status_code=404, 

367 error_code="NODE_NOT_FOUND", 

368 error_message="Node with UUID {} not found".format(node_uuid), 

369 message="Node with UUID {} not found".format(node_uuid) 

370 ) 

371 

372 currentWordCount = [record["currentWordCount"] for record in records][0] 

373 currentAttributes = [record["currrentAttributes"] for record in records][0] 

374 

375 if currentWordCount == -1: # word_count has not been computed yet 

376 currentWordCount = 0 

377 for key in currentAttributes.keys(): # Add all current attributes word count 

378 if key not in RESERVED_NODE_ATTRIBUTE_NAMES: 

379 currentWordCount += len(decode_ydoc(currentAttributes[key], True).split()) 

380 

381 for currentKey in currentAttributes.keys(): # Subtract old attributes word count that will be removed 

382 if currentKey not in RESERVED_NODE_ATTRIBUTE_NAMES and currentKey in attributes.keys(): 

383 currentWordCount -= len(decode_ydoc(currentAttributes[currentKey], True).split()) 

384 

385 for key in attributes.keys(): # Add new attributes word count that will be added/updated 

386 if key not in RESERVED_NODE_ATTRIBUTE_NAMES: 

387 currentWordCount += len(decode_ydoc(attributes[key], True).split()) 

388 

389 

390 # Only change the labels if some are provided 

391 if len(labels) > 0: 

392 result = session.run(""" 

393 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

394 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

395 REMOVE n:$(labels(n)) 

396 SET n:$($labels) 

397 SET n.created_at = coalesce(n.created_at, datetime()) 

398 SET n.updated_at = datetime() 

399 SET n.word_count = $word_count 

400 SET n.label_count = size(labels(n)) 

401 SET n += $attributes 

402 RETURN properties(n) AS attributes, labels(n) as labels 

403 """, 

404 labels=labels, 

405 attributes=attributes, 

406 constellation_uuid=constellation_uuid, 

407 node_uuid=node_uuid, 

408 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES, 

409 word_count=currentWordCount, 

410 ) 

411 # Just update the attributes since no labels were provided 

412 else: 

413 result = session.run(""" 

414 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

415 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

416 SET n.created_at = coalesce(n.created_at, datetime()) 

417 SET n.updated_at = datetime() 

418 SET n.word_count = $word_count 

419 SET n += $attributes 

420 RETURN properties(n) AS attributes, labels(n) as labels 

421 """, 

422 attributes=attributes, 

423 constellation_uuid=constellation_uuid, 

424 node_uuid=node_uuid, 

425 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES, 

426 word_count=currentWordCount, 

427 ) 

428 final_result: JSONValue = [{"attributes": { 

429 key: serialize_value(value) 

430 for key, value in record["attributes"].items() 

431 if (len(in_filter) == 0 or key in in_filter) and (len(out_filter) == 0 or key not in out_filter) 

432 }, "labels": record["labels"]} for record in result] 

433 driver.close() 

434 

435 final_result = decode_attributes_inplace(final_result, decode) 

436 

437 # Send a notification to the SSE server 

438 notification_result = send_constellation_notification( 

439 constellation_uuid=constellation_uuid, 

440 data=final_result, 

441 message="Node updated", 

442 data_type="NODE_UPDATED", 

443 token=token 

444 ) 

445 if not notification_result: 

446 logger.warning("Update Node: Error sending notification to SSE server") 

447 

448 return generate_response( 

449 status_code=200, 

450 data=final_result, 

451 message="Node updated successfully" 

452 ) 

453 

454# Delete a node 

455@router.delete("/constellation/{constellation_uuid}/node/{node_uuid}", 

456 summary="Delete a node", 

457 description="Delete the node specified by its UUID.", 

458 response_description="Node deleted", 

459 responses={ 

460 200: { 

461 "description": "Node deleted successfully", 

462 "content": { 

463 "application/json": { 

464 "example": { 

465 "success": True, 

466 "data": None, 

467 "message": "Node deleted successfully", 

468 "error": None 

469 } 

470 } 

471 } 

472 }, 

473 **constellation_check_error, 

474 }, 

475 tags=["Node Base"] 

476) 

477async def delete_node(constellation_uuid: str = Path(..., description="The UUID of the constellation to delete the node from"), 

478 node_uuid: str = Path(..., description="The UUID of the node to delete"), 

479 token: str = Depends(oauth2_scheme)): 

480 test = check_constellation_access(token, constellation_uuid, "WRITE") 

481 if test is not None: 

482 return test 

483 

484 with driver.session() as session: 

485 # Check if the node exists 

486 uuid_search = session.run(""" 

487 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

488 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

489 RETURN n 

490 """, 

491 constellation_uuid=constellation_uuid, 

492 node_uuid=node_uuid, 

493 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES 

494 ) 

495 if len([record["n"] for record in uuid_search]) == 0: 

496 return generate_error_response( 

497 status_code=404, 

498 error_code="NODE_NOT_FOUND", 

499 error_message="Node with UUID {} not found".format(node_uuid), 

500 message="Node with UUID {} not found".format(node_uuid) 

501 ) 

502 

503 session.run(""" 

504 MATCH (n {constellation_uuid: $constellation_uuid, node_uuid: $node_uuid}) 

505 WHERE ALL(label IN $RESERVED_NODE_LABEL_NAMES WHERE NOT label IN labels(n)) 

506 OPTIONAL MATCH (n)--(m) 

507 WITH n, collect(DISTINCT m) AS neighbors 

508 DETACH DELETE n 

509 FOREACH (neighbor IN neighbors | 

510 SET neighbor.link_count = COUNT { (neighbor)--() } 

511 ) 

512 """, 

513 constellation_uuid=constellation_uuid, 

514 node_uuid=node_uuid, 

515 RESERVED_NODE_LABEL_NAMES=RESERVED_NODE_LABEL_NAMES 

516 ) 

517 driver.close() 

518 

519 # Send a notification to the SSE server 

520 notification_result = send_constellation_notification( 

521 constellation_uuid=constellation_uuid, 

522 data={ 

523 "node_uuid": node_uuid, 

524 "message": "Node deleted successfully" 

525 }, 

526 message="Node deleted", 

527 data_type="NODE_DELETED", 

528 token=token 

529 ) 

530 if not notification_result: 

531 logger.warning("Delete Node: Error sending notification to SSE server") 

532 

533 return generate_response( 

534 status_code=200, 

535 data=None, 

536 message="Node deleted successfully" 

537 )