Skip to content

Commit d664609

Browse files
committed
refactor(IncrementalGraph): use Subsequent Result nodes to reduce mutation
Replicates graphql/graphql-js@3bbbb08
1 parent 63f4a68 commit d664609

File tree

3 files changed

+149
-75
lines changed

3 files changed

+149
-75
lines changed

src/graphql/execution/incremental_graph.py

Lines changed: 140 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
Generator,
1212
Iterable,
1313
Sequence,
14+
Union,
1415
cast,
1516
)
1617

1718
from graphql.execution.types import (
18-
is_deferred_fragment_record,
19+
SubsequentResultRecord,
1920
is_deferred_grouped_field_set_record,
2021
)
2122

@@ -30,20 +31,68 @@
3031
ReconcilableDeferredGroupedFieldSetResult,
3132
StreamItemsRecord,
3233
StreamItemsResult,
33-
SubsequentResultRecord,
3434
)
3535

36+
try:
37+
from typing import TypeGuard
38+
except ImportError: # Python < 3.10
39+
from typing_extensions import TypeGuard
40+
3641
__all__ = ["IncrementalGraph"]
3742

3843

44+
class DeferredFragmentNode:
45+
"""A node representing a deferred fragment in the incremental graph."""
46+
47+
__slots__ = (
48+
"children",
49+
"deferred_fragment_record",
50+
"expected_reconcilable_results",
51+
"reconcilable_results",
52+
"results",
53+
)
54+
55+
deferred_fragment_record: DeferredFragmentRecord
56+
expected_reconcilable_results: int
57+
results: list[DeferredGroupedFieldSetResult]
58+
reconcilable_results: list[ReconcilableDeferredGroupedFieldSetResult]
59+
children: list[DeferredFragmentNode]
60+
61+
def __init__(self, deferred_fragment_record: DeferredFragmentRecord) -> None:
62+
"""Initialize the DeferredFragmentNode."""
63+
self.deferred_fragment_record = deferred_fragment_record
64+
self.expected_reconcilable_results = 0
65+
self.results = []
66+
self.reconcilable_results = []
67+
self.children = []
68+
69+
70+
SubsequentResultNode = Union[DeferredFragmentNode, SubsequentResultRecord]
71+
72+
73+
def is_deferred_fragment_node(
74+
node: DeferredFragmentNode | None,
75+
) -> TypeGuard[DeferredFragmentNode]:
76+
"""Check whether the given node is a deferred fragment node."""
77+
return isinstance(node, DeferredFragmentNode)
78+
79+
80+
def is_stream_node(
81+
node: SubsequentResultNode | None,
82+
) -> TypeGuard[SubsequentResultRecord]:
83+
"""Check whether the given result node is a stream node."""
84+
return isinstance(node, SubsequentResultRecord)
85+
86+
3987
class IncrementalGraph:
4088
"""Helper class to execute incremental Graphs.
4189
4290
For internal use only.
4391
"""
4492

45-
_pending: dict[SubsequentResultRecord, None]
46-
_new_pending: dict[SubsequentResultRecord, None]
93+
_pending: dict[SubsequentResultNode, None]
94+
_deferred_fragment_nodes: dict[DeferredFragmentRecord, DeferredFragmentNode]
95+
_new_pending: dict[SubsequentResultNode, None]
4796
_completed_queue: list[IncrementalDataRecordResult]
4897
_next_queue: list[Future[Iterable[IncrementalDataRecordResult]]]
4998

