Coverage for src/cstlcore/ydocs/decode_ydoc.py: 7%

83 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1# import base64 

2from pycrdt import Doc, XmlFragment, XmlElement, XmlText 

3from pycrdt._pycrdt import XmlText as IXmlText, XmlElement as IXmlElement, XmlFragment as IXmlFragment 

4from loguru import logger 

5 

# Values of the "type" attribute that mark a chunk or node as a mention.
_MENTION_TYPES = ("mention", "mention_inline", "mention_input")


def _mention_label(attributes) -> str:
    """Return the display text of a mention from its attribute mapping.

    Looks up ``value``, then ``label``, then ``name`` and keeps the first
    truthy result. Anything that is not a non-empty string yields ``""``.
    """
    label = attributes.get("value") or attributes.get("label") or attributes.get("name")
    return label if isinstance(label, str) and label else ""


def _children_text(node) -> str:
    """Concatenate the plain text of every child of *node*, with no separator."""
    return "".join(get_plain_text_content(child) for child in node.children)


def get_plain_text_from_xmltext(element: XmlText) -> str:
    """Flatten an ``XmlText`` into plain text, resolving embeds and mentions.

    Iterates over the ``(chunk, attrs)`` pairs produced by ``element.diff()``:

    * string chunks are copied verbatim;
    * integrated XML chunks are wrapped into their public pycrdt type and
      rendered recursively; a mention (detected through the tag name or the
      ``type`` attribute) is rendered as its value/label/name attribute;
    * any other chunk contributes text only when ``attrs`` is a dict that
      describes a mention; otherwise it is ignored.

    Args:
        element: The text node to flatten.

    Returns:
        The concatenated plain text. If anything raises while processing the
        diff, the failure is logged and ``str(element)`` is returned instead.
    """
    try:
        out_parts: list[str] = []
        for chunk, attrs in element.diff():
            # 1) Plain string chunk
            if isinstance(chunk, str):
                out_parts.append(chunk)
                continue

            # 2) Wrap integrated types returned by diff()
            doc_obj = element.doc  # type: ignore[attr-defined]
            wrapped: XmlElement | XmlText | XmlFragment | None = None
            if isinstance(chunk, IXmlText):
                wrapped = XmlText(_doc=doc_obj, _integrated=chunk)
            if isinstance(chunk, IXmlElement):
                wrapped = XmlElement(_doc=doc_obj, _integrated=chunk)
            if isinstance(chunk, IXmlFragment):
                wrapped = XmlFragment(_doc=doc_obj, _integrated=chunk)

            if isinstance(wrapped, XmlElement):
                # 2a) Embedded element: Plate mention = tag/type + attr value
                tag = (wrapped.tag or "").lower()
                node_type = wrapped.attributes.get("type")
                if "mention" in tag or node_type in _MENTION_TYPES:
                    out_parts.append(_mention_label(wrapped.attributes))
                else:
                    out_parts.append(_children_text(wrapped))
            elif isinstance(wrapped, XmlText):
                # 2b) Embedded text: the mention may be encoded in its attributes
                if wrapped.attributes.get("type") in _MENTION_TYPES:
                    out_parts.append(_mention_label(wrapped.attributes))
                else:
                    # Its own text, which may contain further embeds
                    out_parts.append(get_plain_text_content(wrapped))
            elif wrapped is not None:
                # 2c) Embedded fragment
                out_parts.append(_children_text(wrapped))
            elif isinstance(attrs, dict) and attrs.get("type") in _MENTION_TYPES:
                # 3) Fallback: the chunk's formatting attrs describe a mention
                out_parts.append(_mention_label(attrs))
            # 4) Unknown chunk kinds contribute no text.

        return "".join(out_parts)
    except Exception as exc:
        # Best-effort fallback, but leave a trace instead of failing silently.
        logger.warning(f"XmlText diff processing failed, falling back to str(): {exc}")
        return str(element)

78 

def get_plain_text_content(element: XmlElement | XmlText | XmlFragment | None) -> str:
    """Return the plain text held by an XML node of the document.

    Text nodes are delegated to ``get_plain_text_from_xmltext``. The children
    of an element are rendered recursively and concatenated directly, while
    the children of a fragment are joined with a newline. Any other input
    (including ``None``) gives an empty string.
    """
    if isinstance(element, XmlText):
        return get_plain_text_from_xmltext(element)

    # Containers differ only in the separator placed between their children.
    if isinstance(element, XmlElement):
        separator = ''
    elif isinstance(element, XmlFragment):
        separator = '\n'
    else:
        return ""

    rendered_children = (get_plain_text_content(node) for node in element.children)
    return separator.join(rendered_children)

87 

def decode_ydoc(base64_update: str, text_field: str = 'content') -> str:
    """Extract plain text from a serialized Y-document update.

    The update is applied to a fresh ``Doc``; every root key is then read as
    an ``XmlFragment`` and flattened with ``get_plain_text_content``.

    Args:
        base64_update: The update passed straight to ``Doc.apply_update``.
            NOTE(review): despite the name and the ``str`` annotation, the
            original in-code note says Postgres stores the decoded bytes and
            the base64 step is skipped, so callers presumably pass raw
            bytes - confirm against callers.
        text_field: Root key whose text is preferred when it was extracted.

    Returns:
        The text of ``text_field`` if that root was extracted, otherwise the
        text of the first root extracted successfully. If the update cannot
        be applied, or no root yields text, ``base64_update`` is returned
        unchanged.

    Raises:
        KeyboardInterrupt, SystemExit: Propagated rather than logged, so the
            process can still be interrupted or shut down.
    """
    # 1) Create the document and apply the update
    doc = Doc()
    try:
        doc.apply_update(base64_update)
    except (KeyboardInterrupt, SystemExit):
        raise
    except BaseException as e:
        # NOTE(review): BaseException is presumably kept on purpose so that
        # panics surfaced by the native binding are caught too - confirm.
        logger.error(f"apply_update failed: {e}")
        return base64_update

    # 2) Extract the text of every root that can be read as an XmlFragment.
    #    The keys are snapshotted first because doc.get() is called in the loop.
    found_texts: dict[str, str] = {}
    for key in list(doc.keys()):
        try:
            crdt = doc.get(key, type=XmlFragment)
            if not isinstance(crdt, XmlFragment):
                logger.error(
                    f"doc.get('{key}') raised: Expected XmlFragment, got {type(crdt).__name__}"
                )
                continue
            found_texts[key] = get_plain_text_content(crdt)
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException as e:
            logger.error(f"doc.get('{key}') raised: {e}")

    # 3) Prioritize the requested text_field
    if text_field in found_texts:
        return found_texts[text_field]

    # 4) Fall back to the first text found (dict preserves insertion order)
    if found_texts:
        return next(iter(found_texts.values()))

    logger.warning("No Text root found, returning original content")
    return base64_update

135 

136 logger.warning("No Text root found, returning original content") 

137 return base64_update