-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_execution.py
More file actions
73 lines (62 loc) · 2.27 KB
/
query_execution.py
File metadata and controls
73 lines (62 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from query import Query
from constants import *
from text_selection import TextSelection
from sqlalchemy.sql import text
import asyncio
import pandas as pd
import sqlalchemy
import sqlalchemy.ext.asyncio
class QueryExecution:
def __init__(self, sql_str: str):
self._sql_engine = sqlalchemy.ext.asyncio.create_async_engine(
f"{DB_URL}/{DB_NAME}",
)
self.query = Query(sql_str)
try:
loop = asyncio.get_event_loop()
except Exception:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._execute_base_query())
# loop.close()
if self.query.semantic_predicate is not None:
print("Executing semantic query")
input_query = f"SELECT Lyrics FROM songs WHERE {self.query.semantic_predicate}"
old_query = self.query
old_results_df = self.results_df
self.query = Query(input_query)
try:
loop = asyncio.get_event_loop()
except Exception:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._execute_base_query())
results = self.get_results()["Lyrics"].values[0]
self.query = old_query
self.results_df = old_results_df
self.txt_selection = TextSelection(self.results_df, results)
results_df = self.txt_selection.satisfied_ids
ids = results_df["id"].values # Assuming this is a list of integers
ids_string = ','.join(map(str, ids))
order_clause = ' '.join([f"WHEN {id_} THEN {i}" for i, id_ in enumerate(ids)])
input_query = f"""
SELECT id, Title FROM songs
WHERE id IN ({ids_string})
ORDER BY CASE id {order_clause} END
"""
self.query = Query(input_query)
try:
loop = asyncio.get_event_loop()
except Exception:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._execute_base_query())
loop.close()
self.results_df = self.get_results()
self.execution_complete = True
async def _execute_base_query(self):
async with self._sql_engine.begin() as conn:
self.results_df = await conn.run_sync(
lambda conn: pd.read_sql(text(self.query.base_query_without_semantic), conn))
def get_results(self):
return self.results_df