@@ -52,6 +101,7 @@ class IncrementalGraph:
52101
def __init__(self) -> None:
53102
"""Initialize the IncrementalGraph."""
54103
self._pending = {}
104+
self._deferred_fragment_nodes = {}
55105
self._new_pending = {}
56106
self._completed_queue = []
57107
self._next_queue = []
@@ -66,8 +116,10 @@ def add_incremental_data_records(
66116
for deferred_fragment_record in (
67117
incremental_data_record.deferred_fragment_records
68118
): # pragma: no branch
69-
deferred_fragment_record.expected_reconcilable_results += 1
70-
self._add_deferred_fragment_record(deferred_fragment_record)
119+
deferred_fragment_node = self._add_deferred_fragment_node(
120+
deferred_fragment_record
121+
)
122+
deferred_fragment_node.expected_reconcilable_results += 1
71123

72124
deferred_result = incremental_data_record.result
73125
if is_awaitable(deferred_result):
@@ -103,6 +155,20 @@ async def enqueue_stream(
103155
else:
104156
self._enqueue(stream_result) # type: ignore
105157

158+
def add_completed_reconcilable_deferred_grouped_field_set(
159+
self, reconcilable_result: ReconcilableDeferredGroupedFieldSetResult
160+
) -> None:
161+
"""Add a completed reconcilable deferred grouped field set result."""
162+
deferred_fragment_nodes = filter(
163+
is_deferred_fragment_node,
164+
map(
165+
self._deferred_fragment_nodes.get,
166+
reconcilable_result.deferred_fragment_records,
167+
),
168+
)
169+
for deferred_fragment_node in deferred_fragment_nodes:
170+
deferred_fragment_node.reconcilable_results.append(reconcilable_result)
171+
106172
def get_new_pending(self) -> list[SubsequentResultRecord]:
107173
"""Get new pending subsequent result records."""
108174
_pending, _new_pending = self._pending, self._new_pending
@@ -113,17 +179,16 @@ def get_new_pending(self) -> list[SubsequentResultRecord]:
113179
add_iteration = iterate.append
114180
while iterate:
115181
node = iterate.pop(0)
116-
if is_deferred_fragment_record(node):
117-
if node.expected_reconcilable_results:
118-
_pending[node] = None
119-
add_result(node)
120-
continue
121-
for child in node.children:
122-
_new_pending[child] = None
123-
add_iteration(child)
124-
else:
182+
if is_stream_node(node):
125183
_pending[node] = None
126184
add_result(node)
185+
elif node.expected_reconcilable_results: # type: ignore
186+
_pending[node] = None
187+
add_result(node.deferred_fragment_record) # type: ignore
188+
else:
189+
for child in node.children: # type: ignore
190+
_new_pending[child] = None
191+
add_iteration(child)
127192
_new_pending.clear()
128193
return new_pending
129194

@@ -152,53 +217,87 @@ def complete_deferred_fragment(
152217
deferred_fragment_record: DeferredFragmentRecord,
153218
) -> list[ReconcilableDeferredGroupedFieldSetResult] | None:
154219
"""Complete a deferred fragment."""
155-
reconcilable_results = deferred_fragment_record.reconcilable_results
156-
if deferred_fragment_record.expected_reconcilable_results != len(
220+
try:
221+
deferred_fragment_node = self._deferred_fragment_nodes[
222+
deferred_fragment_record
223+
]
224+
except KeyError: # pragma: no cover
225+
return None
226+
reconcilable_results = deferred_fragment_node.reconcilable_results
227+
if deferred_fragment_node.expected_reconcilable_results != len(
157228
reconcilable_results
158229
):
159230
return None
160-
self.remove_subsequent_result_record(deferred_fragment_record)
231+
self._remove_pending(deferred_fragment_node)
161232
new_pending = self._new_pending
162-
for child in deferred_fragment_record.children:
233+
for child in deferred_fragment_node.children:
163234
new_pending[child] = None
164235
for result in child.results:
165236
self._enqueue(result)
166237
return reconcilable_results
167238

168-
def remove_subsequent_result_record(
239+
def remove_deferred_fragment(
169240
self,
170-
subsequent_result_record: SubsequentResultRecord,
241+
deferred_fragment_record: DeferredFragmentRecord,
171242
) -> None:
172-
"""Remove a subsequent result record as no longer pending."""
173-
del self._pending[subsequent_result_record]
243+
"""Remove a deferred fragment."""
244+
try:
245+
deferred_fragment_node = self._deferred_fragment_nodes[
246+
deferred_fragment_record
247+
]
248+
except KeyError: # pragma: no cover
249+
return
250+
self._remove_pending(deferred_fragment_node)
251+
for child in deferred_fragment_node.children: # pragma: no cover
252+
self.remove_deferred_fragment(child.deferred_fragment_record)
253+
254+
def remove_stream(self, stream_record: SubsequentResultRecord) -> None:
255+
"""Remove a stream record as no longer pending."""
256+
self._remove_pending(stream_record)
257+
258+
def _remove_pending(self, subsequent_result_node: SubsequentResultNode) -> None:
259+
"""Remove a subsequent result node as no longer pending."""
260+
del self._pending[subsequent_result_node]
174261
if not self._pending:
175262
self.stop_incremental_data()
176263

177-
def _add_deferred_fragment_record(
264+
def _add_deferred_fragment_node(
178265
self, deferred_fragment_record: DeferredFragmentRecord
179-
) -> None:
180-
"""Add deferred fragment record."""
181-
parent = deferred_fragment_record.parent
182-
if parent is None:
183-
if deferred_fragment_record.id is not None:
184-
return
185-
self._new_pending[deferred_fragment_record] = None
186-
return
187-
if deferred_fragment_record in parent.children:
188-
return
189-
parent.children[deferred_fragment_record] = None
190-
self._add_deferred_fragment_record(parent)
266+
) -> DeferredFragmentNode:
267+
"""Add a deferred fragment node."""
268+
try:
269+
deferred_fragment_node = self._deferred_fragment_nodes[
270+
deferred_fragment_record
271+
]
272+
except KeyError:
273+
deferred_fragment_node = DeferredFragmentNode(deferred_fragment_record)
274+
self._deferred_fragment_nodes[deferred_fragment_record] = (
275+
deferred_fragment_node
276+
)
277+
parent = deferred_fragment_record.parent
278+
if parent is None:
279+
self._new_pending[deferred_fragment_node] = None
280+
else:
281+
parent_node = self._add_deferred_fragment_node(parent)
282+
parent_node.children.append(deferred_fragment_node)
283+
return deferred_fragment_node
191284

192285
def _enqueue_completed_deferred_grouped_field_set(
193286
self, result: DeferredGroupedFieldSetResult
194287
) -> None:
195288
"""Enqueue completed deferred grouped field set result."""
196-
has_pending_parent = False
289+
is_pending = False
197290
for deferred_fragment_record in result.deferred_fragment_records:
198-
if deferred_fragment_record.id is not None:
199-
has_pending_parent = True
200-
deferred_fragment_record.results.append(result)
201-
if has_pending_parent:
291+
try:
292+
deferred_fragment_node = self._deferred_fragment_nodes[
293+
deferred_fragment_record
294+
]
295+
except KeyError: # pragma: no cover
296+
continue
297+
if deferred_fragment_node in self._pending:
298+
is_pending = True
299+
deferred_fragment_node.results.append(result)
300+
if is_pending:
202301
self._enqueue(result)
203302

204303
def _add_task(self, awaitable: Awaitable[Any]) -> None:

src/graphql/execution/incremental_publisher.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def _handle_completed_deferred_grouped_field_set(
218218
if is_non_reconcilable_deferred_grouped_field_set_result(
219219
deferred_grouped_field_set_result
220220
):
221-
remove_subsequent = self._incremental_graph.remove_subsequent_result_record
221+
remove_deferred = self._incremental_graph.remove_deferred_fragment
222222
for deferred_fragment_record in (
223223
deferred_grouped_field_set_result.deferred_fragment_records
224224
): # pragma: no branch
@@ -227,19 +227,16 @@ def _handle_completed_deferred_grouped_field_set(
227227
append_completed(
228228
CompletedResult(id_, deferred_grouped_field_set_result.errors)
229229
)
230-
231-
remove_subsequent(deferred_fragment_record)
230+
remove_deferred(deferred_fragment_record)
232231
return
232+
233233
deferred_grouped_field_set_result = cast(
234234
"ReconcilableDeferredGroupedFieldSetResult",
235235
deferred_grouped_field_set_result,
236236
)
237-
for deferred_fragment_record in (
238-
deferred_grouped_field_set_result.deferred_fragment_records
239-
): # pragma: no branch
240-
deferred_fragment_record.reconcilable_results.append(
241-
deferred_grouped_field_set_result
242-
)
237+
self._incremental_graph.add_completed_reconcilable_deferred_grouped_field_set(
238+
deferred_grouped_field_set_result
239+
)
243240
incremental_data_records = (
244241
deferred_grouped_field_set_result.incremental_data_records
245242
)
@@ -288,7 +285,7 @@ async def _handle_completed_stream_items(
288285
incremental_graph = self._incremental_graph
289286
if stream_items_result.errors is not None:
290287
context.completed.append(CompletedResult(id_, stream_items_result.errors))
291-
incremental_graph.remove_subsequent_result_record(stream_record)
288+
incremental_graph.remove_stream(stream_record)
292289
if is_cancellable_stream_record(stream_record):
293290
cancellable_streams = self._context.cancellable_streams
294291
if cancellable_streams: # pragma: no branch
@@ -297,7 +294,7 @@ async def _handle_completed_stream_items(
297294
await stream_record.early_return
298295
elif stream_items_result.result is None:
299296
context.completed.append(CompletedResult(id_))
300-
incremental_graph.remove_subsequent_result_record(stream_record)
297+
incremental_graph.remove_stream(stream_record)
301298
if is_cancellable_stream_record(stream_record):
302299
cancellable_streams = self._context.cancellable_streams
303300
if cancellable_streams: # pragma: no branch

src/graphql/execution/types.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
"SubsequentResultRecord",
5959
"TerminatingStreamItemsResult",
6060
"is_cancellable_stream_record",
61-
"is_deferred_fragment_record",
6261
"is_deferred_grouped_field_set_record",
6362
"is_deferred_grouped_field_set_result",
6463
"is_non_reconcilable_deferred_grouped_field_set_result",
@@ -695,13 +694,6 @@ class FormattedCompletedResult(TypedDict):
695694
errors: NotRequired[list[GraphQLFormattedError]]
696695

697696

698-
def is_deferred_fragment_record(
699-
subsequent_result_record: SubsequentResultRecord,
700-
) -> TypeGuard[DeferredFragmentRecord]:
701-
"""Check if the subsequent result record is a deferred fragment record."""
702-
return isinstance(subsequent_result_record, DeferredFragmentRecord)
703-
704-
705697
def is_deferred_grouped_field_set_record(
706698
incremental_data_record: IncrementalDataRecord,
707699
) -> TypeGuard[DeferredGroupedFieldSetRecord]:
@@ -838,18 +830,8 @@ class DeferredFragmentRecord(SubsequentResultRecord):
838830
"""
839831

840832
parent: DeferredFragmentRecord | None
841-
expected_reconcilable_results: int
842-
results: list[DeferredGroupedFieldSetResult]
843-
reconcilable_results: list[ReconcilableDeferredGroupedFieldSetResult]
844-
children: dict[DeferredFragmentRecord, None]
845833

846-
__slots__ = (
847-
"children",
848-
"expected_reconcilable_results",
849-
"parent",
850-
"reconcilable_results",
851-
"results",
852-
)
834+
__slots__ = ("parent",)
853835

854836
def __init__(
855837
self,
@@ -859,10 +841,6 @@ def __init__(
859841
) -> None:
860842
super().__init__(path, label)
861843
self.parent = parent
862-
self.expected_reconcilable_results = 0
863-
self.results = []
864-
self.reconcilable_results = []
865-
self.children = {}
866844

867845

868846
class CancellableStreamRecord(SubsequentResultRecord):

0 commit comments

Comments
 (0)