|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | from collections.abc import Iterable |
4 | | -from typing import cast |
| 4 | +from typing import Any, cast |
5 | 5 |
|
6 | 6 | from hyperbase.constants import ATOM_ENCODE_TABLE |
7 | 7 | from hyperbase.hyperedge import Atom, Hyperedge, UniqueAtom |
@@ -63,11 +63,59 @@ def split_edge_str(edge_str: str) -> tuple[str, ...]: |
63 | 63 | return tuple(tokens) |
64 | 64 |
|
65 | 65 |
|
66 | | -def _parsed_token(token: str) -> Hyperedge: |
67 | | - if _edge_str_has_outer_parens(token): |
68 | | - return hedge(token) |
69 | | - else: |
70 | | - return Atom(token) |
def _hedge_from_str(source: str) -> Hyperedge:
    """Build a Hyperedge from its string form without using recursion.

    The string is stripped, newlines become spaces, and one pair of outer
    parentheses (if present) is removed before tokenizing. Nested
    parenthesized tokens are handled with an explicit stack of pending
    groups, so nesting depth is not bounded by the interpreter's
    recursion limit.

    A group whose only child is an ``Atom`` collapses to a single
    ``Atom`` that records whether the group was wrapped in parentheses;
    any other non-empty group becomes a ``Hyperedge`` of its children.

    Raises:
        ValueError: if ``source``, or any nested parenthesized group,
            contains no tokens.
    """
    text = source.strip().replace("\n", " ")
    wrapped = _edge_str_has_outer_parens(text)
    if wrapped:
        text = text[1:-1]

    root_tokens = split_edge_str(text)
    if not root_tokens:
        raise ValueError(f"Edge string is empty: '{source}'")

    # One entry per open group, innermost last:
    # (had outer parens, iterator over its unread tokens, children so far).
    pending: list[tuple[bool, Any, list[Hyperedge]]] = [
        (wrapped, iter(root_tokens), [])
    ]

    while True:
        group_parens, unread, parts = pending[-1]
        token = next(unread, None)

        if token is None:
            # Group exhausted: turn its children into a single edge.
            if len(parts) == 1 and isinstance(parts[0], Atom):
                node: Hyperedge = Atom(str(parts[0]), group_parens)
            elif parts:
                node = Hyperedge(tuple(parts))
            else:
                # Defensive only: groups are never pushed with zero tokens,
                # and every token contributes exactly one child.
                raise ValueError(f"Edge string is empty: '{source}'")
            pending.pop()
            if not pending:
                # The outermost group just closed; this is the result.
                return node
            pending[-1][2].append(node)
            continue

        if _edge_str_has_outer_parens(token):
            # Open a nested group; its result is appended to `parts` once
            # the group has been fully consumed.
            nested_tokens = split_edge_str(token[1:-1])
            if not nested_tokens:
                raise ValueError(f"Edge string is empty: '{token}'")
            pending.append((True, iter(nested_tokens), []))
        else:
            parts.append(Atom(token))
71 | 119 |
|
72 | 120 |
|
73 | 121 | def _collect_positions(tok_pos: Hyperedge) -> list[int]: |
@@ -121,23 +169,7 @@ def hedge( |
121 | 169 | _source = cast(Iterable, source) |
122 | 170 | return Hyperedge(tuple(hedge(item) for item in _source)) |
123 | 171 | elif type(source) is str: |
124 | | - edge_str = source.strip().replace("\n", " ") |
125 | | - edge_inner_str = edge_str |
126 | | - |
127 | | - parens = _edge_str_has_outer_parens(edge_str) |
128 | | - if parens: |
129 | | - edge_inner_str = edge_str[1:-1] |
130 | | - |
131 | | - tokens = split_edge_str(edge_inner_str) |
132 | | - if not tokens: |
133 | | - raise ValueError(f"Edge string is empty: '{source}'") |
134 | | - edges = tuple(_parsed_token(token) for token in tokens) |
135 | | - if len(edges) == 1 and isinstance(edges[0], Atom): |
136 | | - return Atom(str(edges[0]), parens) |
137 | | - elif len(edges) > 0: |
138 | | - return Hyperedge(edges) |
139 | | - else: |
140 | | - raise ValueError(f"Edge string is empty: '{source}'") |
| 172 | + return _hedge_from_str(source) |
141 | 173 | elif type(source) in {Hyperedge, Atom, UniqueAtom}: |
142 | 174 | return source # type: ignore |
143 | 175 | else: |
|
0 commit comments