Coverage for src/cstlcore/admin/services.py: 13%

247 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1""" 

2Admin services for dashboard and management operations. 

3""" 

4 

5import asyncio 

6from datetime import datetime 

7from typing import Any, Dict, Optional, List, Tuple 

8 

9import httpx 

10from sqlmodel import Session, desc, func, select 

11 

12from cstlcore.assets.models import Asset 

13from cstlcore.collections.models import Collection 

14from cstlcore.constellations.models import Constellation 

15from cstlcore.memberships.models import AccessEnum, ConstellationMembership 

16from cstlcore.settings import settings 

17from cstlcore.users.models import User 

18from cstlcore.ydocs.models import YDoc 

19 

20 

21# ============================================================================= 

22# Graph Theory Metrics Functions 

23# ============================================================================= 

24 

25 

def compute_graph_density(n: int, m: int) -> float:
    """
    Return the density of an undirected graph: D = 2M / (N * (N - 1)).

    Args:
        n: Number of nodes (elements)
        m: Number of unique undirected edges (relations)

    Returns:
        Twice the edge count divided by N * (N - 1), or 0.0 when the graph
        has fewer than two nodes.
    """
    if n < 2:
        # With fewer than two nodes the denominator would be zero.
        return 0.0

    twice_edges = 2 * m
    ordered_pairs = n * (n - 1)
    return twice_edges / ordered_pairs

40 

41 

def compute_average_degree(n: int, m: int) -> float:
    """
    Return the average node degree of an undirected graph: k = 2M / N.

    Args:
        n: Number of nodes (elements)
        m: Number of unique undirected edges (relations)

    Returns:
        Twice the edge count divided by the node count, or 0.0 when there
        are no nodes.
    """
    if n < 1:
        # No nodes: avoid dividing by zero.
        return 0.0

    degree_sum = 2 * m  # every edge contributes to two endpoints
    return degree_sum / n

56 

57 

def interpret_density(density: float) -> str:
    """
    Provide heuristic interpretation of graph density for narrative analysis.

    Thresholds (from graph.md specification); each upper bound is exclusive:
    - 0.00 – 0.05: "sparse"  (sparse, linear narrative)
    - 0.05 – 0.15: "simple"  (structured but simple)
    - 0.15 – 0.30: "rich"    (rich, readable narrative; target range)
    - >= 0.30:     "complex" (highly interconnected, potential overload)

    Args:
        density: Graph density value

    Returns:
        The level name as a plain string: one of "sparse", "simple",
        "rich" or "complex".
    """
    # Ordered (exclusive upper bound, level) pairs; the first match wins.
    bands = (
        (0.05, "sparse"),
        (0.15, "simple"),
        (0.30, "rich"),
    )
    for upper_bound, level in bands:
        if density < upper_bound:
            return level
    return "complex"

82 

def count_unique_undirected_edges(links: List[Dict[str, Any]]) -> int:
    """
    Count unique undirected edges from a list of Neo4j links.

    Rules (from graph.md specification):
    - Treat the graph as undirected
    - Do not count self-loops
    - Do not count duplicate edges

    Links that lack a "start_node" or "end_node" value are ignored, since
    they do not connect two nodes.

    Args:
        links: List of link dictionaries with start_node and end_node

    Returns:
        Number of unique undirected edges (excluding self-loops)
    """
    unique_edges = set()
    for link in links:
        start = link.get("start_node")
        end = link.get("end_node")

        # A link without both endpoints cannot be an edge.
        if start is None or end is None:
            continue

        # Skip self-loops
        if start == end:
            continue

        # frozenset makes (a, b) and (b, a) the same edge and only requires
        # the endpoints to be hashable, not mutually orderable.
        unique_edges.add(frozenset((start, end)))

    return len(unique_edges)

115 

116 

