Coverage for app/upgrade.py: 88%
57 statements
« prev ^ index » next coverage.py v7.9.2, created at 2026-02-19 12:47 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2026-02-19 12:47 +0000
1from __future__ import annotations
3import importlib
4import pkgutil
5from dataclasses import dataclass
6from typing import Callable, List
8from neo4j import GraphDatabase, basic_auth, ManagedTransaction
9from loguru import logger
11from app.config.config import NEO4J_HOST, NEO4J_PORT, NEO4J_USER, NEO4J_PASSWORD
14@dataclass(frozen=True)
15class Migration:
16 id: int
17 description: str
18 upgrade: Callable[[ManagedTransaction], None] # (tx) -> None
21def _discover_migrations() -> List[Migration]:
22 """Discover migrations in app.updates package."""
23 import app.updates as updates_pkg
25 migrations: List[Migration] = []
26 for modinfo in pkgutil.iter_modules(updates_pkg.__path__):
27 name = modinfo.name
28 if not name.startswith("_"):
29 continue
31 module = importlib.import_module(f"{updates_pkg.__name__}.{name}")
33 mid = getattr(module, "ID", None)
34 desc = getattr(module, "DESCRIPTION", "")
35 upgrade = getattr(module, "upgrade", None)
37 if not isinstance(mid, int) or upgrade is None:
38 logger.warning(f"Skip {name}: missing ID or upgrade()")
39 continue
41 migrations.append(Migration(id=mid, description=desc, upgrade=upgrade))
43 migrations.sort(key=lambda m: m.id)
44 return migrations
47def _get_or_init_schema_version(tx: ManagedTransaction) -> int:
48 """
49 Ensure the Meta schema version node exists.
50 Returns the current version.
51 """
52 record = tx.run("""
53 MERGE (m:__sys__ {kind: "meta", key: "schema_version"})
54 ON CREATE SET m.value = 0, m.created_at = datetime()
55 RETURN m.value AS version
56 """).single()
57 return int(record["version"] if record and record["version"] is not None else 0)
60def _set_schema_version(tx: ManagedTransaction, version: int) -> None:
61 tx.run("""
62 MATCH (m:__sys__ {kind: "meta", key: "schema_version"})
63 SET m.value = $version, m.updated_at = datetime()
64 """, version=version)
67def run_updates() -> None:
68 driver = GraphDatabase.driver(
69 f"neo4j://{NEO4J_HOST}:{NEO4J_PORT}",
70 auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD),
71 )
73 migrations = _discover_migrations()
74 if not migrations:
75 logger.info("No migrations found.")
76 driver.close()
77 return
79 logger.info(f"Found {len(migrations)} migrations: {[m.id for m in migrations]}")
81 with driver.session() as session:
82 # 1) get current version (inside a write tx so MERGE is safe)
83 current_version = session.execute_write(_get_or_init_schema_version)
84 logger.info(f"Current schema_version = {current_version}")
86 # 2) apply pending migrations
87 for m in migrations:
88 if m.id <= current_version:
89 continue
91 logger.info(f"Applying migration {m.id:03d}: {m.description}")
93 # run migration in its own transaction
94 def _run_one(tx: ManagedTransaction) -> None:
95 m.upgrade(tx)
97 session.execute_write(_run_one)
99 # update version ONLY after success
100 session.execute_write(_set_schema_version, m.id)
102 logger.info(f"Migration {m.id:03d} applied successfully")
104 driver.close()
105 logger.info("All updates completed and driver closed.")