Coverage for src/cstlcore/ydocs/decode_ydoc.py: 7%
83 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
1# import base64
2from pycrdt import Doc, XmlFragment, XmlElement, XmlText
3from pycrdt._pycrdt import XmlText as IXmlText, XmlElement as IXmlElement, XmlFragment as IXmlFragment
4from loguru import logger
# Values of the "type" attribute that mark a node or chunk as a mention.
_MENTION_TYPES = ("mention", "mention_inline", "mention_input")


def _mention_label(attributes) -> str:
    """Return the display text of a mention, or "" when none is usable.

    ``attributes`` is anything exposing ``.get(key)``. The first truthy entry
    among "value", "label" and "name" is used, and only if it is a string.
    """
    label = attributes.get("value") or attributes.get("label") or attributes.get("name")
    return label if isinstance(label, str) and label else ""


def get_plain_text_from_xmltext(element: XmlText) -> str:
    """Flatten an XmlText into plain text, resolving embeds and mentions.

    Iterates over ``element.diff()`` and, for each ``(chunk, attrs)`` pair:

    * a ``str`` chunk is kept verbatim;
    * an integrated XmlElement / XmlText / XmlFragment chunk is wrapped in the
      matching high-level type; a mention (detected through the element tag or
      the "type" attribute) contributes its value/label/name, anything else
      contributes the text of its content;
    * any other chunk contributes text only when ``attrs`` is a dict that
      describes a mention.

    Args:
        element: The XmlText to convert.

    Returns:
        The concatenated plain text. If anything raises while processing the
        diff, the failure is logged and ``str(element)`` is returned instead.
    """
    try:
        out_parts: list[str] = []
        for chunk, attrs in element.diff():
            # 1) Plain string chunk
            if isinstance(chunk, str):
                out_parts.append(chunk)
                continue

            # 2) Integrated types returned by diff() need wrapping before use
            doc_obj = element.doc  # type: ignore[attr-defined]

            if isinstance(chunk, IXmlElement):
                # 2a) Embedded XmlElement: Plate mention = tag/type + attr value
                embed = XmlElement(_doc=doc_obj, _integrated=chunk)
                tag = (embed.tag or "").lower()
                embed_attrs = embed.attributes
                if "mention" in tag or embed_attrs.get("type") in _MENTION_TYPES:
                    out_parts.append(_mention_label(embed_attrs))
                else:
                    out_parts.append(
                        "".join(get_plain_text_content(child) for child in embed.children)
                    )
            elif isinstance(chunk, IXmlText):
                # 2b) Embedded XmlText: a mention may be flagged by its own attributes
                embed = XmlText(_doc=doc_obj, _integrated=chunk)
                embed_attrs = embed.attributes
                if embed_attrs.get("type") in _MENTION_TYPES:
                    out_parts.append(_mention_label(embed_attrs))
                else:
                    # Its text may itself contain other embeds
                    out_parts.append(get_plain_text_content(embed))
            elif isinstance(chunk, IXmlFragment):
                # 2c) Embedded fragment: children are joined without a separator
                embed = XmlFragment(_doc=doc_obj, _integrated=chunk)
                out_parts.append(
                    "".join(get_plain_text_content(child) for child in embed.children)
                )
            elif isinstance(attrs, dict) and attrs.get("type") in _MENTION_TYPES:
                # 3) Fallback: the mention is described by the chunk's attrs dict
                out_parts.append(_mention_label(attrs))
            # 4) Unknown chunk: contributes no text

        return "".join(out_parts)
    except Exception as exc:
        # Best-effort fallback, but leave a trace so failures are diagnosable.
        logger.warning(f"XmlText diff processing failed, falling back to str(): {exc}")
        return str(element)
def get_plain_text_content(element: XmlElement | XmlText | XmlFragment | None) -> str:
    """Return the plain text held by an XML node.

    An XmlText is delegated to ``get_plain_text_from_xmltext``. An XmlElement
    yields the text of its children concatenated directly; an XmlFragment
    yields the text of its children separated by newlines. Any other value
    (including ``None``) yields an empty string.
    """
    if isinstance(element, XmlText):
        return get_plain_text_from_xmltext(element)

    # Containers differ only in how the text of their children is glued together.
    if isinstance(element, XmlElement):
        separator = ''
    elif isinstance(element, XmlFragment):
        separator = '\n'
    else:
        return ""

    child_texts = [get_plain_text_content(node) for node in element.children]
    return separator.join(child_texts)
def decode_ydoc(base64_update: str, text_field: str = 'content') -> str:
    """Extract plain text from a stored Y document update.

    The update is applied to a fresh ``Doc``; every root key is then read as
    an XmlFragment and flattened with ``get_plain_text_content``.

    Args:
        base64_update: The update payload, passed unchanged to
            ``Doc.apply_update``. No base64 decoding happens here because the
            decoded bytes are stored directly in Postgres.
            NOTE(review): the name and ``str`` annotation look like leftovers
            from a base64 variant; the value is presumably ``bytes`` - confirm
            against callers.
        text_field: Root key whose text is preferred.

    Returns:
        The text of ``text_field`` when that root was decoded, otherwise the
        text of the first decoded root. If the update cannot be applied or no
        root could be decoded, ``base64_update`` is returned unchanged.
    """
    # 1) Create the document and apply the update
    doc = Doc()
    try:
        doc.apply_update(base64_update)
    except (KeyboardInterrupt, SystemExit):
        # Never swallow interpreter shutdown / user interrupts.
        raise
    except BaseException as e:
        # Deliberately broad: presumably also meant to cover errors from the
        # native layer that do not derive from Exception - TODO confirm.
        logger.error(f"apply_update failed: {e}")
        return base64_update

    # 2) Extract the text of every root that can be read as an XmlFragment
    found_texts: dict[str, str] = {}
    for key in list(doc.keys()):
        try:
            crdt = doc.get(key, type=XmlFragment)
            if not isinstance(crdt, XmlFragment):
                raise TypeError(f"Expected XmlFragment, got {type(crdt).__name__}")
            found_texts[key] = get_plain_text_content(crdt)
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException as e:
            # A root that cannot be decoded is logged and skipped.
            logger.error(f"doc.get('{key}') raised: {e}")

    # 3) Prioritize the requested text_field
    if text_field in found_texts:
        return found_texts[text_field]

    # 4) Fall back to the first root that produced text
    if found_texts:
        first_key = next(iter(found_texts))
        return found_texts[first_key]

    logger.warning("No Text root found, returning original content")
    return base64_update