async def fetch_neo4j_graph_data(
    constellation_id: str, token: str
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Retrieve a constellation's nodes and links from the graph API.

    The two endpoints are requested concurrently through one shared HTTP
    client (30 second timeout), using the token as a Bearer credential.

    Args:
        constellation_id: UUID of the constellation
        token: JWT token for authentication

    Returns:
        Tuple of (nodes_list, links_list). Each list is the "data" entry of
        the corresponding JSON body, or empty when that body does not
        report "success".

    Raises:
        httpx.HTTPError: If Neo4j API request fails
    """
    base_url = str(settings.services.graph_api).rstrip("/")
    auth_headers = {"Authorization": f"Bearer {token}"}

    def _payload(body):
        # Only trust "data" when the API flags the call as successful.
        if body.get("success"):
            return body.get("data", [])
        return []

    async with httpx.AsyncClient(timeout=30.0) as client:
        nodes_request = client.get(
            f"{base_url}/constellation/{constellation_id}/nodes",
            headers=auth_headers,
        )
        links_request = client.get(
            f"{base_url}/constellation/{constellation_id}/links",
            headers=auth_headers,
        )
        # Run both requests in parallel
        nodes_response, links_response = await asyncio.gather(
            nodes_request, links_request
        )

        for response in (nodes_response, links_response):
            response.raise_for_status()

        nodes_body = nodes_response.json()
        links_body = links_response.json()

        return _payload(nodes_body), _payload(links_body)

159 

160 

async def get_constellation_graph_metrics(
    session: Session, constellation_id: str, token: str
) -> Dict[str, Any]:
    """
    Compute graph theory metrics for one constellation.

    Args:
        session: Database session (not used by this function)
        constellation_id: UUID of the constellation
        token: JWT token for Neo4j API authentication

    Returns:
        Dictionary with element/relation counts, rounded density and
        average degree, and the density interpretation level. When the
        graph API request fails, all metrics are zero and an "error" entry
        describes the failure.
    """
    try:
        nodes, links = await fetch_neo4j_graph_data(constellation_id, token)
    except httpx.HTTPError as e:
        # Return zero metrics if Neo4j API fails
        return {
            "constellationId": constellation_id,
            "elementCount": 0,
            "relationCount": 0,
            "graphDensity": 0.0,
            "averageDegree": 0.0,
            "interpretationLevel": interpret_density(0.0),
            "error": f"Failed to fetch graph data: {str(e)}",
        }

    node_total = len(nodes)
    edge_total = count_unique_undirected_edges(links)

    density = compute_graph_density(node_total, edge_total)
    mean_degree = compute_average_degree(node_total, edge_total)
    level = interpret_density(density)

    return {
        "constellationId": constellation_id,
        "elementCount": node_total,
        "relationCount": edge_total,
        "graphDensity": round(density, 4),
        "averageDegree": round(mean_degree, 2),
        "interpretationLevel": level,
    }

204 

205 

async def get_all_constellations_graph_metrics(
    session: Session, token: str
) -> Dict[str, Any]:
    """
    Compute graph theory metrics for every constellation (bulk endpoint).

    Constellations are processed newest first, one graph API fetch at a
    time. A constellation whose fetch fails is still listed, with zero
    metrics, and contributes nothing to the running totals.

    Args:
        session: Database session
        token: JWT token for Neo4j API authentication

    Returns:
        Dictionary with the per-constellation metrics, the number of
        entries, and averages of density, elements and relations over all
        listed constellations.
    """
    newest_first = (
        session.query(Constellation).order_by(desc(Constellation.created_at)).all()
    )

    rows = []
    density_sum = 0.0
    element_sum = 0
    relation_sum = 0

    for item in newest_first:
        try:
            nodes, links = await fetch_neo4j_graph_data(str(item.id), token)
        except httpx.HTTPError:
            # Include constellation with zero metrics on error
            rows.append(
                {
                    "constellationId": str(item.id),
                    "constellationName": item.name,
                    "elementCount": 0,
                    "relationCount": 0,
                    "graphDensity": 0.0,
                    "averageDegree": 0.0,
                    "interpretationLevel": "sparse",
                }
            )
            continue

        node_total = len(nodes)
        edge_total = count_unique_undirected_edges(links)
        density = compute_graph_density(node_total, edge_total)
        mean_degree = compute_average_degree(node_total, edge_total)
        level = interpret_density(density)

        rows.append(
            {
                "constellationId": str(item.id),
                "constellationName": item.name,
                "elementCount": node_total,
                "relationCount": edge_total,
                "graphDensity": round(density, 4),
                "averageDegree": round(mean_degree, 2),
                "interpretationLevel": level,
            }
        )

        density_sum += density
        element_sum += node_total
        relation_sum += edge_total

    listed = len(rows)
    divisor = max(1, listed)  # keeps the averages defined for an empty list
    return {
        "constellations": rows,
        "total": listed,
        "averageDensity": round(density_sum / divisor, 4),
        "averageElements": round(element_sum / divisor, 2),
        "averageRelations": round(relation_sum / divisor, 2),
    }

279 

280 

def get_admin_configuration_info() -> Dict[str, Any]:
    """
    Describe the current admin configuration.

    Returns:
        Dictionary with the configured admin emails (empty list when none
        are set), their count, and static notes on how they are configured.
    """
    configured = settings.admin.emails

    if configured:
        email_list = list(configured)
        email_total = len(configured)
    else:
        email_list = []
        email_total = 0

    return {
        "admin_emails_configured": email_list,
        "total_admin_emails": email_total,
        "environment_variable": "ADMIN__EMAILS",
        "format": "Comma-separated email addresses",
        "note": "Admin privileges are granted based on email matching, not database roles",
    }

296 

297 

def get_dashboard_stats(session: Session) -> Dict[str, Any]:
    """Build the overview statistics for the admin dashboard.

    Row counts are read from the database. Several fields are hard-coded
    placeholders: new users today, growth percentages, the public
    constellation count, and the whole "system" section.
    """

    def _row_count(column):
        # COUNT over one column, returned as a scalar
        return session.query(func.count(column)).scalar()

    user_total = _row_count(User.id)
    constellation_total = _row_count(Constellation.id)
    collection_total = _row_count(Collection.id)
    asset_total = _row_count(Asset.id)

    verified_total = (
        session.query(func.count(User.id)).filter(User.email_verified).scalar()
    )

    # Placeholder - TODO: Add created_at field to User model
    users_created_today = 0

    # Constellations whose creation date equals the current local date
    current_date = datetime.now().date()
    constellations_created_today = (
        session.query(func.count(Constellation.id))
        .filter(func.date(Constellation.created_at) == current_date)
        .scalar()
    )

    users_section = {
        "total": user_total,
        "active": verified_total,  # Using verified as proxy for active
        "newToday": users_created_today,
        "growth": 0,  # Placeholder percentage
    }
    constellations_section = {
        "total": constellation_total,
        "public": 0,  # TODO: Add visibility field to constellation model
        "private": constellation_total,
        "newToday": constellations_created_today,
        "growth": 0,  # Placeholder percentage
    }
    content_section = {
        "collections": collection_total,
        "assets": asset_total,
        "totalItems": collection_total + asset_total,
    }
    # max(1, ...) keeps each ratio defined when the divisor table is empty
    engagement_section = {
        "averageConstellationsPerUser": round(
            constellation_total / max(1, user_total), 2
        ),
        "averageCollectionsPerConstellation": round(
            collection_total / max(1, constellation_total), 2
        ),
        "verificationRate": round((verified_total / max(1, user_total)) * 100, 1),
    }
    system_section = {
        "status": "healthy",
        "uptime": "99.9%",
        "lastBackup": datetime.now().isoformat(),
        "version": "1.0.0",
        "environment": "development",
    }

    return {
        "users": users_section,
        "constellations": constellations_section,
        "content": content_section,
        "engagement": engagement_section,
        "system": system_section,
    }

359 

360 

def get_dashboard_activity(session: Session) -> Dict[str, Any]:
    """Collect recent platform activity.

    Combines the ten users with the highest ids and the ten most recently
    created constellations into one list, sorted by timestamp string
    (descending) and capped at twenty entries. User entries carry the
    current time as a placeholder timestamp.
    """
    # User has no created_at, so id order stands in for creation order
    latest_users = session.query(User).order_by(desc(User.id)).limit(10).all()

    latest_constellations = (
        session.query(Constellation)
        .order_by(desc(Constellation.created_at))
        .limit(10)
        .all()
    )

    user_events = [
        {
            "id": f"user_{user.id}",
            "type": "user_registered",
            "description": f"New user registered: {user.name}",
            "timestamp": datetime.now().isoformat(),  # Placeholder
            "user": f"{user.name} <{user.email}>",
        }
        for user in latest_users
    ]

    constellation_events = [
        {
            "id": f"constellation_{item.id}",
            "type": "constellation_created",
            "description": f"New constellation created: {item.name}",
            "timestamp": item.created_at.isoformat()
            if item.created_at
            else datetime.now().isoformat(),
            "constellation": {
                "id": str(item.id),
                "name": item.name,
            },
        }
        for item in latest_constellations
    ]

    # Most recent first
    ordered = sorted(
        user_events + constellation_events,
        key=lambda event: event["timestamp"],
        reverse=True,
    )

    return {"activities": ordered[:20]}  # Return top 20 most recent

412 

413 

def get_dashboard_alerts(session: Session) -> Dict[str, Any]:
    """Get system alerts and notifications.

    Adds a warning alert when more than ten users have an unverified email
    address, and always appends an informational "system health" alert
    (currently a static placeholder).

    Args:
        session: Database session

    Returns:
        Dictionary with a single "alerts" key holding the list of alerts.
    """
    alerts = []

    # Check for unverified users.
    # The predicate must be a SQL expression: Python's ``not`` would be
    # evaluated on the mapped attribute object itself and hand the filter a
    # plain Python boolean instead of a column comparison.
    unverified_count = (
        session.query(func.count(User.id))
        .filter(User.email_verified == False)  # noqa: E712
        .scalar()
    )

    if unverified_count > 10:
        alerts.append(
            {
                "id": "unverified_users",
                "type": "warning",
                "message": f"{unverified_count} users have unverified email addresses",
                "timestamp": datetime.now().isoformat(),
                "action": "Review user verification status",
            }
        )

    # Check system health (placeholder)
    alerts.append(
        {
            "id": "system_health",
            "type": "info",
            "message": "System is running normally",
            "timestamp": datetime.now().isoformat(),
        }
    )

    return {"alerts": alerts}

446 

447 

def get_users_list(
    session: Session,
    search: Optional[str] = None,
    status: Optional[str] = None,
    verified: Optional[bool] = None,
) -> Dict[str, Any]:
    """List users, optionally filtered by email verification state.

    Only ``verified`` is applied; ``search`` and ``status`` are accepted but
    not used. Users are returned in descending id order, each with the
    number of constellation memberships it has. Several per-user fields are
    placeholders.
    """
    filter_on_verified = verified is not None

    # Fetch the user rows
    try:
        user_query = select(User)
        if filter_on_verified:
            user_query = user_query.where(User.email_verified == verified)
        user_query = user_query.order_by(desc(User.id))

        users = session.exec(user_query).all()
    except Exception as e:
        print(f"Error during user query: {str(e)}")
        print(f"Error type: {type(e)}")
        raise

    # Count the rows matching the same filter
    try:
        count_query = select(func.count(User.id))
        if filter_on_verified:
            count_query = count_query.where(User.email_verified == verified)

        total = session.exec(count_query).one()
    except Exception as e:
        print(f"Error during count query: {str(e)}")
        raise

    def _describe(user):
        # One extra query per user: how many memberships reference it
        membership_total = (
            session.query(func.count(ConstellationMembership.user_id))
            .filter(ConstellationMembership.user_id == user.id)
            .scalar()
        )
        return {
            "id": str(user.id),
            "name": user.name,
            "email": user.email,
            "createdAt": datetime.now().isoformat(),  # Placeholder
            "lastLoginAt": datetime.now().isoformat(),  # Placeholder
            "isActive": user.email_verified,  # Using verified as proxy for active
            "isVerified": user.email_verified,
            "constellationsCount": membership_total,
            "totalElements": 0,  # Placeholder - would need element count logic
            "totalLinks": 0,  # Placeholder - would need link count logic
            "storageUsed": 0.0,  # Placeholder - would need storage calculation
        }

    return {"users": [_describe(user) for user in users], "total": total}

511 

512 

def toggle_user_status(session: Session, user_id: str) -> Dict[str, Any]:
    """Flip a user's active/inactive status.

    The email-verified flag doubles as the status: it is inverted,
    committed, and the new state is reported back. Returns a failure
    payload when no user has the given id.
    """
    target = session.query(User).filter(User.id == user_id).first()
    if not target:
        return {"success": False, "message": "User not found"}

    # Email verification acts as the proxy for "active"
    target.email_verified = not target.email_verified
    session.add(target)
    session.commit()
    session.refresh(target)

    status_label = "active" if target.email_verified else "inactive"
    return {
        "success": True,
        "message": f"User status updated to {status_label}",
        "user": {
            "id": str(target.id),
            "status": status_label,
            "emailVerified": target.email_verified,
        },
    }

535 

536 

def get_constellations_list(
    session: Session,
    search: Optional[str] = None,
    sort_by: Optional[str] = None,
) -> Dict[str, Any]:
    """List all constellations with per-constellation counts and owner.

    ``search`` is accepted but not applied. ``sort_by="name"`` orders by
    name; any other value (including "created") orders by creation time,
    newest first. Storage, last activity and tags are placeholders.
    """
    if sort_by == "name":
        ordering = Constellation.name
    else:
        ordering = desc(Constellation.created_at)

    constellations = session.query(Constellation).order_by(ordering).all()

    # Total number of constellations
    total = session.query(func.count(Constellation.id)).scalar()

    def _summarize(item):
        member_total = (
            session.query(func.count(ConstellationMembership.user_id))
            .filter(ConstellationMembership.constellation_id == item.id)
            .scalar()
        )
        collection_total = (
            session.query(func.count(Collection.id))
            .filter(Collection.constellation_id == item.id)
            .scalar()
        )
        asset_total = (
            session.query(func.count(Asset.id))
            .filter(Asset.constellation_id == item.id)
            .scalar()
        )
        # Elements are the YDoc rows in this constellation's collections
        element_total = (
            session.query(func.count(YDoc.id))
            .join(Collection)
            .filter(Collection.constellation_id == item.id)
            .scalar()
        )

        # First membership with OWNER access, if any
        owner_link = (
            session.query(ConstellationMembership)
            .join(User)
            .filter(
                ConstellationMembership.constellation_id == item.id,
                ConstellationMembership.access == AccessEnum.OWNER,
            )
            .first()
        )
        if owner_link and owner_link.user:
            owner = {
                "id": str(owner_link.user.id),
                "name": owner_link.user.name,
                "email": owner_link.user.email,
            }
        else:
            owner = {"id": None, "name": "Unknown", "email": "unknown@example.com"}

        return {
            "id": str(item.id),
            "title": item.name,
            "description": item.description,
            "createdAt": item.created_at.isoformat()
            if item.created_at
            else datetime.now().isoformat(),
            "collaboratorsCount": member_total,
            "collectionsCount": collection_total,
            "elementsCount": element_total,
            "imagesCount": asset_total,
            "storageUsed": 0.0,  # Placeholder
            "lastActivity": datetime.now().isoformat(),  # Placeholder
            "owner": owner,
            "tags": ["placeholder"],  # Placeholder - add tags logic later
        }

    return {
        "constellations": [_summarize(item) for item in constellations],
        "total": total,
    }

634 

635 

def delete_constellation_admin(
    session: Session, constellation_id: str
) -> Dict[str, Any]:
    """Delete a constellation (admin override).

    Args:
        session: Database session
        constellation_id: Identifier of the constellation to delete

    Returns:
        Dictionary with a "success" flag and a human-readable "message".
        Returns a failure payload when no constellation has the given id.

    Raises:
        Exception: Any error raised while deleting or committing is
            re-raised after the transaction has been rolled back.
    """
    constellation = (
        session.query(Constellation)
        .filter(Constellation.id == constellation_id)
        .first()
    )

    if not constellation:
        return {"success": False, "message": "Constellation not found"}

    # Read the name while the instance is still persistent; after the
    # delete is committed the object no longer belongs to the session.
    deleted_name = constellation.name

    try:
        # Delete the constellation (cascades should handle related data)
        session.delete(constellation)
        session.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error
        session.rollback()
        raise

    return {
        "success": True,
        "message": f"Constellation '{deleted_name}' deleted successfully",
    }

658 

659 

def get_analytics_data(session: Session, period: str = "30d") -> Dict[str, Any]:
    """Get comprehensive analytics data.

    Args:
        session: Database session
        period: Reporting period label; accepted for interface
            compatibility but not currently applied to any query.

    Returns:
        Dictionary with "userStats", "constellationStats", "systemStats",
        "topConstellations" (up to five, ranked by element count and then
        member count) and "topUsers" (up to five, ranked by membership
        count). Fields marked as placeholders are static values.
    """

    # Get basic analytics
    total_users = session.query(func.count(User.id)).scalar()
    verified_users = (
        session.query(func.count(User.id)).filter(User.email_verified).scalar()
    )

    # Get constellation analytics
    total_constellations = session.query(func.count(Constellation.id)).scalar()

    # Get content analytics
    total_assets = session.query(func.count(Asset.id)).scalar()
    total_ydocs = session.query(func.count(YDoc.id)).scalar()

    # Get top constellations (by number of elements/members).
    # YDocs and memberships are joined in the same statement, so each
    # constellation yields (ydocs x members) rows. Counting DISTINCT ids
    # keeps each figure from being multiplied by the other join.
    top_constellations_query = (
        session.query(
            Constellation,
            func.count(YDoc.id.distinct()).label("elements_count"),
            func.count(ConstellationMembership.user_id.distinct()).label(
                "members_count"
            ),
        )
        .outerjoin(Collection, Collection.constellation_id == Constellation.id)
        .outerjoin(YDoc, YDoc.collection_id == Collection.id)
        .outerjoin(
            ConstellationMembership,
            ConstellationMembership.constellation_id == Constellation.id,
        )
        .group_by(Constellation.id)
        .order_by(desc("elements_count"), desc("members_count"))
        .limit(5)
        .all()
    )

    top_constellations = []
    for constellation, elements_count, members_count in top_constellations_query:
        # Get owner
        owner_membership = (
            session.query(ConstellationMembership)
            .join(User)
            .filter(
                ConstellationMembership.constellation_id == constellation.id,
                ConstellationMembership.access == AccessEnum.OWNER,
            )
            .first()
        )

        owner_name = "Unknown"
        if owner_membership and owner_membership.user:
            owner_name = owner_membership.user.name

        top_constellations.append(
            {
                "id": str(constellation.id),
                "title": constellation.name,
                "owner": owner_name,
                "elementsCount": elements_count or 0,
                "membersCount": members_count or 0,
                "createdAt": constellation.created_at.isoformat()
                if constellation.created_at
                else datetime.now().isoformat(),
            }
        )

    # Get top users (by number of constellations they own/participate in)
    top_users_query = (
        session.query(
            User,
            func.count(ConstellationMembership.constellation_id).label(
                "constellations_count"
            ),
        )
        .outerjoin(ConstellationMembership, ConstellationMembership.user_id == User.id)
        .group_by(User.id)
        .order_by(desc("constellations_count"))
        .limit(5)
        .all()
    )

    top_users = [
        {
            "id": str(user.id),
            "name": user.name,
            "email": user.email,
            "constellationsCount": constellations_count or 0,
            "joinedAt": datetime.now().isoformat(),  # Placeholder - would need created_at field
        }
        for user, constellations_count in top_users_query
    ]

    return {
        "userStats": {
            "totalUsers": total_users,
            "activeUsers": verified_users,
            "newUsersToday": 0,  # Placeholder
            "newUsersThisWeek": 0,  # Placeholder
            "newUsersThisMonth": 0,  # Placeholder
            "userGrowthRate": 0.0,  # Placeholder
            "averageSessionDuration": 0.0,  # Placeholder
        },
        "constellationStats": {
            "totalConstellations": total_constellations,
            "newConstellationsToday": 0,  # Placeholder
            "newConstellationsThisWeek": 0,  # Placeholder
            "newConstellationsThisMonth": 0,  # Placeholder
            "constellationGrowthRate": 0.0,  # Placeholder
            # max(1, ...) keeps the ratio defined with no constellations
            "averageElementsPerConstellation": round(
                total_ydocs / max(1, total_constellations), 2
            ),
        },
        "systemStats": {
            "totalStorageUsed": 0.0,  # Placeholder
            "averageStoragePerUser": 0.0,  # Placeholder
            "totalImages": total_assets,
            "totalElements": total_ydocs,
            "databaseSize": 0.0,  # Placeholder
            "activeConnections": 0,  # Placeholder
        },
        "topConstellations": top_constellations,
        "topUsers": top_users,
    }

784 

785 

def get_permissions_list(
    session: Session,
    search: Optional[str] = None,
    role: Optional[str] = None,
) -> Dict[str, Any]:
    """List user-constellation memberships as permission records.

    ``role`` (case-insensitive) restricts the result to one access level;
    an unrecognized role yields an empty result. ``search`` is accepted
    but not applied yet. Results are ordered by constellation id, then
    user id.
    """
    memberships_query = (
        session.query(ConstellationMembership).join(User).join(Constellation)
    )

    if role:
        try:
            wanted_access = AccessEnum(role.upper())
        except ValueError:
            # Invalid role, return empty results
            return {"permissions": [], "total": 0}
        memberships_query = memberships_query.filter(
            ConstellationMembership.access == wanted_access
        )

    # TODO: implement proper search (the search argument is ignored for now)

    memberships_query = memberships_query.order_by(
        ConstellationMembership.constellation_id, ConstellationMembership.user_id
    )
    total = memberships_query.count()
    memberships = memberships_query.all()

    # Access levels that unlock each capability group
    editing_levels = [AccessEnum.WRITE, AccessEnum.ADMIN, AccessEnum.OWNER]
    managing_levels = [AccessEnum.ADMIN, AccessEnum.OWNER]

    def _as_permission(entry):
        return {
            "id": f"{entry.constellation_id}_{entry.user_id}",
            "user": {
                "id": str(entry.user.id),
                "name": entry.user.name,
                "email": entry.user.email,
            },
            "constellation": {
                "id": str(entry.constellation.id),
                "title": entry.constellation.name,
                "description": entry.constellation.description,
            },
            "role": entry.access.value.lower(),
            "grantedAt": datetime.now().isoformat(),  # Placeholder - would need created_at field
            "grantedBy": "system",  # Placeholder - would need granted_by field
            "lastAccessed": None,  # Placeholder - would need last_accessed field
            "permissions": {
                "canView": True,  # All roles can view
                "canEdit": entry.access in editing_levels,
                "canDelete": entry.access in managing_levels,
                "canShare": entry.access in editing_levels,
                "canManageUsers": entry.access in managing_levels,
            },
        }

    return {
        "permissions": [_as_permission(entry) for entry in memberships],
        "total": total,
    }

853 

854 

def update_permission_role(
    session: Session, permission_id: str, new_role: str
) -> Dict[str, Any]:
    """Change the access level of one constellation membership.

    ``permission_id`` is "<constellation_id>_<user_id>", split at the first
    underscore. Returns a success payload with the resulting capability
    flags, or a failure payload for a malformed id, an unknown role, or a
    missing membership. Any exception rolls the session back and is
    reported in the failure message instead of being raised.
    """

    def _failure(text):
        return {"success": False, "message": text}

    try:
        pieces = permission_id.split("_", 1)
        if len(pieces) != 2:
            return _failure("Invalid permission ID format")
        constellation_id, user_id = pieces

        # Validate role
        try:
            target_access = AccessEnum(new_role.upper())
        except ValueError:
            return _failure(f"Invalid role: {new_role}")

        # Find the membership
        membership = (
            session.query(ConstellationMembership)
            .filter(
                ConstellationMembership.constellation_id == constellation_id,
                ConstellationMembership.user_id == user_id,
            )
            .first()
        )
        if not membership:
            return _failure("Permission not found")

        # Remember the previous role for the message, then persist the new one
        previous_role = membership.access.value.lower()
        membership.access = target_access
        session.add(membership)
        session.commit()
        session.refresh(membership)

        can_edit_or_share = target_access in [
            AccessEnum.WRITE,
            AccessEnum.ADMIN,
            AccessEnum.OWNER,
        ]
        can_administer = target_access in [AccessEnum.ADMIN, AccessEnum.OWNER]

        return {
            "success": True,
            "message": f"Role updated from {previous_role} to {new_role.lower()}",
            "permission": {
                "id": permission_id,
                "role": new_role.lower(),
                "permissions": {
                    "canView": True,
                    "canEdit": can_edit_or_share,
                    "canDelete": can_administer,
                    "canShare": can_edit_or_share,
                    "canManageUsers": can_administer,
                },
            },
        }

    except Exception as e:
        session.rollback()
        return {"success": False, "message": f"Error updating role: {str(e)}"}

915 

916 

def revoke_permission(session: Session, permission_id: str) -> Dict[str, Any]:
    """Remove a user's membership from a constellation.

    ``permission_id`` is "<constellation_id>_<user_id>", split at the first
    underscore. Memberships with OWNER access are never removed. Any
    exception rolls the session back and is reported in the failure
    message instead of being raised.
    """

    def _failure(text):
        return {"success": False, "message": text}

    try:
        pieces = permission_id.split("_", 1)
        if len(pieces) != 2:
            return _failure("Invalid permission ID format")
        constellation_id, user_id = pieces

        # Find the membership
        membership = (
            session.query(ConstellationMembership)
            .filter(
                ConstellationMembership.constellation_id == constellation_id,
                ConstellationMembership.user_id == user_id,
            )
            .first()
        )
        if not membership:
            return _failure("Permission not found")

        # Don't allow revoking OWNER permissions
        if membership.access == AccessEnum.OWNER:
            return _failure(
                "Cannot revoke owner permissions. Transfer ownership first."
            )

        # Delete the membership
        session.delete(membership)
        session.commit()

        return {"success": True, "message": "Permission revoked successfully"}

    except Exception as e:
        session.rollback()
        return {"success": False, "message": f"Error revoking permission: {str(e)}"}