Skip to content
This repository was archived by the owner on Dec 15, 2020. It is now read-only.

Commit ecd313a

Browse files
Add max_in_memory_pages parameter to execute_futures
1 parent 5ec62d6 commit ecd313a

File tree

2 files changed

+57
-13
lines changed

2 files changed

+57
-13
lines changed

aiocassandra.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
class _Paginator:
2121

22-
def __init__(self, request, *, executor, loop):
22+
def __init__(self, request, *, executor, loop, max_in_memory_pages=None):
2323
self.cassandra_fut = None
2424

2525
self._request = request
@@ -30,10 +30,35 @@ def __init__(self, request, *, executor, loop):
3030
self._deque = deque()
3131
self._exc = None
3232
self._drain_event = asyncio.Event(loop=loop)
33+
self._no_fetching_page = asyncio.Event(loop=loop)
3334
self._finish_event = asyncio.Event(loop=loop)
3435
self._exit_event = Event()
3536

3637
self.__pages = set()
38+
self._max_in_memory_pages = max_in_memory_pages
39+
self._page_size = None
40+
41+
def _start_fetching_next_page(self):
42+
self._no_fetching_page.clear()
43+
_fn = self.cassandra_fut.start_fetching_next_page
44+
fut = self._loop.run_in_executor(self._executor, _fn)
45+
self.__pages.add(fut)
46+
fut.add_done_callback(self.__pages.remove)
47+
48+
def _maybe_start_prefetch_next_page(self):
49+
if self._finish_event.is_set() or not self._no_fetching_page.is_set():
50+
return
51+
52+
if not self.cassandra_fut.has_more_pages:
53+
self._finish_event.set()
54+
return
55+
56+
if self._max_in_memory_pages is None:
57+
pass
58+
elif len(self._deque) > self._page_size * (self._max_in_memory_pages - 1):
59+
return
60+
61+
self._start_fetching_next_page()
3762

3863
def _handle_page(self, rows):
3964
if self._exit_event.is_set():
@@ -42,19 +67,15 @@ def _handle_page(self, rows):
4267
'Paginator is closed, skipping new %i records', _len)
4368
return
4469

70+
if self._page_size is None:
71+
self._page_size = len(rows)
72+
4573
for row in rows:
4674
self._deque.append(row)
4775

76+
self._loop.call_soon_threadsafe(self._no_fetching_page.set)
4877
self._loop.call_soon_threadsafe(self._drain_event.set)
49-
50-
if self.cassandra_fut.has_more_pages:
51-
_fn = self.cassandra_fut.start_fetching_next_page
52-
fut = self._loop.run_in_executor(self._executor, _fn)
53-
self.__pages.add(fut)
54-
fut.add_done_callback(self.__pages.remove)
55-
return
56-
57-
self._loop.call_soon_threadsafe(self._finish_event.set)
78+
self._loop.call_soon_threadsafe(self._maybe_start_prefetch_next_page)
5879

5980
def _handle_err(self, exc):
6081
self._exc = exc
@@ -102,8 +123,11 @@ async def _paginator(self):
102123
if self._exc is not None:
103124
raise self._exc
104125

126+
self._maybe_start_prefetch_next_page()
127+
105128
while self._deque:
106129
await yield_(self._deque.popleft())
130+
self._maybe_start_prefetch_next_page()
107131

108132
await asyncio.wait(
109133
(
@@ -153,12 +177,13 @@ async def execute_future(self, *args, **kwargs):
153177
return await asyncio_fut
154178

155179

156-
def execute_futures(self, *args, **kwargs):
180+
def execute_futures(self, *args, max_in_memory_pages=None, **kwargs):
157181
_request = partial(self.execute_async, *args, **kwargs)
158182
return _Paginator(
159183
_request,
160184
executor=self._asyncio_executor,
161-
loop=self._asyncio_loop
185+
loop=self._asyncio_loop,
186+
max_in_memory_pages=max_in_memory_pages
162187
)
163188

164189

tests/test_aiocassandra.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async def test_execute_futures_simple_statement_empty(cassandra):
131131

132132
@pytest.mark.asyncio
133133
async def test_execute_futures_simple_statement(cassandra):
134-
cql = 'SELECT * FROM system.size_estimates;'
134+
cql = 'SELECT * FROM system.size_estimates LIMIT 500;'
135135
statement = SimpleStatement(cql, fetch_size=100)
136136

137137
ret = []
@@ -144,6 +144,25 @@ async def test_execute_futures_simple_statement(cassandra):
144144
assert len(ret) != 0
145145

146146

147+
@pytest.mark.asyncio
148+
async def test_execute_futures_simple_statement_limit_pages(cassandra):
149+
cql = 'SELECT * FROM system.size_estimates LIMIT 50;'
150+
statement = SimpleStatement(cql, fetch_size=10)
151+
152+
ret = []
153+
154+
async with cassandra.execute_futures(statement, max_in_memory_pages=3) as paginator:
155+
await asyncio.sleep(0.5) # wait for fetching pages
156+
assert len(paginator._deque) == 30
157+
async for row in paginator:
158+
await asyncio.sleep(0.2) # slow down consumer
159+
assert isinstance(row, tuple)
160+
assert len(paginator._deque) <= 30
161+
ret.append(row)
162+
163+
assert len(ret) == 50
164+
165+
147166
@pytest.mark.asyncio
148167
async def test_execute_futures_break(cassandra):
149168
cql = 'SELECT * FROM system.size_estimates;'

0 commit comments

Comments
 (0)