Skip to content

Commit a2e3ef3

Browse files
authored
feat(FIR-42854): Server side async query execution (#415)
1 parent 6667d21 commit a2e3ef3

File tree

20 files changed

+1213
-71
lines changed

20 files changed

+1213
-71
lines changed

docsrc/Connecting_and_queries.rst

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,73 @@ load on both server and client machines can be controlled. A suggested way is to
594594
run(run_multiple_queries())
595595

596596

597+
Server-side asynchronous query execution
598+
==========================================
599+
Firebolt supports server-side asynchronous query execution. This feature allows you to run
600+
queries in the background and fetch the results later. This is especially useful for long-running
601+
queries that you don't want to wait for or maintain a persistent connection to the server.
602+
603+
This feature is not to be confused with the Python SDK's asynchronous functionality, which is
604+
described in the :ref:`Asynchronous query execution <connecting_and_queries:Asynchronous query execution>` section,
605+
used to write concurrent code. Server-side asynchronous query execution is a feature of the
606+
Firebolt engine itself.
607+
608+
Submitting an asynchronous query
609+
--------------------------------
610+
611+
Use :py:meth:`firebolt.db.cursor.Cursor.execute_async` method to run query without maintaing a persistent connection.
612+
This method will return immediately, and the query will be executed in the background. Return value
613+
of execute_async is -1, which is the rowcount for queries where it's not applicable.
614+
`cursor.async_query_token` attribute will contain a token that can be used to monitor the query status.
615+
616+
::
617+
618+
# Synchronous execution
619+
cursor.execute("CREATE TABLE my_table (id INT, name TEXT, date_value DATE)")
620+
621+
# Asynchronous execution
622+
cursor.execute_async("INSERT INTO my_table VALUES (5, 'egg', '2022-01-01')")
623+
token = cursor.async_query_token
624+
625+
Trying to access `async_query_token` before calling `execute_async` will raise an exception.
626+
627+
.. note::
628+
Multiple-statement queries are not supported for asynchronous queries. However, you can run each statement
629+
separately using multiple `execute_async` calls.
630+
631+
.. note::
632+
Fetching data via SELECT is not supported and will raise an exception. execute_async is best suited for DML queries.
633+
634+
Monitoring the query status
635+
----------------------------
636+
637+
To check the async query status you need to retrieve the token of the query. The token is a unique
638+
identifier for the query and can be used to fetch the query status. You can store this token
639+
outside of the current process and use it later to check the query status. :ref:`Connection <firebolt.db:Connection>` object
640+
has two methods to check the query status: :py:meth:`firebolt.db.connection.Connection.is_async_query_running` and
641+
:py:meth:`firebolt.db.connection.Connection.is_async_query_successful`.`is_async_query_running` will return True
642+
if the query is still running, and False otherwise. `is_async_query_successful` will return True if the query
643+
has finished successfully, None if query is still running and False if the query has failed.
644+
645+
::
646+
647+
while(connection.is_async_query_running(token)):
648+
print("Query is still running")
649+
time.sleep(1)
650+
print("Query has finished")
651+
652+
success = connection.is_async_query_successful(token)
653+
# success is None if the query is still running
654+
if success is None:
655+
# we should not reach this point since we've waited for is_async_query_running
656+
raise Exception("The query is still running, use is_async_query_running to check the status")
657+
658+
if success:
659+
print("Query was successful")
660+
else:
661+
print("Query failed")
662+
663+
597664
Thread safety
598665
==============================
599666

src/firebolt/async_db/connection.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,19 @@
1010
from firebolt.client import DEFAULT_API_URL
1111
from firebolt.client.auth import Auth
1212
from firebolt.client.client import AsyncClient, AsyncClientV1, AsyncClientV2
13-
from firebolt.common.base_connection import BaseConnection
13+
from firebolt.common.base_connection import (
14+
ASYNC_QUERY_STATUS_REQUEST,
15+
ASYNC_QUERY_STATUS_RUNNING,
16+
ASYNC_QUERY_STATUS_SUCCESSFUL,
17+
BaseConnection,
18+
)
1419
from firebolt.common.cache import _firebolt_system_engine_cache
1520
from firebolt.common.constants import DEFAULT_TIMEOUT_SECONDS
16-
from firebolt.utils.exception import ConfigurationError, ConnectionClosedError
21+
from firebolt.utils.exception import (
22+
ConfigurationError,
23+
ConnectionClosedError,
24+
FireboltError,
25+
)
1726
from firebolt.utils.usage_tracker import get_user_agent_header
1827
from firebolt.utils.util import fix_url_schema, validate_engine_name_and_url_v1
1928

@@ -63,10 +72,9 @@ def __init__(
6372
api_endpoint: str,
6473
init_parameters: Optional[Dict[str, Any]] = None,
6574
):
66-
super().__init__()
75+
super().__init__(cursor_type)
6776
self.api_endpoint = api_endpoint
6877
self.engine_url = engine_url
69-
self.cursor_type = cursor_type
7078
self._cursors: List[Cursor] = []
7179
self._client = client
7280
self.init_parameters = init_parameters or {}
@@ -81,6 +89,50 @@ def cursor(self, **kwargs: Any) -> Cursor:
8189
self._cursors.append(c)
8290
return c
8391

92+
# Server-side async methods
93+
async def _get_async_query_status(self, token: str) -> str:
94+
if self.cursor_type != CursorV2:
95+
raise FireboltError(
96+
"This method is only supported for connection with service account."
97+
)
98+
cursor = self.cursor()
99+
await cursor.execute(ASYNC_QUERY_STATUS_REQUEST.format(token=token))
100+
result = await cursor.fetchone()
101+
if cursor.rowcount != 1 or not result:
102+
raise FireboltError("Unexpected result from async query status request.")
103+
columns = cursor.description
104+
result_dict = dict(zip([column.name for column in columns], result))
105+
return str(result_dict.get("status"))
106+
107+
async def is_async_query_running(self, token: str) -> bool:
108+
"""
109+
Check if an async query is still running.
110+
111+
Args:
112+
token: Async query token. Can be obtained from Cursor.async_query_token.
113+
114+
Returns:
115+
bool: True if async query is still running, False otherwise
116+
"""
117+
status = await self._get_async_query_status(token)
118+
return status == ASYNC_QUERY_STATUS_RUNNING
119+
120+
async def is_async_query_successful(self, token: str) -> Optional[bool]:
121+
"""
122+
Check if an async query has finished and was successful.
123+
124+
Args:
125+
token: Async query token. Can be obtained from Cursor.async_query_token.
126+
127+
Returns:
128+
bool: None if the query is still running, True if successful,
129+
False otherwise
130+
"""
131+
status = await self._get_async_query_status(token)
132+
if status == ASYNC_QUERY_STATUS_RUNNING:
133+
return None
134+
return status == ASYNC_QUERY_STATUS_SUCCESSFUL
135+
84136
# Context manager support
85137
async def __aenter__(self) -> Connection:
86138
if self.closed:

src/firebolt/async_db/cursor.py

Lines changed: 124 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import (
99
TYPE_CHECKING,
1010
Any,
11+
Dict,
1112
Iterator,
1213
List,
1314
Optional,
@@ -39,16 +40,18 @@
3940
UPDATE_PARAMETERS_HEADER,
4041
BaseCursor,
4142
CursorState,
42-
RowSet,
4343
_parse_update_endpoint,
4444
_parse_update_parameters,
4545
_raise_if_internal_set_parameter,
46+
async_not_allowed,
4647
check_not_closed,
4748
check_query_executed,
4849
)
4950
from firebolt.utils.exception import (
5051
EngineNotRunningError,
5152
FireboltDatabaseError,
53+
FireboltError,
54+
NotSupportedError,
5255
OperationalError,
5356
ProgrammingError,
5457
QueryTimeoutError,
@@ -186,53 +189,93 @@ async def _parse_response_headers(self, headers: Headers) -> None:
186189
param_dict = _parse_update_parameters(headers.get(UPDATE_PARAMETERS_HEADER))
187190
self._update_set_parameters(param_dict)
188191

192+
@abstractmethod
193+
async def execute_async(
194+
self,
195+
query: str,
196+
parameters: Optional[Sequence[ParameterType]] = None,
197+
skip_parsing: bool = False,
198+
) -> int:
199+
"""Execute a database query without maintaining a connection."""
200+
...
201+
189202
async def _do_execute(
190203
self,
191204
raw_query: str,
192205
parameters: Sequence[Sequence[ParameterType]],
193206
skip_parsing: bool = False,
194207
timeout: Optional[float] = None,
208+
async_execution: bool = False,
195209
) -> None:
196210
self._reset()
197-
# Allow users to manually skip parsing for performance improvement.
198211
queries: List[Union[SetParameter, str]] = (
199212
[raw_query] if skip_parsing else split_format_sql(raw_query, parameters)
200213
)
201214
timeout_controller = TimeoutController(timeout)
215+
216+
if len(queries) > 1 and async_execution:
217+
raise FireboltError(
218+
"Server side async does not support multi-statement queries"
219+
)
202220
try:
203221
for query in queries:
204-
start_time = time.time()
205-
Cursor._log_query(query)
206-
timeout_controller.raise_if_timeout()
207-
208-
if isinstance(query, SetParameter):
209-
row_set: RowSet = (-1, None, None, None)
210-
await self._validate_set_parameter(
211-
query, timeout_controller.remaining()
212-
)
213-
else:
214-
resp = await self._api_request(
215-
query,
216-
{"output_format": JSON_OUTPUT_FORMAT},
217-
timeout=timeout_controller.remaining(),
218-
)
219-
await self._raise_if_error(resp)
220-
await self._parse_response_headers(resp.headers)
221-
row_set = self._row_set_from_response(resp)
222-
223-
self._append_row_set(row_set)
224-
225-
logger.info(
226-
f"Query fetched {self.rowcount} rows in"
227-
f" {time.time() - start_time} seconds."
222+
await self._execute_single_query(
223+
query, timeout_controller, async_execution
228224
)
229-
230225
self._state = CursorState.DONE
231-
232226
except Exception:
233227
self._state = CursorState.ERROR
234228
raise
235229

230+
async def _execute_single_query(
231+
self,
232+
query: Union[SetParameter, str],
233+
timeout_controller: TimeoutController,
234+
async_execution: bool,
235+
) -> None:
236+
start_time = time.time()
237+
Cursor._log_query(query)
238+
timeout_controller.raise_if_timeout()
239+
240+
if isinstance(query, SetParameter):
241+
if async_execution:
242+
raise FireboltError(
243+
"Server side async does not support set statements, "
244+
"please use execute to set this parameter"
245+
)
246+
await self._validate_set_parameter(query, timeout_controller.remaining())
247+
else:
248+
await self._handle_query_execution(
249+
query, timeout_controller, async_execution
250+
)
251+
252+
if not async_execution:
253+
logger.info(
254+
f"Query fetched {self.rowcount} rows in"
255+
f" {time.time() - start_time} seconds."
256+
)
257+
else:
258+
logger.info("Query submitted for async execution.")
259+
260+
async def _handle_query_execution(
261+
self, query: str, timeout_controller: TimeoutController, async_execution: bool
262+
) -> None:
263+
query_params: Dict[str, Any] = {"output_format": JSON_OUTPUT_FORMAT}
264+
if async_execution:
265+
query_params["async"] = True
266+
resp = await self._api_request(
267+
query,
268+
query_params,
269+
timeout=timeout_controller.remaining(),
270+
)
271+
await self._raise_if_error(resp)
272+
if async_execution:
273+
self._parse_async_response(resp)
274+
else:
275+
await self._parse_response_headers(resp.headers)
276+
row_set = self._row_set_from_response(resp)
277+
self._append_row_set(row_set)
278+
236279
@check_not_closed
237280
async def execute(
238281
self,
@@ -346,6 +389,7 @@ async def nextset(self) -> None:
346389

347390
# Iteration support
348391
@check_not_closed
392+
@async_not_allowed
349393
@check_query_executed
350394
def __aiter__(self) -> Cursor:
351395
return self
@@ -373,6 +417,7 @@ async def __aexit__(
373417
self.close()
374418

375419
@check_not_closed
420+
@async_not_allowed
376421
@check_query_executed
377422
async def __anext__(self) -> List[ColType]:
378423
row = await self.fetchone()
@@ -392,6 +437,47 @@ def __init__(
392437
assert isinstance(client, AsyncClientV2)
393438
super().__init__(*args, client=client, connection=connection, **kwargs)
394439

440+
@check_not_closed
441+
async def execute_async(
442+
self,
443+
query: str,
444+
parameters: Optional[Sequence[ParameterType]] = None,
445+
skip_parsing: bool = False,
446+
) -> int:
447+
"""
448+
Execute a database query without maintating a connection.
449+
450+
Supported features:
451+
Parameterized queries: placeholder characters ('?') are substituted
452+
with values provided in `parameters`. Values are formatted to
453+
be properly recognized by database and to exclude SQL injection.
454+
455+
Not supported:
456+
Multi-statement queries: multiple statements, provided in a single query
457+
and separated by semicolon.
458+
SET statements: to provide additional query execution parameters, execute
459+
`SET param=value` statement before it. Use `execute` method to set
460+
parameters.
461+
462+
Args:
463+
query (str): SQL query to execute
464+
parameters (Optional[Sequence[ParameterType]]): A sequence of substitution
465+
parameters. Used to replace '?' placeholders inside a query with
466+
actual values
467+
skip_parsing (bool): Flag to disable query parsing. This will
468+
disable parameterized queries while potentially improving performance
469+
470+
Returns:
471+
int: Always returns -1, as async execution does not return row count.
472+
"""
473+
await self._do_execute(
474+
query,
475+
[parameters] if parameters else [],
476+
skip_parsing,
477+
async_execution=True,
478+
)
479+
return -1
480+
395481
async def is_db_available(self, database_name: str) -> bool:
396482
"""
397483
Verify that the database exists.
@@ -468,3 +554,13 @@ async def _filter_request(self, endpoint: str, filters: dict) -> Response:
468554
)
469555
resp.raise_for_status()
470556
return resp
557+
558+
async def execute_async(
559+
self,
560+
query: str,
561+
parameters: Optional[Sequence[ParameterType]] = None,
562+
skip_parsing: bool = False,
563+
) -> int:
564+
raise NotSupportedError(
565+
"Async execution is not supported in this version " " of Firebolt."
566+
)

src/firebolt/client/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
DEFAULT_API_URL: str = "api.app.firebolt.io"
77
PROTOCOL_VERSION_HEADER_NAME = "Firebolt-Protocol-Version"
8-
PROTOCOL_VERSION: str = "2.1"
8+
PROTOCOL_VERSION: str = "2.3"
99
_REQUEST_ERRORS: Tuple[Type, ...] = (
1010
HTTPError,
1111
InvalidURL,

0 commit comments

Comments
 (0)