Coverage for src/cstlcore/admin/services.py: 13%
247 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
1"""
2Admin services for dashboard and management operations.
3"""
5import asyncio
6from datetime import datetime
7from typing import Any, Dict, Optional, List, Tuple
9import httpx
10from sqlmodel import Session, desc, func, select
12from cstlcore.assets.models import Asset
13from cstlcore.collections.models import Collection
14from cstlcore.constellations.models import Constellation
15from cstlcore.memberships.models import AccessEnum, ConstellationMembership
16from cstlcore.settings import settings
17from cstlcore.users.models import User
18from cstlcore.ydocs.models import YDoc
21# =============================================================================
22# Graph Theory Metrics Functions
23# =============================================================================
def compute_graph_density(n: int, m: int) -> float:
    """
    Return the density of an undirected graph: D = 2M / (N * (N - 1)).

    Args:
        n: Number of nodes (elements)
        m: Number of unique undirected edges (relations)

    Returns:
        The density as a float. The formula is undefined for fewer than
        two nodes, so 0.0 is returned in that case.
    """
    return 0.0 if n < 2 else (2 * m) / (n * (n - 1))
def compute_average_degree(n: int, m: int) -> float:
    """
    Return the average node degree of an undirected graph: k = 2M / N.

    Args:
        n: Number of nodes (elements)
        m: Number of unique undirected edges (relations)

    Returns:
        The average degree as a float; 0.0 when there are no nodes.
    """
    return 0.0 if n < 1 else (2 * m) / n
def interpret_density(density: float) -> str:
    """
    Provide heuristic interpretation of graph density for narrative analysis.

    Thresholds (from graph.md specification):
    - 0.00 – 0.05: "sparse"  (sparse, linear narrative)
    - 0.05 – 0.15: "simple"  (structured but simple)
    - 0.15 – 0.30: "rich"    (rich, readable narrative; target range)
    - >= 0.30:     "complex" (highly interconnected, potential overload)

    Each lower bound is inclusive and each upper bound exclusive.

    Args:
        density: Graph density value

    Returns:
        The level name as a plain string (not a dictionary): one of
        "sparse", "simple", "rich" or "complex".
    """
    # Ordered (exclusive upper bound, level) pairs; the first match wins.
    thresholds = ((0.05, "sparse"), (0.15, "simple"), (0.30, "rich"))
    for upper_bound, level in thresholds:
        if density < upper_bound:
            return level
    return "complex"
def count_unique_undirected_edges(links: List[Dict[str, Any]]) -> int:
    """
    Count unique undirected edges from a list of Neo4j links.

    Rules (from graph.md specification):
    - Treat the graph as undirected
    - Do not count self-loops
    - Do not count duplicate edges

    Links that lack a "start_node" or "end_node" value are ignored, since
    they cannot describe an edge.

    Args:
        links: List of link dictionaries with start_node and end_node

    Returns:
        Number of unique undirected edges (excluding self-loops)
    """
    unique_edges = set()
    for link in links:
        start = link.get("start_node")
        end = link.get("end_node")

        # Skip malformed links that do not name both endpoints.
        if start is None or end is None:
            continue

        # Skip self-loops
        if start == end:
            continue

        # A frozenset is direction-agnostic and, unlike sorting the pair,
        # does not require the two endpoint ids to be mutually comparable.
        unique_edges.add(frozenset((start, end)))

    return len(unique_edges)
