Coverage for app/upgrade.py: 88%

57 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2026-02-19 12:47 +0000

1from __future__ import annotations 

2 

3import importlib 

4import pkgutil 

5from dataclasses import dataclass 

6from typing import Callable, List 

7 

8from neo4j import GraphDatabase, basic_auth, ManagedTransaction 

9from loguru import logger 

10 

11from app.config.config import NEO4J_HOST, NEO4J_PORT, NEO4J_USER, NEO4J_PASSWORD 

12 

13 

14@dataclass(frozen=True) 

15class Migration: 

16 id: int 

17 description: str 

18 upgrade: Callable[[ManagedTransaction], None] # (tx) -> None 

19 

20 

21def _discover_migrations() -> List[Migration]: 

22 """Discover migrations in app.updates package.""" 

23 import app.updates as updates_pkg 

24 

25 migrations: List[Migration] = [] 

26 for modinfo in pkgutil.iter_modules(updates_pkg.__path__): 

27 name = modinfo.name 

28 if not name.startswith("_"): 

29 continue 

30 

31 module = importlib.import_module(f"{updates_pkg.__name__}.{name}") 

32 

33 mid = getattr(module, "ID", None) 

34 desc = getattr(module, "DESCRIPTION", "") 

35 upgrade = getattr(module, "upgrade", None) 

36 

37 if not isinstance(mid, int) or upgrade is None: 

38 logger.warning(f"Skip {name}: missing ID or upgrade()") 

39 continue 

40 

41 migrations.append(Migration(id=mid, description=desc, upgrade=upgrade)) 

42 

43 migrations.sort(key=lambda m: m.id) 

44 return migrations 

45 

46 

47def _get_or_init_schema_version(tx: ManagedTransaction) -> int: 

48 """ 

49 Ensure the Meta schema version node exists. 

50 Returns the current version. 

51 """ 

52 record = tx.run(""" 

53 MERGE (m:__sys__ {kind: "meta", key: "schema_version"}) 

54 ON CREATE SET m.value = 0, m.created_at = datetime() 

55 RETURN m.value AS version 

56 """).single() 

57 return int(record["version"] if record and record["version"] is not None else 0) 

58 

59 

60def _set_schema_version(tx: ManagedTransaction, version: int) -> None: 

61 tx.run(""" 

62 MATCH (m:__sys__ {kind: "meta", key: "schema_version"}) 

63 SET m.value = $version, m.updated_at = datetime() 

64 """, version=version) 

65 

66 

67def run_updates() -> None: 

68 driver = GraphDatabase.driver( 

69 f"neo4j://{NEO4J_HOST}:{NEO4J_PORT}", 

70 auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD), 

71 ) 

72 

73 migrations = _discover_migrations() 

74 if not migrations: 

75 logger.info("No migrations found.") 

76 driver.close() 

77 return 

78 

79 logger.info(f"Found {len(migrations)} migrations: {[m.id for m in migrations]}") 

80 

81 with driver.session() as session: 

82 # 1) get current version (inside a write tx so MERGE is safe) 

83 current_version = session.execute_write(_get_or_init_schema_version) 

84 logger.info(f"Current schema_version = {current_version}") 

85 

86 # 2) apply pending migrations 

87 for m in migrations: 

88 if m.id <= current_version: 

89 continue 

90 

91 logger.info(f"Applying migration {m.id:03d}: {m.description}") 

92 

93 # run migration in its own transaction 

94 def _run_one(tx: ManagedTransaction) -> None: 

95 m.upgrade(tx) 

96 

97 session.execute_write(_run_one) 

98 

99 # update version ONLY after success 

100 session.execute_write(_set_schema_version, m.id) 

101 

102 logger.info(f"Migration {m.id:03d} applied successfully") 

103 

104 driver.close() 

105 logger.info("All updates completed and driver closed.")