Skip to content

Commit e380de2

Browse files
authored
refactor(FIR-49930): decoupling query preprocessing logic (#467)
1 parent 060809e commit e380de2

File tree

6 files changed

+639
-211
lines changed

6 files changed

+639
-211
lines changed

src/firebolt/async_db/cursor.py

Lines changed: 49 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
UPDATE_ENDPOINT_HEADER,
2626
UPDATE_PARAMETERS_HEADER,
2727
CursorState,
28-
ParameterStyle,
2928
)
3029
from firebolt.common.cursor.base_cursor import (
3130
BaseCursor,
@@ -38,6 +37,10 @@
3837
check_not_closed,
3938
check_query_executed,
4039
)
40+
from firebolt.common.cursor.statement_planners import (
41+
ExecutionPlan,
42+
StatementPlannerFactory,
43+
)
4144
from firebolt.common.row_set.asynchronous.base import BaseAsyncRowSet
4245
from firebolt.common.row_set.asynchronous.in_memory import InMemoryAsyncRowSet
4346
from firebolt.common.row_set.asynchronous.streaming import StreamingAsyncRowSet
@@ -222,117 +225,84 @@ async def _do_execute(
222225
from firebolt.async_db import paramstyle
223226

224227
try:
225-
parameter_style = ParameterStyle(paramstyle)
226-
except ValueError:
227-
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")
228-
try:
229-
if parameter_style == ParameterStyle.FB_NUMERIC:
230-
await self._execute_fb_numeric(
231-
raw_query, parameters, timeout, async_execution, streaming
232-
)
233-
else:
234-
queries: List[Union[SetParameter, str]] = (
235-
[raw_query]
236-
if skip_parsing
237-
else self._formatter.split_format_sql(raw_query, parameters)
238-
)
239-
timeout_controller = TimeoutController(timeout)
240-
if len(queries) > 1 and async_execution:
241-
raise FireboltError(
242-
"Server side async does not support multi-statement queries"
243-
)
244-
for query in queries:
245-
await self._execute_single_query(
246-
query, timeout_controller, async_execution, streaming
247-
)
228+
statement_planner = StatementPlannerFactory.create_planner(
229+
paramstyle, self._formatter
230+
)
231+
232+
plan = statement_planner.create_execution_plan(
233+
raw_query, parameters, skip_parsing, async_execution, streaming
234+
)
235+
await self._execute_plan(plan, timeout)
248236
self._state = CursorState.DONE
249237
except Exception:
250238
self._state = CursorState.ERROR
251239
raise
252240

253-
async def _execute_fb_numeric(
241+
async def _execute_plan(
254242
self,
255-
query: str,
256-
parameters: Sequence[Sequence[ParameterType]],
243+
plan: ExecutionPlan,
257244
timeout: Optional[float],
258-
async_execution: bool,
259-
streaming: bool,
260245
) -> None:
261-
Cursor._log_query(query)
246+
"""Execute an execution plan."""
262247
timeout_controller = TimeoutController(timeout)
263-
timeout_controller.raise_if_timeout()
264-
query_params = self._build_fb_numeric_query_params(
265-
parameters, streaming, async_execution
266-
)
267-
resp = await self._api_request(
268-
query,
269-
query_params,
270-
timeout=timeout_controller.remaining(),
271-
)
272-
await self._raise_if_error(resp)
273-
if async_execution:
274-
await resp.aread()
275-
self._parse_async_response(resp)
276-
else:
277-
await self._parse_response_headers(resp.headers)
278-
await self._append_row_set_from_response(resp)
248+
249+
for query in plan.queries:
250+
if isinstance(query, SetParameter):
251+
if plan.async_execution:
252+
raise FireboltError(
253+
"Server side async does not support set statements, "
254+
"please use execute to set this parameter"
255+
)
256+
await self._validate_set_parameter(
257+
query, timeout_controller.remaining()
258+
)
259+
else:
260+
# Regular query execution
261+
await self._execute_single_query(
262+
query,
263+
plan.query_params,
264+
timeout_controller,
265+
plan.async_execution,
266+
plan.streaming,
267+
)
279268

280269
async def _execute_single_query(
281270
self,
282-
query: Union[SetParameter, str],
271+
query: str,
272+
query_params: Optional[Dict[str, Any]],
283273
timeout_controller: TimeoutController,
284274
async_execution: bool,
285275
streaming: bool,
286276
) -> None:
277+
"""Execute a single query."""
287278
start_time = time.time()
288279
Cursor._log_query(query)
289280
timeout_controller.raise_if_timeout()
290281

291-
if isinstance(query, SetParameter):
292-
if async_execution:
293-
raise FireboltError(
294-
"Server side async does not support set statements, "
295-
"please use execute to set this parameter"
296-
)
297-
await self._validate_set_parameter(query, timeout_controller.remaining())
298-
else:
299-
await self._handle_query_execution(
300-
query, timeout_controller, async_execution, streaming
301-
)
302-
303-
if not async_execution:
304-
logger.info(
305-
f"Query fetched {self.rowcount} rows in"
306-
f" {time.time() - start_time} seconds."
307-
)
308-
else:
309-
logger.info("Query submitted for async execution.")
282+
final_params = query_params or {}
310283

311-
async def _handle_query_execution(
312-
self,
313-
query: str,
314-
timeout_controller: TimeoutController,
315-
async_execution: bool,
316-
streaming: bool,
317-
) -> None:
318-
query_params: Dict[str, Any] = {
319-
"output_format": self._get_output_format(streaming)
320-
}
321-
if async_execution:
322-
query_params["async"] = True
323284
resp = await self._api_request(
324285
query,
325-
query_params,
286+
final_params,
326287
timeout=timeout_controller.remaining(),
327288
)
328289
await self._raise_if_error(resp)
290+
329291
if async_execution:
330292
await resp.aread()
331293
self._parse_async_response(resp)
332294
else:
333295
await self._parse_response_headers(resp.headers)
334296
await self._append_row_set_from_response(resp)
335297

298+
if not async_execution:
299+
logger.info(
300+
f"Query fetched {self.rowcount} rows in"
301+
f" {time.time() - start_time} seconds."
302+
)
303+
else:
304+
logger.info("Query submitted for async execution.")
305+
336306
async def use_database(self, database: str, cache: bool = True) -> None:
337307
"""Switch the current database context with caching."""
338308
if cache:

src/firebolt/common/cursor/base_cursor.py

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
from __future__ import annotations
22

3-
import json
43
import logging
54
import re
65
from types import TracebackType
7-
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
6+
from typing import Any, Dict, List, Optional, Tuple, Type, Union
87

98
from httpx import URL, Response
109

1110
from firebolt.client.auth.base import Auth
1211
from firebolt.client.client import AsyncClient, Client
13-
from firebolt.common._types import ParameterType, RawColType, SetParameter
12+
from firebolt.common._types import RawColType, SetParameter
1413
from firebolt.common.constants import (
1514
DISALLOWED_PARAMETER_LIST,
1615
IMMUTABLE_PARAMETER_LIST,
@@ -238,42 +237,6 @@ def _log_query(query: Union[str, SetParameter]) -> None:
238237
):
239238
logger.debug(f"Running query: {query}")
240239

241-
def _build_fb_numeric_query_params(
242-
self,
243-
parameters: Sequence[Sequence[ParameterType]],
244-
streaming: bool,
245-
async_execution: bool,
246-
) -> Dict[str, Any]:
247-
"""
248-
Build query parameters dictionary for fb_numeric paramstyle.
249-
250-
Args:
251-
parameters: A sequence of parameter sequences. For fb_numeric,
252-
only the first parameter sequence is used.
253-
streaming: Whether streaming is enabled
254-
async_execution: Whether async execution is enabled
255-
256-
Returns:
257-
Dictionary of query parameters to send with the request
258-
"""
259-
param_list = parameters[0] if parameters else []
260-
query_parameters = [
261-
{
262-
"name": f"${i+1}",
263-
"value": self._formatter.convert_parameter_for_serialization(value),
264-
}
265-
for i, value in enumerate(param_list)
266-
]
267-
268-
query_params: Dict[str, Any] = {
269-
"output_format": self._get_output_format(streaming),
270-
}
271-
if query_parameters:
272-
query_params["query_parameters"] = json.dumps(query_parameters)
273-
if async_execution:
274-
query_params["async"] = True
275-
return query_params
276-
277240
@property
278241
def engine_name(self) -> str:
279242
"""

0 commit comments

Comments
 (0)