async def fetch_neo4j_graph_data(
    constellation_id: str, token: str
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Retrieve the nodes and links of one constellation from the graph API.

    Both endpoints are requested concurrently over a single HTTP client
    with a 30 second timeout. A response body only contributes its "data"
    entry when its "success" entry is truthy; otherwise an empty list is
    used for that side.

    Args:
        constellation_id: UUID of the constellation
        token: JWT token sent as a Bearer Authorization header

    Returns:
        Tuple of (nodes_list, links_list)

    Raises:
        httpx.HTTPError: If a request fails or returns an error status
    """
    api_root = str(settings.services.graph_api).rstrip("/")
    constellation_url = f"{api_root}/constellation/{constellation_id}"
    auth_headers = {"Authorization": f"Bearer {token}"}

    async with httpx.AsyncClient(timeout=30.0) as client:
        nodes_request = client.get(f"{constellation_url}/nodes", headers=auth_headers)
        links_request = client.get(f"{constellation_url}/links", headers=auth_headers)
        nodes_response, links_response = await asyncio.gather(
            nodes_request, links_request
        )

        # Check both statuses before decoding either body.
        for response in (nodes_response, links_response):
            response.raise_for_status()

        nodes_payload = nodes_response.json()
        links_payload = links_response.json()

        if nodes_payload.get("success"):
            nodes = nodes_payload.get("data", [])
        else:
            nodes = []

        if links_payload.get("success"):
            links = links_payload.get("data", [])
        else:
            links = []

        return nodes, links
async def get_constellation_graph_metrics(
    session: Session, constellation_id: str, token: str
) -> Dict[str, Any]:
    """
    Compute graph theory metrics for one constellation.

    Args:
        session: Database session (not used by this function)
        constellation_id: UUID of the constellation
        token: JWT token for Neo4j API authentication

    Returns:
        Dictionary with element/relation counts, rounded density and
        average degree, and the density interpretation level. When the
        graph API call raises httpx.HTTPError, all metrics are zero and an
        additional "error" entry carries the failure message.
    """
    try:
        nodes, links = await fetch_neo4j_graph_data(constellation_id, token)
    except httpx.HTTPError as exc:
        # Degrade to zeroed metrics instead of failing the whole request.
        return {
            "constellationId": constellation_id,
            "elementCount": 0,
            "relationCount": 0,
            "graphDensity": 0.0,
            "averageDegree": 0.0,
            "interpretationLevel": interpret_density(0.0),
            "error": f"Failed to fetch graph data: {str(exc)}",
        }

    node_total = len(nodes)
    edge_total = count_unique_undirected_edges(links)
    graph_density = compute_graph_density(node_total, edge_total)
    mean_degree = compute_average_degree(node_total, edge_total)
    level = interpret_density(graph_density)

    return {
        "constellationId": constellation_id,
        "elementCount": node_total,
        "relationCount": edge_total,
        "graphDensity": round(graph_density, 4),
        "averageDegree": round(mean_degree, 2),
        "interpretationLevel": level,
    }
async def get_all_constellations_graph_metrics(
    session: Session, token: str
) -> Dict[str, Any]:
    """
    Compute graph theory metrics for every constellation (bulk endpoint).

    Constellations are processed one at a time, newest first. A
    constellation whose graph fetch raises httpx.HTTPError is still listed,
    with zeroed metrics, and therefore counts towards the averages.

    Args:
        session: Database session
        token: JWT token for Neo4j API authentication

    Returns:
        Dictionary with the per-constellation metrics list, the number of
        entries, and the average density, element count and relation count.
    """
    newest_first = (
        session.query(Constellation).order_by(desc(Constellation.created_at)).all()
    )

    entries = []
    density_sum = 0.0
    element_sum = 0
    relation_sum = 0

    for constellation in newest_first:
        try:
            nodes, links = await fetch_neo4j_graph_data(str(constellation.id), token)
        except httpx.HTTPError:
            # Keep the constellation visible with zero metrics on error.
            entries.append(
                {
                    "constellationId": str(constellation.id),
                    "constellationName": constellation.name,
                    "elementCount": 0,
                    "relationCount": 0,
                    "graphDensity": 0.0,
                    "averageDegree": 0.0,
                    "interpretationLevel": "sparse",
                }
            )
            continue

        node_total = len(nodes)
        edge_total = count_unique_undirected_edges(links)
        graph_density = compute_graph_density(node_total, edge_total)
        mean_degree = compute_average_degree(node_total, edge_total)
        level = interpret_density(graph_density)

        entries.append(
            {
                "constellationId": str(constellation.id),
                "constellationName": constellation.name,
                "elementCount": node_total,
                "relationCount": edge_total,
                "graphDensity": round(graph_density, 4),
                "averageDegree": round(mean_degree, 2),
                "interpretationLevel": level,
            }
        )

        # Totals use the unrounded density so the average is not skewed.
        density_sum += graph_density
        element_sum += node_total
        relation_sum += edge_total

    entry_count = len(entries)
    divisor = max(1, entry_count)  # avoids division by zero when empty
    return {
        "constellations": entries,
        "total": entry_count,
        "averageDensity": round(density_sum / divisor, 4),
        "averageElements": round(element_sum / divisor, 2),
        "averageRelations": round(relation_sum / divisor, 2),
    }
def get_admin_configuration_info() -> Dict[str, Any]:
    """
    Describe how admin access is currently configured.

    Returns:
        Dictionary listing the configured admin emails, how many there are,
        and static hints about the setting that controls them.
    """
    if settings.admin.emails:
        configured_emails = list(settings.admin.emails)
        email_total = len(settings.admin.emails)
    else:
        configured_emails = []
        email_total = 0

    return {
        "admin_emails_configured": configured_emails,
        "total_admin_emails": email_total,
        "environment_variable": "ADMIN__EMAILS",
        "format": "Comma-separated email addresses",
        "note": "Admin privileges are granted based on email matching, not database roles",
    }
def get_dashboard_stats(session: Session) -> Dict[str, Any]:
    """
    Collect the overview statistics shown on the admin dashboard.

    Several values are placeholders: new users today, growth percentages,
    the public constellation count and the whole "system" section are
    hard-coded rather than measured.
    """

    def row_count(id_column: Any) -> Any:
        # COUNT(<column>) over the whole table.
        return session.query(func.count(id_column)).scalar()

    user_total = row_count(User.id)
    constellation_total = row_count(Constellation.id)
    collection_total = row_count(Collection.id)
    asset_total = row_count(Asset.id)

    verified_total = (
        session.query(func.count(User.id)).filter(User.email_verified).scalar()
    )

    # Placeholder - would need a created_at field on the User model.
    new_users_today = 0  # TODO: Add created_at field to User model

    # Constellations whose creation date equals the server's local date.
    current_date = datetime.now().date()
    created_today = (
        session.query(func.count(Constellation.id))
        .filter(func.date(Constellation.created_at) == current_date)
        .scalar()
    )

    users_section = {
        "total": user_total,
        "active": verified_total,  # Using verified as proxy for active
        "newToday": new_users_today,
        "growth": 0,  # Placeholder percentage
    }
    constellations_section = {
        "total": constellation_total,
        "public": 0,  # TODO: Add visibility field to constellation model
        "private": constellation_total,
        "newToday": created_today,
        "growth": 0,  # Placeholder percentage
    }
    content_section = {
        "collections": collection_total,
        "assets": asset_total,
        "totalItems": collection_total + asset_total,
    }
    # max(1, ...) keeps the ratios defined when a table is empty.
    engagement_section = {
        "averageConstellationsPerUser": round(
            constellation_total / max(1, user_total), 2
        ),
        "averageCollectionsPerConstellation": round(
            collection_total / max(1, constellation_total), 2
        ),
        "verificationRate": round((verified_total / max(1, user_total)) * 100, 1),
    }
    system_section = {
        "status": "healthy",
        "uptime": "99.9%",
        "lastBackup": datetime.now().isoformat(),
        "version": "1.0.0",
        "environment": "development",
    }

    return {
        "users": users_section,
        "constellations": constellations_section,
        "content": content_section,
        "engagement": engagement_section,
        "system": system_section,
    }
def get_dashboard_activity(session: Session) -> Dict[str, Any]:
    """
    Build a feed of recent platform activity.

    The feed merges the 10 users with the highest ids (id stands in for
    creation time, since users have no created_at) with the 10 most
    recently created constellations, ordered by timestamp string, newest
    first. User entries carry the current time as a placeholder timestamp.
    """
    latest_users = session.query(User).order_by(desc(User.id)).limit(10).all()
    latest_constellations = (
        session.query(Constellation)
        .order_by(desc(Constellation.created_at))
        .limit(10)
        .all()
    )

    user_events = [
        {
            "id": f"user_{user.id}",
            "type": "user_registered",
            "description": f"New user registered: {user.name}",
            "timestamp": datetime.now().isoformat(),  # Placeholder
            "user": f"{user.name} <{user.email}>",
        }
        for user in latest_users
    ]

    constellation_events = [
        {
            "id": f"constellation_{constellation.id}",
            "type": "constellation_created",
            "description": f"New constellation created: {constellation.name}",
            "timestamp": constellation.created_at.isoformat()
            if constellation.created_at
            else datetime.now().isoformat(),
            "constellation": {
                "id": str(constellation.id),
                "name": constellation.name,
            },
        }
        for constellation in latest_constellations
    ]

    # ISO-8601 strings are compared as text; most recent first.
    timeline = sorted(
        user_events + constellation_events,
        key=lambda event: event["timestamp"],
        reverse=True,
    )

    return {"activities": timeline[:20]}
def get_dashboard_alerts(session: Session) -> Dict[str, Any]:
    """
    Get system alerts and notifications.

    Args:
        session: Database session

    Returns:
        Dictionary with an "alerts" list. It always contains an
        informational system-health entry (a placeholder, not a real
        check), preceded by a warning when more than 10 users have an
        unverified email address.
    """
    alerts = []

    # The negation must be expressed as a SQL expression. Python's `not`
    # is evaluated eagerly on the mapped attribute and never reaches the
    # generated query, so `.is_(False)` is used instead.
    unverified_count = (
        session.query(func.count(User.id))
        .filter(User.email_verified.is_(False))
        .scalar()
    ) or 0

    if unverified_count > 10:
        alerts.append(
            {
                "id": "unverified_users",
                "type": "warning",
                "message": f"{unverified_count} users have unverified email addresses",
                "timestamp": datetime.now().isoformat(),
                "action": "Review user verification status",
            }
        )

    # Check system health (placeholder)
    alerts.append(
        {
            "id": "system_health",
            "type": "info",
            "message": "System is running normally",
            "timestamp": datetime.now().isoformat(),
        }
    )

    return {"alerts": alerts}
def get_users_list(
    session: Session,
    search: Optional[str] = None,
    status: Optional[str] = None,
    verified: Optional[bool] = None,
) -> Dict[str, Any]:
    """
    Get all users, optionally filtered by verification state.

    Args:
        session: Database session
        search: Accepted for interface compatibility; currently not applied
        status: Accepted for interface compatibility; currently not applied
        verified: When not None, only users whose email_verified equals
            this value are returned

    Returns:
        Dictionary with "users" (highest id first) and "total" (the number
        of users matching the filter). createdAt, lastLoginAt,
        totalElements, totalLinks and storageUsed are placeholders.

    Raises:
        Exception: Any database error is logged and re-raised unchanged.
    """
    # Local import keeps this block self-contained within the module.
    import logging

    logger = logging.getLogger(__name__)

    users_statement = select(User)
    count_statement = select(func.count(User.id))
    if verified is not None:
        users_statement = users_statement.where(User.email_verified == verified)
        count_statement = count_statement.where(User.email_verified == verified)
    users_statement = users_statement.order_by(desc(User.id))

    # One grouped query yields every user's membership count, instead of
    # issuing a separate COUNT query per user.
    membership_statement = select(
        ConstellationMembership.user_id,
        func.count(ConstellationMembership.user_id),
    ).group_by(ConstellationMembership.user_id)

    try:
        users = session.exec(users_statement).all()
        total = session.exec(count_statement).one()
        membership_rows = session.exec(membership_statement).all()
    except Exception:
        logger.exception("Error while querying users for the admin list")
        raise

    memberships_by_user = {
        member_id: member_count for member_id, member_count in membership_rows
    }

    users_data = [
        {
            "id": str(user.id),
            "name": user.name,
            "email": user.email,
            "createdAt": datetime.now().isoformat(),  # Placeholder
            "lastLoginAt": datetime.now().isoformat(),  # Placeholder
            "isActive": user.email_verified,  # Using verified as proxy for active
            "isVerified": user.email_verified,
            # Users without any membership are absent from the grouped rows.
            "constellationsCount": memberships_by_user.get(user.id, 0),
            "totalElements": 0,  # Placeholder - would need element count logic
            "totalLinks": 0,  # Placeholder - would need link count logic
            "storageUsed": 0.0,  # Placeholder - would need storage calculation
        }
        for user in users
    ]

    return {"users": users_data, "total": total}
def toggle_user_status(session: Session, user_id: str) -> Dict[str, Any]:
    """
    Flip a user between active and inactive.

    There is no dedicated status field, so the user's email_verified flag
    is inverted and used as the status. The change is committed.

    Returns:
        A success dictionary describing the new status, or a failure
        dictionary when no user has the given id.
    """
    user = session.query(User).filter(User.id == user_id).first()
    if not user:
        return {"success": False, "message": "User not found"}

    # Toggle email verification as proxy for status
    user.email_verified = not user.email_verified
    session.add(user)
    session.commit()
    session.refresh(user)

    status_label = "active" if user.email_verified else "inactive"
    user_summary = {
        "id": str(user.id),
        "status": status_label,
        "emailVerified": user.email_verified,
    }
    return {
        "success": True,
        "message": f"User status updated to {status_label}",
        "user": user_summary,
    }
def get_constellations_list(
    session: Session,
    search: Optional[str] = None,
    sort_by: Optional[str] = None,
) -> Dict[str, Any]:
    """
    List every constellation with its counts and owner.

    Args:
        session: Database session
        search: Accepted but currently not applied
        sort_by: "name" orders by name; any other value (including
            "created" and None) orders by creation time, newest first

    Returns:
        Dictionary with "constellations" and "total". storageUsed,
        lastActivity and tags are placeholders.
    """
    if sort_by == "name":
        ordering = Constellation.name
    else:
        ordering = desc(Constellation.created_at)

    constellations = session.query(Constellation).order_by(ordering).all()
    total = session.query(func.count(Constellation.id)).scalar()

    def describe(constellation: Any) -> Dict[str, Any]:
        # Builds the response entry for one constellation; each count
        # below is its own query.
        member_total = (
            session.query(func.count(ConstellationMembership.user_id))
            .filter(ConstellationMembership.constellation_id == constellation.id)
            .scalar()
        )
        collection_total = (
            session.query(func.count(Collection.id))
            .filter(Collection.constellation_id == constellation.id)
            .scalar()
        )
        asset_total = (
            session.query(func.count(Asset.id))
            .filter(Asset.constellation_id == constellation.id)
            .scalar()
        )
        # Elements are the YDoc rows joined to this constellation's collections.
        element_total = (
            session.query(func.count(YDoc.id))
            .join(Collection)
            .filter(Collection.constellation_id == constellation.id)
            .scalar()
        )

        owner_link = (
            session.query(ConstellationMembership)
            .join(User)
            .filter(
                ConstellationMembership.constellation_id == constellation.id,
                ConstellationMembership.access == AccessEnum.OWNER,
            )
            .first()
        )
        if owner_link and owner_link.user:
            owner = {
                "id": str(owner_link.user.id),
                "name": owner_link.user.name,
                "email": owner_link.user.email,
            }
        else:
            owner = {"id": None, "name": "Unknown", "email": "unknown@example.com"}

        if constellation.created_at:
            created_at = constellation.created_at.isoformat()
        else:
            created_at = datetime.now().isoformat()

        return {
            "id": str(constellation.id),
            "title": constellation.name,
            "description": constellation.description,
            "createdAt": created_at,
            "collaboratorsCount": member_total,
            "collectionsCount": collection_total,
            "elementsCount": element_total,
            "imagesCount": asset_total,
            "storageUsed": 0.0,  # Placeholder
            "lastActivity": datetime.now().isoformat(),  # Placeholder
            "owner": owner,
            "tags": ["placeholder"],  # Placeholder - add tags logic later
        }

    return {
        "constellations": [describe(item) for item in constellations],
        "total": total,
    }
def delete_constellation_admin(
    session: Session, constellation_id: str
) -> Dict[str, Any]:
    """
    Delete a constellation (admin override).

    Args:
        session: Database session
        constellation_id: Id of the constellation to delete

    Returns:
        A success dictionary naming the deleted constellation, or a
        failure dictionary when no constellation has the given id.
    """
    constellation = (
        session.query(Constellation)
        .filter(Constellation.id == constellation_id)
        .first()
    )

    if not constellation:
        return {"success": False, "message": "Constellation not found"}

    # Read the name while the instance is still attached to the session,
    # so the message does not depend on a deleted object's state.
    deleted_name = constellation.name

    # Delete the constellation. Related data is presumably removed by
    # cascades configured on the models - verify against the model definitions.
    session.delete(constellation)
    session.commit()

    return {
        "success": True,
        "message": f"Constellation '{deleted_name}' deleted successfully",
    }
def get_analytics_data(session: Session, period: str = "30d") -> Dict[str, Any]:
    """
    Get comprehensive analytics data.

    Args:
        session: Database session
        period: Accepted for interface compatibility; currently not applied

    Returns:
        Dictionary with userStats, constellationStats, systemStats, the top
        5 constellations (by element count, then member count) and the top
        5 users (by membership count). Entries marked "Placeholder" are
        hard-coded rather than measured.
    """
    # Get basic analytics
    total_users = session.query(func.count(User.id)).scalar()
    verified_users = (
        session.query(func.count(User.id)).filter(User.email_verified).scalar()
    )

    # Get constellation analytics
    total_constellations = session.query(func.count(Constellation.id)).scalar()

    # Get content analytics
    total_assets = session.query(func.count(Asset.id)).scalar()
    total_ydocs = session.query(func.count(YDoc.id)).scalar()

    # Get top constellations (by number of elements/members).
    # Joining both YDocs and memberships yields one row per
    # (element, member) pair, so each aggregate counts DISTINCT ids to
    # avoid the two counts multiplying each other.
    top_constellations_query = (
        session.query(
            Constellation,
            func.count(YDoc.id.distinct()).label("elements_count"),
            func.count(ConstellationMembership.user_id.distinct()).label(
                "members_count"
            ),
        )
        .outerjoin(Collection, Collection.constellation_id == Constellation.id)
        .outerjoin(YDoc, YDoc.collection_id == Collection.id)
        .outerjoin(
            ConstellationMembership,
            ConstellationMembership.constellation_id == Constellation.id,
        )
        .group_by(Constellation.id)
        .order_by(desc("elements_count"), desc("members_count"))
        .limit(5)
        .all()
    )

    top_constellations = []
    for constellation, elements_count, members_count in top_constellations_query:
        # Get owner
        owner_membership = (
            session.query(ConstellationMembership)
            .join(User)
            .filter(
                ConstellationMembership.constellation_id == constellation.id,
                ConstellationMembership.access == AccessEnum.OWNER,
            )
            .first()
        )

        owner_name = "Unknown"
        if owner_membership and owner_membership.user:
            owner_name = owner_membership.user.name

        top_constellations.append(
            {
                "id": str(constellation.id),
                "title": constellation.name,
                "owner": owner_name,
                "elementsCount": elements_count or 0,
                "membersCount": members_count or 0,
                "createdAt": constellation.created_at.isoformat()
                if constellation.created_at
                else datetime.now().isoformat(),
            }
        )

    # Get top users (by number of constellations they own/participate in)
    top_users_query = (
        session.query(
            User,
            func.count(ConstellationMembership.constellation_id).label(
                "constellations_count"
            ),
        )
        .outerjoin(ConstellationMembership, ConstellationMembership.user_id == User.id)
        .group_by(User.id)
        .order_by(desc("constellations_count"))
        .limit(5)
        .all()
    )

    top_users = [
        {
            "id": str(user.id),
            "name": user.name,
            "email": user.email,
            "constellationsCount": constellations_count or 0,
            "joinedAt": datetime.now().isoformat(),  # Placeholder - would need created_at field
        }
        for user, constellations_count in top_users_query
    ]

    return {
        "userStats": {
            "totalUsers": total_users,
            "activeUsers": verified_users,
            "newUsersToday": 0,  # Placeholder
            "newUsersThisWeek": 0,  # Placeholder
            "newUsersThisMonth": 0,  # Placeholder
            "userGrowthRate": 0.0,  # Placeholder
            "averageSessionDuration": 0.0,  # Placeholder
        },
        "constellationStats": {
            "totalConstellations": total_constellations,
            "newConstellationsToday": 0,  # Placeholder
            "newConstellationsThisWeek": 0,  # Placeholder
            "newConstellationsThisMonth": 0,  # Placeholder
            "constellationGrowthRate": 0.0,  # Placeholder
            # max(1, ...) keeps the ratio defined when there are no constellations.
            "averageElementsPerConstellation": round(
                total_ydocs / max(1, total_constellations), 2
            ),
        },
        "systemStats": {
            "totalStorageUsed": 0.0,  # Placeholder
            "averageStoragePerUser": 0.0,  # Placeholder
            "totalImages": total_assets,
            "totalElements": total_ydocs,
            "databaseSize": 0.0,  # Placeholder
            "activeConnections": 0,  # Placeholder
        },
        "topConstellations": top_constellations,
        "topUsers": top_users,
    }
def get_permissions_list(
    session: Session,
    search: Optional[str] = None,
    role: Optional[str] = None,
) -> Dict[str, Any]:
    """
    List user-constellation memberships as permission records.

    Args:
        session: Database session
        search: Accepted but currently not applied (search is still TODO)
        role: Optional access level name, matched case-insensitively; an
            unknown name yields an empty result

    Returns:
        Dictionary with "permissions" (ordered by constellation id, then
        user id) and "total". grantedAt, grantedBy and lastAccessed are
        placeholders.
    """
    memberships_query = (
        session.query(ConstellationMembership).join(User).join(Constellation)
    )

    if role:
        try:
            wanted_access = AccessEnum(role.upper())
            memberships_query = memberships_query.filter(
                ConstellationMembership.access == wanted_access
            )
        except ValueError:
            # Invalid role, return empty results
            return {"permissions": [], "total": 0}

    memberships_query = memberships_query.order_by(
        ConstellationMembership.constellation_id, ConstellationMembership.user_id
    )
    total = memberships_query.count()
    memberships = memberships_query.all()

    # Access levels that unlock each capability; viewing is open to all roles.
    editor_levels = (AccessEnum.WRITE, AccessEnum.ADMIN, AccessEnum.OWNER)
    manager_levels = (AccessEnum.ADMIN, AccessEnum.OWNER)

    permissions_data = [
        {
            "id": f"{membership.constellation_id}_{membership.user_id}",
            "user": {
                "id": str(membership.user.id),
                "name": membership.user.name,
                "email": membership.user.email,
            },
            "constellation": {
                "id": str(membership.constellation.id),
                "title": membership.constellation.name,
                "description": membership.constellation.description,
            },
            "role": membership.access.value.lower(),
            "grantedAt": datetime.now().isoformat(),  # Placeholder - would need created_at field
            "grantedBy": "system",  # Placeholder - would need granted_by field
            "lastAccessed": None,  # Placeholder - would need last_accessed field
            "permissions": {
                "canView": True,
                "canEdit": membership.access in editor_levels,
                "canDelete": membership.access in manager_levels,
                "canShare": membership.access in editor_levels,
                "canManageUsers": membership.access in manager_levels,
            },
        }
        for membership in memberships
    ]

    return {"permissions": permissions_data, "total": total}
def update_permission_role(
    session: Session, permission_id: str, new_role: str
) -> Dict[str, Any]:
    """
    Change the access level of one constellation membership.

    Args:
        session: Database session
        permission_id: "<constellation_id>_<user_id>", split at the first
            underscore
        new_role: Access level name, matched case-insensitively

    Returns:
        A success dictionary with the new role and its capability flags,
        or a failure dictionary with a message. Any unexpected exception
        rolls the session back and is reported as a failure dictionary
        rather than raised.
    """
    try:
        constellation_part, separator, user_part = permission_id.partition("_")
        if not separator:
            return {"success": False, "message": "Invalid permission ID format"}

        # Validate role
        try:
            target_access = AccessEnum(new_role.upper())
        except ValueError:
            return {"success": False, "message": f"Invalid role: {new_role}"}

        membership = (
            session.query(ConstellationMembership)
            .filter(
                ConstellationMembership.constellation_id == constellation_part,
                ConstellationMembership.user_id == user_part,
            )
            .first()
        )
        if not membership:
            return {"success": False, "message": "Permission not found"}

        # Remember the previous role for the confirmation message.
        previous_role = membership.access.value.lower()
        membership.access = target_access
        session.add(membership)
        session.commit()
        session.refresh(membership)

        editor_levels = (AccessEnum.WRITE, AccessEnum.ADMIN, AccessEnum.OWNER)
        manager_levels = (AccessEnum.ADMIN, AccessEnum.OWNER)
        capability_flags = {
            "canView": True,
            "canEdit": target_access in editor_levels,
            "canDelete": target_access in manager_levels,
            "canShare": target_access in editor_levels,
            "canManageUsers": target_access in manager_levels,
        }

        return {
            "success": True,
            "message": f"Role updated from {previous_role} to {new_role.lower()}",
            "permission": {
                "id": permission_id,
                "role": new_role.lower(),
                "permissions": capability_flags,
            },
        }

    except Exception as exc:
        session.rollback()
        return {"success": False, "message": f"Error updating role: {str(exc)}"}
def revoke_permission(session: Session, permission_id: str) -> Dict[str, Any]:
    """
    Remove one user's membership from a constellation.

    Args:
        session: Database session
        permission_id: "<constellation_id>_<user_id>", split at the first
            underscore

    Returns:
        A success or failure dictionary with a message. Owner memberships
        are never removed. Any unexpected exception rolls the session back
        and is reported as a failure dictionary rather than raised.
    """
    try:
        constellation_part, separator, user_part = permission_id.partition("_")
        if not separator:
            return {"success": False, "message": "Invalid permission ID format"}

        membership = (
            session.query(ConstellationMembership)
            .filter(
                ConstellationMembership.constellation_id == constellation_part,
                ConstellationMembership.user_id == user_part,
            )
            .first()
        )
        if not membership:
            return {"success": False, "message": "Permission not found"}

        # Ownership has to be transferred before it can be removed.
        if membership.access == AccessEnum.OWNER:
            return {
                "success": False,
                "message": "Cannot revoke owner permissions. Transfer ownership first.",
            }

        session.delete(membership)
        session.commit()
        return {"success": True, "message": "Permission revoked successfully"}

    except Exception as exc:
        session.rollback()
        return {"success": False, "message": f"Error revoking permission: {str(exc)}"}