From 68fb3a80efc16c6c7caa26c4995a6eb20275d583 Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 21 May 2025 12:13:29 +0200 Subject: [PATCH 01/19] integrate existing poc, add simple test --- quasardb/dask/__init__.py | 277 ++++++++++++++++++++++++++++++++++++++ setup.py | 2 + tests/test_dask.py | 63 +++++++++ 3 files changed, 342 insertions(+) create mode 100644 quasardb/dask/__init__.py create mode 100644 tests/test_dask.py diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py new file mode 100644 index 00000000..bdd80b83 --- /dev/null +++ b/quasardb/dask/__init__.py @@ -0,0 +1,277 @@ +# pylint: disable=C0103,C0111,C0302,R0903 + +# Copyright (c) 2009-2025, quasardb SAS. All rights reserved. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of quasardb nor the names of its contributors may +# be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY QUASARDB AND CONTRIBUTORS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# + +import logging +import quasardb +import datetime +import re + +logger = logging.getLogger("quasardb.dask") + + +class DaskRequired(ImportError): + """ + Exception raised when trying to use QuasarDB dask integration, but + required packages are not installed has not been installed. + """ + + pass + + +try: + import dask.dataframe as dd + from dask.delayed import delayed + import quasardb.pandas as qdbpd + import pandas as pd + +except ImportError as err: + logger.exception(err) + raise DaskRequired( + "The dask library is required to use QuasarDB dask integration." + ) from err + +class DateParserRequired(ImportError): + pass + +try: + import dateparser + +except ImportError as err: + logger.exception(err) + raise DaskRequired( + "dateparser required!" 
+ ) from err + +table_pattern = re.compile(r"(?i)\bFROM\s+([`\"\[]?\w+[`\"\]]?)") +range_pattern = re.compile(r"(?i)\bIN\s+RANGE\s*\(([^,]+),\s*([^,]+)\)") + + +def _read_dataframe( + query: str, meta: pd.DataFrame, conn_kwargs: dict, query_kwargs: dict +) -> qdbpd.DataFrame: + logger.debug('Querying QuasarDB with query: "%s"', query) + with quasardb.Cluster(**conn_kwargs) as conn: + df = qdbpd.query(conn, query, **query_kwargs) + + if len(df) == 0: + return meta + else: + return df + + +def _extract_table_name_from_query(query: str) -> str: + # XXX:igor for now this works for queries using only one table + + logger.debug('Extracting table name from query: "%s"', query) + match = re.search(table_pattern, query) + if match: + table_name = match.group(1) + logger.debug('Extracted table name: "%s"', table_name) + return table_name + else: + raise ValueError("Could not extract table name from query. ") + + +def _extract_range_from_query(conn, query: str, table_name: str) -> tuple: + """ + Extracts the range from the query, parses it to datetime and returns. + If no range is found in the query, it queries the table for the first and last timestamp. + """ + logger.debug('Extracting query range from: "%s"', query) + match = re.search(range_pattern, query) + # first we check try to extract "in range (start, end)" from query + # if we can't do it we will query first() and last() from the table + query_range = tuple() + if match: + if len(match.group()) == 2: + start_str = match.group(1) + end_str = match.group(2) + logger.debug("Extracted strings: (%s, %s)", start_str, end_str) + parser_settings = { + "PREFER_DAY_OF_MONTH": "first", + "PREFER_MONTH_OF_YEAR": "first", + } + start_date = dateparser.parse(start_str, settings=parser_settings) + end_date = dateparser.parse(end_str, settings=parser_settings) + query_range = (start_date, end_date) + logger.debug("Parsed datetime: %s", query_range) + return query_range + + logger.debug( + "No range found in query, querying table for first and last timestamp" + ) + range_query = f"SELECT first($timestamp), last($timestamp) FROM {table_name}" + df = qdbpd.query(conn, range_query) + if not df.empty: + df.iloc[0]["last($timestamp)"] += datetime.timedelta(microseconds=1) + query_range += tuple(df.iloc[0]) + logger.debug("Extracted range from table: %s", query_range) + return query_range + + +def _create_subrange_query( + query: str, query_range: tuple[datetime.datetime, datetime.datetime] +) -> str: + """ + Adds range to base query. + IF range is found in the query, it will be replaced with the new range. + IF no range is found, it will be added after the "FROM {table}" clause. + """ + new_query = query + range_match = re.search(range_pattern, query) + start_str = query_range[0].strftime("%Y-%m-%dT%H:%M:%S.%fZ") + end_str = query_range[1].strftime("%Y-%m-%dT%H:%M:%S.%fZ") + if range_match: + if len(range_match.groups()) == 2: + new_query = re.sub( + range_pattern, + f"IN RANGE ({start_str}, {end_str})", + query, + ) + logger.debug("Created subquery: %s", new_query) + return new_query + + table_match = re.search(table_pattern, query) + new_query = re.sub( + table_pattern, + f"FROM {table_match.group(1)} IN RANGE ({start_str}, {end_str})", + query, + ) + logger.debug("Created subquery: %s", new_query) + return new_query + + +def _split_by_timedelta( + query_range: tuple[datetime.datetime, datetime.datetime], delta: datetime.timedelta +) -> tuple[datetime.datetime, datetime.datetime]: + """ + Splits passed range into smaller ranges of size of delta. 
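+
+    Illustrative example (the timestamps and delta below are made up, not taken
+    from this patch): splitting (2025-01-01 00:00, 2025-01-01 03:00) with a
+    one-hour delta yields [(00:00, 01:00), (01:00, 02:00), (02:00, 03:00)];
+    the last sub-range is clamped to the end of the original range.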
+ """ + + ranges = [] + + if len(query_range) != 2: + return ranges + + start = query_range[0] + end = query_range[1] + current_start = start + + while current_start < end: + current_end = min(current_start + delta, end) + ranges.append((current_start, current_end)) + current_start = current_end + + return ranges + + +def _get_meta(conn, query: str) -> pd.DataFrame: + """ + Meta is an empty dataframe with the expected schema of the query result. + Extract df schema from the first row of the query result. + """ + # XXX:igor we can use different approaches to get the meta + # 1. get the meta from the first row + # this will require us to modify passed query, we have to add a limit 1 (check for existing limit, check for offset) + # does this put a lot of pressure on the db? + # this approach provides the most accurate meta + # 2. get the meta from the table schema + # we will have to add $timestamp and $table for "select *" queries + # with more complicated queries we can run into different edge cases + # we will have to extract actual col names and aliases + # get types for col names and assign them to the aliases + # will be less accurate + query += " LIMIT 1" + return qdbpd.query(conn, query).iloc[:0] + + +def query( + query: str, + uri: str, + *, + user_name: str = "", + user_private_key: str = "", + cluster_public_key: str = "", + user_security_file: str = "", + cluster_public_key_file: str = "", + timeout: datetime.timedelta = datetime.timedelta(seconds=60), + do_version_check: bool = False, + enable_encryption: bool = False, + client_max_parallelism: int = 0, + index=None, + blobs: bool = False, + numpy: bool = False, +): + conn_kwargs = { + "uri": uri, + "user_name": user_name, + "user_private_key": user_private_key, + "cluster_public_key": cluster_public_key, + "user_security_file": user_security_file, + "cluster_public_key_file": cluster_public_key_file, + "timeout": timeout, + "do_version_check": do_version_check, + "enable_encryption": enable_encryption, + "client_max_parallelism": client_max_parallelism, + } + + query_kwargs = { + "index": index, + "blobs": blobs, + "numpy": numpy, + } + + + table_name = _extract_table_name_from_query(query) + with quasardb.Cluster(**conn_kwargs) as conn: + meta = _get_meta(conn, query) + shard_size = conn.table(table_name.replace("\"", "")).get_shard_size() + query_range = _extract_range_from_query(conn, query, table_name) + # XXX:igor this will work good for tables with a lot of data in all buckets + # for small tables we end up with a lot of small queries + # + # dd.read_sql_query function estimates data size from X first rows, + # then splits the data so it fits into "bytes_per_chunk" parameter + # (i think) it uses limit and offset to do this + ranges_to_query = _split_by_timedelta(query_range, shard_size) + + if len(ranges_to_query) == 0: + logging.warning("No ranges to query, returning empty dataframe") + return meta + + logger.debug("Assembling subqueries for %d ranges", len(ranges_to_query)) + parts = [] + for rng in ranges_to_query: + sub_query = _create_subrange_query(query, rng) + parts.append( + delayed(_read_dataframe)(sub_query, meta, conn_kwargs, query_kwargs) + ) + logger.debug("Assembled %d subqueries", len(parts)) + + return dd.from_delayed(parts, meta=meta) diff --git a/setup.py b/setup.py index 62054071..17b4e9a3 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ package_name, "quasardb.pandas", "quasardb.numpy", + "quasardb.dask", "quasardb.extensions", ] @@ -205,6 +206,7 @@ def run(self): extras_require={ "pandas": 
["pandas"], "test": ["pytest"], + "dask": ["dask[dataframe]", "dask[delayed]", "pandas", "dateparser"], }, packages=packages, package_data={package_name: package_modules}, diff --git a/tests/test_dask.py b/tests/test_dask.py new file mode 100644 index 00000000..814fe199 --- /dev/null +++ b/tests/test_dask.py @@ -0,0 +1,63 @@ +import pytest +import quasardb.pandas as qdbpd +import quasardb.dask as qdbdsk +import numpy.ma as ma +import logging +import numpy as np +import pandas as pd + +logger = logging.getLogger("test-dask") + +def _to_numpy_masked(xs): + data = xs.to_numpy() + mask = xs.isna() + return ma.masked_array(data=data, mask=mask) + + +def _assert_series_equal(lhs, rhs): + lhs_ = _to_numpy_masked(lhs) + rhs_ = _to_numpy_masked(rhs) + + assert ma.count_masked(lhs_) == ma.count_masked(rhs_) + + logger.debug("lhs: %s", lhs_[:10]) + logger.debug("rhs: %s", rhs_[:10]) + + lhs_ = lhs_.torecords() + rhs_ = rhs_.torecords() + + for (lval, lmask), (rval, rmask) in zip(lhs_, rhs_): + assert lmask == rmask + + if lmask is False: + assert lval == rval + + +def _assert_df_equal(lhs, rhs): + """ + Verifies DataFrames lhs and rhs are equal(ish). We're not pedantic that we're comparing + metadata and things like that. + + Typically one would use `lhs` for the DataFrame that was generated in code, and + `rhs` for the DataFrame that's returned by qdbpd. + """ + + np.testing.assert_array_equal(lhs.index.to_numpy(), rhs.index.to_numpy()) + assert len(lhs.columns) == len(rhs.columns) + for col in lhs.columns: + _assert_series_equal(lhs[col], rhs[col]) + + +def test_compare_pandas_dask_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + (_, _, df, table) = df_with_table + + qdbpd_write_fn(df, qdbd_connection, table, write_through=True) + + table_name = table.get_name() + + query = f"SELECT * FROM \"{table_name}\"" + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, uri="qdb://127.0.0.1:2836").compute() + + _assert_df_equal(pandas_df, dask_df) From f584699e6ea60c8826c000f32e53698fcbcd5c51 Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 21 May 2025 14:48:49 +0200 Subject: [PATCH 02/19] check if is select query --- quasardb/dask/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index bdd80b83..260d2a05 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -68,6 +68,7 @@ class DateParserRequired(ImportError): "dateparser required!" ) from err +general_select_pattern = re.compile(r'(?i)^\s*SELECT\b') table_pattern = re.compile(r"(?i)\bFROM\s+([`\"\[]?\w+[`\"\]]?)") range_pattern = re.compile(r"(?i)\bIN\s+RANGE\s*\(([^,]+),\s*([^,]+)\)") @@ -228,6 +229,11 @@ def query( blobs: bool = False, numpy: bool = False, ): + if not re.match(general_select_pattern, query): + raise NotImplementedError( + "Only SELECT queries are supported. Please refer to the documentation for more information." 
+ ) + conn_kwargs = { "uri": uri, "user_name": user_name, From 8cb26415a2b461733676236caaff9cc11555cbb1 Mon Sep 17 00:00:00 2001 From: Igor Date: Wed, 21 May 2025 14:57:19 +0200 Subject: [PATCH 03/19] add query tests, comapring with pandas results --- tests/test_dask.py | 102 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 4 deletions(-) diff --git a/tests/test_dask.py b/tests/test_dask.py index 814fe199..8a8e4636 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd +logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("test-dask") def _to_numpy_masked(xs): @@ -48,16 +49,109 @@ def _assert_df_equal(lhs, rhs): _assert_series_equal(lhs[col], rhs[col]) -def test_compare_pandas_dask_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): +def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: str = "*", use_tag: bool=False, query_range: tuple[pd.Timestamp, pd.Timestamp]=None, group_by: str=None): (_, _, df, table) = df_with_table qdbpd_write_fn(df, qdbd_connection, table, write_through=True) - table_name = table.get_name() + q = "SELECT {} ".format(columns) + + if use_tag: + table.attach_tag("dask_tag") + q += "FROM find(tag='dask_tag')" + else: + q += "FROM \"{}\"".format(table_name) + + if query_range: + q += " IN RANGE({}, {})".format(query_range[0], query_range[1]) - query = f"SELECT * FROM \"{table_name}\"" + if group_by: + q += " GROUP BY {}".format(group_by) + + return (df, table, q) + +### Query tests, we care about results of dask matching those of pandas + +def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + df, table, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*") + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + + _assert_df_equal(pandas_df, dask_df) + +# @pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1]) +def test_dask_df_select_star_in_range(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + _, _, df, _ = df_with_table + + start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") + # end_row = int(len(df)-1 * query_range_percentile) + end_str = (df.index[-1] + pd.Timedelta(microseconds=1)).to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") + + df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", query_range=(start_str, end_str)) + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + + _assert_df_equal(pandas_df, dask_df) + +def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + _, _, df, _ = df_with_table + columns = ", ".join([f"{col}" for col in df.columns]) + df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns) + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + + _assert_df_equal(pandas_df, dask_df) + +def test_dask_df_select_columns_with_alias_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + _, _, df, _ = df_with_table + columns = ", ".join([f"{col} as alias_{col}" for col in df.columns]) + df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, 
columns) + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + + _assert_df_equal(pandas_df, dask_df) + +@pytest.mark.parametrize("group_by", ["1s", "1min", "1d"]) +def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, group_by): + _, _, df, _ = df_with_table + columns = ", ".join([f"count({col})" for col in df.columns]) + df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns=f"$timestamp, {columns}", group_by=group_by) pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, uri="qdb://127.0.0.1:2836").compute() + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() _assert_df_equal(pandas_df, dask_df) + + +@pytest.mark.skip(reason="Not implemented yet") +def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): + _, _, df, _ = df_with_table + + df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", use_tag=True) + + pandas_df = qdbpd.query(qdbd_connection, query) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + + _assert_df_equal(pandas_df, dask_df) + +@pytest.mark.parametrize( + "query", [ + "INSERT INTO test ($timestamp, x) VALUES (now(), 2)", + "DROP TABLE test", + "DELETE FROM test", + "CREATE TABLE test (x INT64)", + "SHOW TABLE test", + "ALTER TABLE test ADD COLUMN y INT64", + "SHOW DISK USAGE ON test" + ] +) +def test_dask_exception_on_non_select_query(qdbd_settings, query): + """ + Tests that a non-select query raises an exception + """ + with pytest.raises(NotImplementedError): + qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) \ No newline at end of file From 0520247bbdea3c21f426d78a0f68b766b8c84518 Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 May 2025 10:07:17 +0200 Subject: [PATCH 04/19] make sure df spans multiple shards --- tests/conftest.py | 7 +++++-- tests/test_dask.py | 27 ++++++++++++++++++++------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 4443dc38..cdfa805c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -763,11 +763,14 @@ def deduplicate(request, column_name): def deduplication_mode(request): return request.param +@pytest.fixture(params=["S"], ids=["frequency=S"]) +def frequency(request): + yield request.param @pytest.fixture -def gen_index(start_date, row_count): +def gen_index(start_date, row_count, frequency): return pd.Index( - pd.date_range(start_date, periods=row_count, freq="S"), name="$timestamp" + pd.date_range(start_date, periods=row_count, freq=frequency), name="$timestamp" ) diff --git a/tests/test_dask.py b/tests/test_dask.py index 8a8e4636..e1ba85e5 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -72,29 +72,35 @@ def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: ### Query tests, we care about results of dask matching those of pandas +@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): df, table, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*") pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = 
dask_df.reset_index(drop=True) + _assert_df_equal(pandas_df, dask_df) -# @pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1]) -def test_dask_df_select_star_in_range(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): +@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1]) +def test_dask_df_select_star_in_range(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, query_range_percentile): _, _, df, _ = df_with_table start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") - # end_row = int(len(df)-1 * query_range_percentile) - end_str = (df.index[-1] + pd.Timedelta(microseconds=1)).to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") + end_row = int((len(df)-1) * query_range_percentile) + end_str = (df.index[end_row].to_pydatetime() + pd.Timedelta(microseconds=1)).strftime("%Y-%m-%dT%H:%M:%S.%f") df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", query_range=(start_str, end_str)) - + pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) +@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): _, _, df, _ = df_with_table columns = ", ".join([f"{col}" for col in df.columns]) @@ -102,6 +108,7 @@ def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) @@ -112,10 +119,13 @@ def test_dask_df_select_columns_with_alias_equals_pandas_df(qdbpd_write_fn, df_w pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) -@pytest.mark.parametrize("group_by", ["1s", "1min", "1d"]) + +@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("group_by", ["1h", "1d"]) def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, group_by): _, _, df, _ = df_with_table columns = ", ".join([f"count({col})" for col in df.columns]) @@ -123,10 +133,12 @@ def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_wi pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) +@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) @pytest.mark.skip(reason="Not implemented yet") def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): _, _, df, _ = df_with_table @@ -135,6 +147,7 @@ def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) @@ -154,4 +167,4 @@ def 
test_dask_exception_on_non_select_query(qdbd_settings, query): Tests that a non-select query raises an exception """ with pytest.raises(NotImplementedError): - qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) \ No newline at end of file + qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) From 75f961970e5a65b2ccffddc979770a1ca2b6efb6 Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 May 2025 12:07:59 +0200 Subject: [PATCH 05/19] mock functions --- quasardb/cluster.cpp | 5 ++++- quasardb/cluster.hpp | 45 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/quasardb/cluster.cpp b/quasardb/cluster.cpp index a2f24987..67f468c2 100644 --- a/quasardb/cluster.cpp +++ b/quasardb/cluster.cpp @@ -191,7 +191,10 @@ void register_cluster(py::module_ & m) .def("compact_progress", &qdb::cluster::compact_progress) // .def("compact_abort", &qdb::cluster::compact_abort) // .def("wait_for_compaction", &qdb::cluster::wait_for_compaction) // - .def("endpoints", &qdb::cluster::endpoints); // + .def("endpoints", &qdb::cluster::endpoints) // + .def("validate_query", &qdb::cluster::validate_query) // + .def("split_query_range", &qdb::cluster::split_query_range); // + } }; // namespace qdb diff --git a/quasardb/cluster.hpp b/quasardb/cluster.hpp index b6867e88..205035e8 100644 --- a/quasardb/cluster.hpp +++ b/quasardb/cluster.hpp @@ -61,6 +61,10 @@ #include #include +// for validate_query mock +#include + + namespace qdb { @@ -496,6 +500,47 @@ class cluster return results; } + py::object validate_query(const std::string & query_string) + { + check_open(); + + std::string query = query_string; + const std::string limit_string = "LIMIT 1"; + query += " " + limit_string; + + // std::regex limit_pattern(R"(\bLIMIT\s+\d+)", std::regex_constants::icase); + // if (std::regex_search(query, limit_pattern)) { + // query = std::regex_replace(query, limit_pattern, limit_string); + // } else { + // query += " " + limit_string; + // } + + // TODO: + // should return dict of column names and dtypes + // currently returns numpy masked arrays + return py::cast(qdb::numpy_query(_handle, query)); + } + + py::object split_query_range(std::chrono::system_clock::time_point start, std::chrono::system_clock::time_point end, std::chrono::milliseconds delta) + { + // TODO: + // for now this accepts time ranges and delta size + // it should accept query string and do extraction and splitting + // + std::vector> ranges; + + for (auto current_start = start; current_start < end; ) { + auto current_end = current_start + delta; + if (current_end > end) { + current_end = end; + } + ranges.emplace_back(current_start, current_end); + current_start = current_end; + } + return py::cast(ranges); + } + + private: std::string _uri; handle_ptr _handle; From 5999a68838b54c7454926e46f3d1e869bd195a7d Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 May 2025 12:35:55 +0200 Subject: [PATCH 06/19] adjust tests --- quasardb/dask/__init__.py | 126 +++++++++++++++----------------------- tests/test_dask.py | 77 +++++++++++------------ 2 files changed, 84 insertions(+), 119 deletions(-) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index 260d2a05..faf3eee4 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -110,29 +110,28 @@ def _extract_range_from_query(conn, query: str, table_name: str) -> tuple: # if we can't do it we will query first() and last() from the table query_range = tuple() if match: - if len(match.group()) == 2: - start_str = 
match.group(1) - end_str = match.group(2) - logger.debug("Extracted strings: (%s, %s)", start_str, end_str) - parser_settings = { - "PREFER_DAY_OF_MONTH": "first", - "PREFER_MONTH_OF_YEAR": "first", - } - start_date = dateparser.parse(start_str, settings=parser_settings) - end_date = dateparser.parse(end_str, settings=parser_settings) - query_range = (start_date, end_date) - logger.debug("Parsed datetime: %s", query_range) - return query_range - - logger.debug( - "No range found in query, querying table for first and last timestamp" - ) - range_query = f"SELECT first($timestamp), last($timestamp) FROM {table_name}" - df = qdbpd.query(conn, range_query) - if not df.empty: - df.iloc[0]["last($timestamp)"] += datetime.timedelta(microseconds=1) - query_range += tuple(df.iloc[0]) - logger.debug("Extracted range from table: %s", query_range) + start_str = match.group(1) + end_str = match.group(2) + logger.debug("Extracted strings: (%s, %s)", start_str, end_str) + parser_settings = { + "PREFER_DAY_OF_MONTH": "first", + "PREFER_MONTH_OF_YEAR": "first", + } + start_date = dateparser.parse(start_str, settings=parser_settings) + end_date = dateparser.parse(end_str, settings=parser_settings) + query_range = (start_date, end_date) + logger.debug("Parsed datetime: %s", query_range) + else: + logger.debug( + "No range found in query, querying table for first and last timestamp" + ) + range_query = f"SELECT first($timestamp), last($timestamp) FROM {table_name}" + df = qdbpd.query(conn, range_query) + if not df.empty: + df.loc[0, "last($timestamp)"] += datetime.timedelta(microseconds=1) + query_range += tuple(df.iloc[0]) + logger.debug("Extracted range from table: %s", query_range) + return query_range @@ -168,48 +167,31 @@ def _create_subrange_query( return new_query -def _split_by_timedelta( - query_range: tuple[datetime.datetime, datetime.datetime], delta: datetime.timedelta -) -> tuple[datetime.datetime, datetime.datetime]: - """ - Splits passed range into smaller ranges of size of delta. - """ - - ranges = [] - - if len(query_range) != 2: - return ranges - - start = query_range[0] - end = query_range[1] - current_start = start +def _get_subqueries(conn, query: str, table_name: str) -> list[str]: + # TODO: all of this should be moved to c++ side + shard_size = conn.table(table_name.replace("\"", "")).get_shard_size() + start, end = _extract_range_from_query(conn, query, table_name) + ranges_to_query = conn.split_query_range(start, end, shard_size) - while current_start < end: - current_end = min(current_start + delta, end) - ranges.append((current_start, current_end)) - current_start = current_end - - return ranges + subqueries = [] + for rng in ranges_to_query: + subqueries.append(_create_subrange_query(query, rng)) + return subqueries -def _get_meta(conn, query: str) -> pd.DataFrame: +def _get_meta(conn, query: str, query_kwargs: dict) -> pd.DataFrame: """ - Meta is an empty dataframe with the expected schema of the query result. - Extract df schema from the first row of the query result. + Returns empty dataframe with the expected schema of the query result. """ - # XXX:igor we can use different approaches to get the meta - # 1. get the meta from the first row - # this will require us to modify passed query, we have to add a limit 1 (check for existing limit, check for offset) - # does this put a lot of pressure on the db? - # this approach provides the most accurate meta - # 2. 
get the meta from the table schema - # we will have to add $timestamp and $table for "select *" queries - # with more complicated queries we can run into different edge cases - # we will have to extract actual col names and aliases - # get types for col names and assign them to the aliases - # will be less accurate - query += " LIMIT 1" - return qdbpd.query(conn, query).iloc[:0] + np_res = conn.validate_query(query) + col_dtypes = {} + for id, column in enumerate(np_res): + col_dtypes[column[0]] = pd.Series(dtype=column[1].dtype) + + df = pd.DataFrame(col_dtypes) + if query_kwargs["index"]: + df.set_index(query_kwargs["index"], inplace=True) + return df def query( @@ -256,27 +238,17 @@ def query( table_name = _extract_table_name_from_query(query) with quasardb.Cluster(**conn_kwargs) as conn: - meta = _get_meta(conn, query) - shard_size = conn.table(table_name.replace("\"", "")).get_shard_size() - query_range = _extract_range_from_query(conn, query, table_name) - # XXX:igor this will work good for tables with a lot of data in all buckets - # for small tables we end up with a lot of small queries - # - # dd.read_sql_query function estimates data size from X first rows, - # then splits the data so it fits into "bytes_per_chunk" parameter - # (i think) it uses limit and offset to do this - ranges_to_query = _split_by_timedelta(query_range, shard_size) - - if len(ranges_to_query) == 0: - logging.warning("No ranges to query, returning empty dataframe") + meta = _get_meta(conn, query, query_kwargs) + subqueries = _get_subqueries(conn, query, table_name) + + if len(subqueries) == 0: + logging.warning("No subqueries, returning empty dataframe") return meta - logger.debug("Assembling subqueries for %d ranges", len(ranges_to_query)) parts = [] - for rng in ranges_to_query: - sub_query = _create_subrange_query(query, rng) + for subquery in subqueries: parts.append( - delayed(_read_dataframe)(sub_query, meta, conn_kwargs, query_kwargs) + delayed(_read_dataframe)(subquery, meta, conn_kwargs, query_kwargs) ) logger.debug("Assembled %d subqueries", len(parts)) diff --git a/tests/test_dask.py b/tests/test_dask.py index e1ba85e5..545eafc9 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -49,14 +49,14 @@ def _assert_df_equal(lhs, rhs): _assert_series_equal(lhs[col], rhs[col]) -def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: str = "*", use_tag: bool=False, query_range: tuple[pd.Timestamp, pd.Timestamp]=None, group_by: str=None): +def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: str = "*", query_range: tuple[pd.Timestamp, pd.Timestamp]=None, group_by: str=None, attach_tag: bool=False): (_, _, df, table) = df_with_table qdbpd_write_fn(df, qdbd_connection, table, write_through=True) table_name = table.get_name() q = "SELECT {} ".format(columns) - if use_tag: + if attach_tag: table.attach_tag("dask_tag") q += "FROM find(tag='dask_tag')" else: @@ -70,53 +70,48 @@ def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: return (df, table, q) -### Query tests, we care about results of dask matching those of pandas - -@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) -def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): - df, table, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*") - - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, 
qdbd_settings.get("uri").get("insecure")).compute() - dask_df = dask_df.reset_index(drop=True) +def _get_subrange(df: pd.DataFrame, percent_of_original_df:int=0.1) -> tuple[pd.Timestamp, pd.Timestamp]: + """ + Returns a random subrange of the DataFrame. The size of the subrange is + """ + query_range = () + if percent_of_original_df != 1: + start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") + end_row = int((len(df)-1) * percent_of_original_df) + end_str = df.index[end_row].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") + query_range = (start_str, end_str) + return query_range - _assert_df_equal(pandas_df, dask_df) +### Query tests, we care about results of dask matching those of pandas -@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1]) -def test_dask_df_select_star_in_range(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, query_range_percentile): +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1], ids=["query_range_percentile=1", "query_range_percentile=0.5", "query_range_percentile=0.25", "query_range_percentile=0.1"]) +@pytest.mark.parametrize("query_options", [{"index": None}, {"index": "$timestamp"}], ids=["index=None", "index=$timestamp"]) +def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, query_options, query_range_percentile): _, _, df, _ = df_with_table + query_range = _get_subrange(df, query_range_percentile) + _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", query_range) - start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") - end_row = int((len(df)-1) * query_range_percentile) - end_str = (df.index[end_row].to_pydatetime() + pd.Timedelta(microseconds=1)).strftime("%Y-%m-%dT%H:%M:%S.%f") - - df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", query_range=(start_str, end_str)) + pandas_df = qdbpd.query(qdbd_connection, query, **query_options) + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure"), **query_options).compute() - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() - dask_df = dask_df.reset_index(drop=True) + if query_options.get("index") is None: + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) -@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) -def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): - _, _, df, _ = df_with_table - columns = ", ".join([f"{col}" for col in df.columns]) - df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns) - - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() - dask_df = dask_df.reset_index(drop=True) - - _assert_df_equal(pandas_df, dask_df) -def test_dask_df_select_columns_with_alias_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1], ids=["query_range_percentile=1", "query_range_percentile=0.5", "query_range_percentile=0.25", 
"query_range_percentile=0.1"]) +@pytest.mark.parametrize("use_alias", [False, True], ids=["use_alias=False", "use_alias=True"]) +def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, use_alias, query_range_percentile): _, _, df, _ = df_with_table - columns = ", ".join([f"{col} as alias_{col}" for col in df.columns]) - df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns) + columns = ", ".join([f"{col} as {col}_alias" if use_alias else f"{col}" for col in df.columns]) + + query_range = _get_subrange(df, query_range_percentile) + _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns, query_range) pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() dask_df = dask_df.reset_index(drop=True) @@ -124,7 +119,7 @@ def test_dask_df_select_columns_with_alias_equals_pandas_df(qdbpd_write_fn, df_w _assert_df_equal(pandas_df, dask_df) -@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.parametrize("group_by", ["1h", "1d"]) def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, group_by): _, _, df, _ = df_with_table @@ -138,12 +133,10 @@ def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_wi _assert_df_equal(pandas_df, dask_df) -@pytest.mark.parametrize("frequency", ["H"], ids=["frequency=H"], indirect=True) +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.skip(reason="Not implemented yet") def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): - _, _, df, _ = df_with_table - - df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", use_tag=True) + _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", attach_tag=True) pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() From 3d47e06d794a469d561fb95329dda5d9591ec988 Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 May 2025 13:37:42 +0200 Subject: [PATCH 07/19] update dev requirements --- dev-requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index 801c0f52..7c7272e4 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -28,6 +28,8 @@ pytest pytest-runner pytest-benchmark +dateparser # needed for dask + # Seems like numpy>2 requires this in combination with pytest, # but is never set in the requirements. hypothesis From 24b990fe81c166a369a6a50b6e41d23817b9d684 Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 May 2025 14:24:54 +0200 Subject: [PATCH 08/19] update dev requirements --- dev-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.txt b/dev-requirements.txt index 7c7272e4..7f63ecab 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -29,6 +29,7 @@ pytest-runner pytest-benchmark dateparser # needed for dask +dask[complete] # needed for dask # Seems like numpy>2 requires this in combination with pytest, # but is never set in the requirements. 
From 3cd86c1fb8dd6b631658ea6c54556cc4fd44b6b7 Mon Sep 17 00:00:00 2001 From: Igor Date: Fri, 23 May 2025 10:05:18 +0200 Subject: [PATCH 09/19] cleanup --- quasardb/cluster.hpp | 16 +--------------- quasardb/dask/__init__.py | 5 +++-- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/quasardb/cluster.hpp b/quasardb/cluster.hpp index 205035e8..74abffac 100644 --- a/quasardb/cluster.hpp +++ b/quasardb/cluster.hpp @@ -61,9 +61,6 @@ #include #include -// for validate_query mock -#include - namespace qdb { @@ -507,14 +504,7 @@ class cluster std::string query = query_string; const std::string limit_string = "LIMIT 1"; query += " " + limit_string; - - // std::regex limit_pattern(R"(\bLIMIT\s+\d+)", std::regex_constants::icase); - // if (std::regex_search(query, limit_pattern)) { - // query = std::regex_replace(query, limit_pattern, limit_string); - // } else { - // query += " " + limit_string; - // } - + // TODO: // should return dict of column names and dtypes // currently returns numpy masked arrays @@ -523,10 +513,6 @@ class cluster py::object split_query_range(std::chrono::system_clock::time_point start, std::chrono::system_clock::time_point end, std::chrono::milliseconds delta) { - // TODO: - // for now this accepts time ranges and delta size - // it should accept query string and do extraction and splitting - // std::vector> ranges; for (auto current_start = start; current_start < end; ) { diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index faf3eee4..b4e201f7 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -65,7 +65,7 @@ class DateParserRequired(ImportError): except ImportError as err: logger.exception(err) raise DaskRequired( - "dateparser required!" + "Dateparser library is required to use QuasarDB dask integration." ) from err general_select_pattern = re.compile(r'(?i)^\s*SELECT\b') @@ -168,7 +168,8 @@ def _create_subrange_query( def _get_subqueries(conn, query: str, table_name: str) -> list[str]: - # TODO: all of this should be moved to c++ side + # XXX: igor + # this will be moved to c++ functions in the future shard_size = conn.table(table_name.replace("\"", "")).get_shard_size() start, end = _extract_range_from_query(conn, query, table_name) ranges_to_query = conn.split_query_range(start, end, shard_size) From 690d4dfb2b7d19f1af51f1f223c12c283d20da0c Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 10:30:40 +0200 Subject: [PATCH 10/19] add more tests --- quasardb/dask/__init__.py | 7 +- tests/conftest.py | 2 + tests/test_dask.py | 252 ++++++++++++++++++++++++++------------ 3 files changed, 182 insertions(+), 79 deletions(-) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index b4e201f7..36efe845 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -56,9 +56,11 @@ class DaskRequired(ImportError): "The dask library is required to use QuasarDB dask integration." ) from err + class DateParserRequired(ImportError): pass + try: import dateparser @@ -68,7 +70,7 @@ class DateParserRequired(ImportError): "Dateparser library is required to use QuasarDB dask integration." 
) from err -general_select_pattern = re.compile(r'(?i)^\s*SELECT\b') +general_select_pattern = re.compile(r"(?i)^\s*SELECT\b") table_pattern = re.compile(r"(?i)\bFROM\s+([`\"\[]?\w+[`\"\]]?)") range_pattern = re.compile(r"(?i)\bIN\s+RANGE\s*\(([^,]+),\s*([^,]+)\)") @@ -170,7 +172,7 @@ def _create_subrange_query( def _get_subqueries(conn, query: str, table_name: str) -> list[str]: # XXX: igor # this will be moved to c++ functions in the future - shard_size = conn.table(table_name.replace("\"", "")).get_shard_size() + shard_size = conn.table(table_name.replace('"', "")).get_shard_size() start, end = _extract_range_from_query(conn, query, table_name) ranges_to_query = conn.split_query_range(start, end, shard_size) @@ -236,7 +238,6 @@ def query( "numpy": numpy, } - table_name = _extract_table_name_from_query(query) with quasardb.Cluster(**conn_kwargs) as conn: meta = _get_meta(conn, query, query_kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index cdfa805c..71e48647 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -763,10 +763,12 @@ def deduplicate(request, column_name): def deduplication_mode(request): return request.param + @pytest.fixture(params=["S"], ids=["frequency=S"]) def frequency(request): yield request.param + @pytest.fixture def gen_index(start_date, row_count, frequency): return pd.Index( diff --git a/tests/test_dask.py b/tests/test_dask.py index 545eafc9..745c1381 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -1,58 +1,31 @@ +import math import pytest +import quasardb import quasardb.pandas as qdbpd import quasardb.dask as qdbdsk import numpy.ma as ma import logging import numpy as np import pandas as pd +import dask.dataframe as dd +from dask.distributed import LocalCluster, Client +import conftest +from test_pandas import _assert_df_equal -logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("test-dask") -def _to_numpy_masked(xs): - data = xs.to_numpy() - mask = xs.isna() - return ma.masked_array(data=data, mask=mask) - -def _assert_series_equal(lhs, rhs): - lhs_ = _to_numpy_masked(lhs) - rhs_ = _to_numpy_masked(rhs) - - assert ma.count_masked(lhs_) == ma.count_masked(rhs_) - - logger.debug("lhs: %s", lhs_[:10]) - logger.debug("rhs: %s", rhs_[:10]) - - lhs_ = lhs_.torecords() - rhs_ = rhs_.torecords() - - for (lval, lmask), (rval, rmask) in zip(lhs_, rhs_): - assert lmask == rmask - - if lmask is False: - assert lval == rval - - -def _assert_df_equal(lhs, rhs): - """ - Verifies DataFrames lhs and rhs are equal(ish). We're not pedantic that we're comparing - metadata and things like that. - - Typically one would use `lhs` for the DataFrame that was generated in code, and - `rhs` for the DataFrame that's returned by qdbpd. 
- """ - - np.testing.assert_array_equal(lhs.index.to_numpy(), rhs.index.to_numpy()) - assert len(lhs.columns) == len(rhs.columns) - for col in lhs.columns: - _assert_series_equal(lhs[col], rhs[col]) - - -def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: str = "*", query_range: tuple[pd.Timestamp, pd.Timestamp]=None, group_by: str=None, attach_tag: bool=False): +def _prepare_query_test( + df_with_table, + qdbd_connection, + columns: str = "*", + query_range: tuple[pd.Timestamp, pd.Timestamp] = None, + group_by: str = None, + attach_tag: bool = False, +): (_, _, df, table) = df_with_table - qdbpd_write_fn(df, qdbd_connection, table, write_through=True) + qdbpd.write_dataframe(df, qdbd_connection, table, write_through=True) table_name = table.get_name() q = "SELECT {} ".format(columns) @@ -60,41 +33,143 @@ def _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns: table.attach_tag("dask_tag") q += "FROM find(tag='dask_tag')" else: - q += "FROM \"{}\"".format(table_name) - + q += 'FROM "{}"'.format(table_name) + if query_range: q += " IN RANGE({}, {})".format(query_range[0], query_range[1]) - + if group_by: q += " GROUP BY {}".format(group_by) return (df, table, q) -def _get_subrange(df: pd.DataFrame, percent_of_original_df:int=0.1) -> tuple[pd.Timestamp, pd.Timestamp]: +def _get_subrange( + df: pd.DataFrame, slice_size: int = 0.1 +) -> tuple[pd.Timestamp, pd.Timestamp]: """ - Returns a random subrange of the DataFrame. The size of the subrange is + Returns slice of the Dataframe index to be used in the query. """ query_range = () - if percent_of_original_df != 1: + if slice_size != 1: start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") - end_row = int((len(df)-1) * percent_of_original_df) + end_row = int((len(df) - 1) * slice_size) end_str = df.index[end_row].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") query_range = (start_str, end_str) return query_range -### Query tests, we care about results of dask matching those of pandas + +#### Dask integration tests + +@conftest.override_cdtypes([np.dtype("float64")]) +@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) +@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) +def test_dask_query_meta_set(df_with_table, qdbd_connection, qdbd_settings): + """ + tests that the columns are set correctly in the dask DataFrame + """ + _, _, query = _prepare_query_test(df_with_table, qdbd_connection) + df = qdbpd.query(qdbd_connection, query) + ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + + dask_meta = ddf._meta_nonempty + pandas_cols = df.columns + + assert dask_meta.columns.names == pandas_cols.names, "column names do not match" + + for col_name in pandas_cols: + # treat string[pyarrow] as object + if dask_meta[col_name].dtype == "string[pyarrow]": + dask_meta[col_name] = dask_meta[col_name].astype("object") + + assert ( + dask_meta[col_name].dtype == df[col_name].dtype + ), f"dtype of column {col_name} does not match" + + +@conftest.override_cdtypes([np.dtype("float64")]) +@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) +@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) +def test_dask_query_lazy_evaluation(df_with_table, qdbd_connection, qdbd_settings): + """ + tests that the function is lazy and does not return a Dataframe immediately. 
+ """ + + _, _, query = _prepare_query_test(df_with_table, qdbd_connection) + + ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + + assert isinstance(ddf, dd.DataFrame) + result = ddf.compute() + assert isinstance(result, pd.DataFrame) + + +@conftest.override_cdtypes([np.dtype("float64")]) +@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) +@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) +def test_dask_query_parallelized(df_with_table, qdbd_connection, qdbd_settings): + _, _, df, table = df_with_table + shard_size = table.get_shard_size() + start, end = df.index[0], df.index[-1] + + _, _, query = _prepare_query_test(df_with_table, qdbd_connection) + + ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + + # value of npartitions determines number of delayed tasks + # delayed tasks can be executed in parallel + # query range is split by shard size + expected_number_of_partitions = math.ceil( + (end - start).total_seconds() / shard_size.total_seconds() + ) + assert ddf.npartitions == expected_number_of_partitions + + +@conftest.override_cdtypes([np.dtype("float64")]) +@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) +@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) +def test_dask_compute_on_local_cluster(df_with_table, qdbd_connection, qdbd_settings): + _, _, query = _prepare_query_test(df_with_table, qdbd_connection) + + with LocalCluster(n_workers=2) as cluster: + with Client(cluster): + ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + res = ddf.compute() + res.head() + + +### Query tests, we care about results of dask query matching those of pandas +# when using default index, it has to be reset to match pandas DataFrame. +# +# index for a Dask DataFrame will not be monotonically increasing from 0. +# Instead, it will restart at 0 for each partition (e.g. index1 = [0, ..., 10], index2 = [0, ...]). +# This is due to the inability to statically know the full length of the index. 
+# https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.reset_index.html + @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1], ids=["query_range_percentile=1", "query_range_percentile=0.5", "query_range_percentile=0.25", "query_range_percentile=0.1"]) -@pytest.mark.parametrize("query_options", [{"index": None}, {"index": "$timestamp"}], ids=["index=None", "index=$timestamp"]) -def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, query_options, query_range_percentile): +@pytest.mark.parametrize( + "range_slice", + [1, 0.5, 0.25, 0.1], + ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25", "range_slice=0.1"], +) +@pytest.mark.parametrize( + "query_options", + [{"index": None}, {"index": "$timestamp"}], + ids=["index=None", "index=$timestamp"], +) +def test_dask_df_select_star_equals_pandas_df( + df_with_table, qdbd_connection, qdbd_settings, query_options, range_slice +): _, _, df, _ = df_with_table - query_range = _get_subrange(df, query_range_percentile) - _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", query_range) + query_range = _get_subrange(df, range_slice) + _, _, query = _prepare_query_test(df_with_table, qdbd_connection, "*", query_range) pandas_df = qdbpd.query(qdbd_connection, query, **query_options) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure"), **query_options).compute() + dask_df = qdbdsk.query( + query, qdbd_settings.get("uri").get("insecure"), **query_options + ).compute() if query_options.get("index") is None: dask_df = dask_df.reset_index(drop=True) @@ -103,15 +178,27 @@ def test_dask_df_select_star_equals_pandas_df(qdbpd_write_fn, df_with_table, qdb @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize("query_range_percentile", [1, 0.5, 0.25, 0.1], ids=["query_range_percentile=1", "query_range_percentile=0.5", "query_range_percentile=0.25", "query_range_percentile=0.1"]) -@pytest.mark.parametrize("use_alias", [False, True], ids=["use_alias=False", "use_alias=True"]) -def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, use_alias, query_range_percentile): +@pytest.mark.parametrize( + "range_slice", + [1, 0.5, 0.25, 0.1], + ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25", "range_slice=0.1"], +) +@pytest.mark.parametrize( + "use_alias", [False, True], ids=["use_alias=False", "use_alias=True"] +) +def test_dask_df_select_columns_equals_pandas_df( + df_with_table, qdbd_connection, qdbd_settings, use_alias, range_slice +): _, _, df, _ = df_with_table - columns = ", ".join([f"{col} as {col}_alias" if use_alias else f"{col}" for col in df.columns]) + columns = ", ".join( + [f"{col} as {col}_alias" if use_alias else f"{col}" for col in df.columns] + ) - query_range = _get_subrange(df, query_range_percentile) - _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns, query_range) + query_range = _get_subrange(df, range_slice) + _, _, query = _prepare_query_test( + df_with_table, qdbd_connection, columns, query_range + ) pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() dask_df = dask_df.reset_index(drop=True) @@ -121,10 +208,17 @@ def test_dask_df_select_columns_equals_pandas_df(qdbpd_write_fn, 
df_with_table, @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.parametrize("group_by", ["1h", "1d"]) -def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings, group_by): +def test_dask_df_select_agg_group_by_time_equals_pandas_df( + df_with_table, qdbd_connection, qdbd_settings, group_by +): _, _, df, _ = df_with_table columns = ", ".join([f"count({col})" for col in df.columns]) - df, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, columns=f"$timestamp, {columns}", group_by=group_by) + df, _, query = _prepare_query_test( + df_with_table, + qdbd_connection, + columns=f"$timestamp, {columns}", + group_by=group_by, + ) pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() @@ -135,8 +229,12 @@ def test_dask_df_select_agg_group_by_time_equals_pandas_df(qdbpd_write_fn, df_wi @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.skip(reason="Not implemented yet") -def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, qdbd_connection, qdbd_settings): - _, _, query = _prepare_query_test(qdbpd_write_fn, df_with_table, qdbd_connection, "*", attach_tag=True) +def test_dask_df_select_find_tag_equals_pandas_df( + df_with_table, qdbd_connection, qdbd_settings +): + _, _, query = _prepare_query_test( + df_with_table, qdbd_connection, "*", attach_tag=True + ) pandas_df = qdbpd.query(qdbd_connection, query) dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() @@ -144,18 +242,20 @@ def test_dask_df_select_find_tag_equals_pandas_df(qdbpd_write_fn, df_with_table, _assert_df_equal(pandas_df, dask_df) + @pytest.mark.parametrize( - "query", [ - "INSERT INTO test ($timestamp, x) VALUES (now(), 2)", - "DROP TABLE test", - "DELETE FROM test", - "CREATE TABLE test (x INT64)", - "SHOW TABLE test", - "ALTER TABLE test ADD COLUMN y INT64", - "SHOW DISK USAGE ON test" - ] + "query", + [ + "INSERT INTO test ($timestamp, x) VALUES (now(), 2)", + "DROP TABLE test", + "DELETE FROM test", + "CREATE TABLE test (x INT64)", + "SHOW TABLE test", + "ALTER TABLE test ADD COLUMN y INT64", + "SHOW DISK USAGE ON test", + ], ) -def test_dask_exception_on_non_select_query(qdbd_settings, query): +def test_dask_query_exception_on_non_select_query(qdbd_settings, query): """ Tests that a non-select query raises an exception """ From c2cbb04e2a64d501ee2c2ffb6e379cc72ca530ed Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 10:35:21 +0200 Subject: [PATCH 11/19] comments --- quasardb/dask/__init__.py | 3 ++- tests/test_dask.py | 16 ---------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index 36efe845..b70749c8 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -89,7 +89,8 @@ def _read_dataframe( def _extract_table_name_from_query(query: str) -> str: - # XXX:igor for now this works for queries using only one table + # XXX:igor for now this works for queries using one table + # tags and multiple tables are not supported yet logger.debug('Extracting table name from query: "%s"', query) match = re.search(table_pattern, query) diff --git a/tests/test_dask.py b/tests/test_dask.py index 745c1381..3e9b7863 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -227,22 +227,6 @@ def 
test_dask_df_select_agg_group_by_time_equals_pandas_df( _assert_df_equal(pandas_df, dask_df) -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.skip(reason="Not implemented yet") -def test_dask_df_select_find_tag_equals_pandas_df( - df_with_table, qdbd_connection, qdbd_settings -): - _, _, query = _prepare_query_test( - df_with_table, qdbd_connection, "*", attach_tag=True - ) - - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() - dask_df = dask_df.reset_index(drop=True) - - _assert_df_equal(pandas_df, dask_df) - - @pytest.mark.parametrize( "query", [ From 5726dbb048eb3c9425fa58f54517ec5024ac2f7f Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 10:42:06 +0200 Subject: [PATCH 12/19] check that dask dataframe is split for tests --- quasardb/dask/__init__.py | 1 - tests/test_dask.py | 26 +++++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index b70749c8..05c697e7 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -171,7 +171,6 @@ def _create_subrange_query( def _get_subqueries(conn, query: str, table_name: str) -> list[str]: - # XXX: igor # this will be moved to c++ functions in the future shard_size = conn.table(table_name.replace('"', "")).get_shard_size() start, end = _extract_range_from_query(conn, query, table_name) diff --git a/tests/test_dask.py b/tests/test_dask.py index 3e9b7863..04d48912 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -141,6 +141,7 @@ def test_dask_compute_on_local_cluster(df_with_table, qdbd_connection, qdbd_sett ### Query tests, we care about results of dask query matching those of pandas # when using default index, it has to be reset to match pandas DataFrame. +# we neeed to check that each query is split into multiple dask partitions # # index for a Dask DataFrame will not be monotonically increasing from 0. # Instead, it will restart at 0 for each partition (e.g. index1 = [0, ..., 10], index2 = [0, ...]). 
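That per-partition index behaviour is why the comparisons in these tests call reset_index(drop=True) before matching against the pandas result. A minimal sketch of the behaviour, assuming only pandas and dask are installed (illustrative only, not part of the patch):

import pandas as pd
import dask.dataframe as dd
from dask import delayed

# Two partitions, each backed by a delayed pandas frame.
parts = [
    delayed(pd.DataFrame)({"x": [1.0, 2.0]}),
    delayed(pd.DataFrame)({"x": [3.0, 4.0]}),
]
meta = pd.DataFrame({"x": pd.Series(dtype="float64")})

ddf = dd.from_delayed(parts, meta=meta)
result = ddf.compute()

print(list(result.index))                         # [0, 1, 0, 1] -- restarts at 0 in every partition
print(list(result.reset_index(drop=True).index))  # [0, 1, 2, 3] -- comparable to a plain pandas frame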
@@ -151,8 +152,8 @@ def test_dask_compute_on_local_cluster(df_with_table, qdbd_connection, qdbd_sett @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.parametrize( "range_slice", - [1, 0.5, 0.25, 0.1], - ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25", "range_slice=0.1"], + [1, 0.5, 0.25], + ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25"], ) @pytest.mark.parametrize( "query_options", @@ -169,7 +170,10 @@ def test_dask_df_select_star_equals_pandas_df( pandas_df = qdbpd.query(qdbd_connection, query, **query_options) dask_df = qdbdsk.query( query, qdbd_settings.get("uri").get("insecure"), **query_options - ).compute() + ) + + assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" + dask_df = dask_df.compute() if query_options.get("index") is None: dask_df = dask_df.reset_index(drop=True) @@ -180,8 +184,8 @@ def test_dask_df_select_star_equals_pandas_df( @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) @pytest.mark.parametrize( "range_slice", - [1, 0.5, 0.25, 0.1], - ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25", "range_slice=0.1"], + [1, 0.5, 0.25], + ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25"], ) @pytest.mark.parametrize( "use_alias", [False, True], ids=["use_alias=False", "use_alias=True"] @@ -200,7 +204,11 @@ def test_dask_df_select_columns_equals_pandas_df( df_with_table, qdbd_connection, columns, query_range ) pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + + assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" + dask_df = dask_df.compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) @@ -221,7 +229,11 @@ def test_dask_df_select_agg_group_by_time_equals_pandas_df( ) pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")).compute() + dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) + + assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" + dask_df = dask_df.compute() + dask_df = dask_df.reset_index(drop=True) _assert_df_equal(pandas_df, dask_df) From 6c16b28ddcdf0fda9ca5479918c3bcd12e235d54 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 10:46:40 +0200 Subject: [PATCH 13/19] remove tag from _prepare_query_test --- tests/test_dask.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/tests/test_dask.py b/tests/test_dask.py index 04d48912..533ceb21 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -21,19 +21,12 @@ def _prepare_query_test( columns: str = "*", query_range: tuple[pd.Timestamp, pd.Timestamp] = None, group_by: str = None, - attach_tag: bool = False, ): (_, _, df, table) = df_with_table qdbpd.write_dataframe(df, qdbd_connection, table, write_through=True) table_name = table.get_name() - q = "SELECT {} ".format(columns) - - if attach_tag: - table.attach_tag("dask_tag") - q += "FROM find(tag='dask_tag')" - else: - q += 'FROM "{}"'.format(table_name) + q = 'SELECT {} FROM "{}"'.format(columns, table_name) if query_range: q += " IN RANGE({}, {})".format(query_range[0], query_range[1]) @@ -119,7 +112,7 @@ def test_dask_query_parallelized(df_with_table, qdbd_connection, qdbd_settings): # value of npartitions determines number of delayed tasks # 
delayed tasks can be executed in parallel - # query range is split by shard size + # currently tasks are created for each shard expected_number_of_partitions = math.ceil( (end - start).total_seconds() / shard_size.total_seconds() ) From 825dcd1c452d1e6f202738f9ac6f83b5705edd14 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 10:47:58 +0200 Subject: [PATCH 14/19] format --- tests/test_dask.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_dask.py b/tests/test_dask.py index 533ceb21..2ea96b1d 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -54,6 +54,7 @@ def _get_subrange( #### Dask integration tests + @conftest.override_cdtypes([np.dtype("float64")]) @pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) @pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) From 13b47ee1c63b882aa45a69b9ac7ab5d8c24ac803 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 21:58:34 +0200 Subject: [PATCH 15/19] move dask integration to separate project --- quasardb/dask/__init__.py | 258 +------------------------------------- setup.py | 2 +- tests/test_dask.py | 253 ------------------------------------- 3 files changed, 2 insertions(+), 511 deletions(-) delete mode 100644 tests/test_dask.py diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index 05c697e7..d8bc2fe7 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -1,257 +1 @@ -# pylint: disable=C0103,C0111,C0302,R0903 - -# Copyright (c) 2009-2025, quasardb SAS. All rights reserved. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of quasardb nor the names of its contributors may -# be used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY QUASARDB AND CONTRIBUTORS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY -# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# - -import logging -import quasardb -import datetime -import re - -logger = logging.getLogger("quasardb.dask") - - -class DaskRequired(ImportError): - """ - Exception raised when trying to use QuasarDB dask integration, but - required packages are not installed has not been installed. 
- """ - - pass - - -try: - import dask.dataframe as dd - from dask.delayed import delayed - import quasardb.pandas as qdbpd - import pandas as pd - -except ImportError as err: - logger.exception(err) - raise DaskRequired( - "The dask library is required to use QuasarDB dask integration." - ) from err - - -class DateParserRequired(ImportError): - pass - - -try: - import dateparser - -except ImportError as err: - logger.exception(err) - raise DaskRequired( - "Dateparser library is required to use QuasarDB dask integration." - ) from err - -general_select_pattern = re.compile(r"(?i)^\s*SELECT\b") -table_pattern = re.compile(r"(?i)\bFROM\s+([`\"\[]?\w+[`\"\]]?)") -range_pattern = re.compile(r"(?i)\bIN\s+RANGE\s*\(([^,]+),\s*([^,]+)\)") - - -def _read_dataframe( - query: str, meta: pd.DataFrame, conn_kwargs: dict, query_kwargs: dict -) -> qdbpd.DataFrame: - logger.debug('Querying QuasarDB with query: "%s"', query) - with quasardb.Cluster(**conn_kwargs) as conn: - df = qdbpd.query(conn, query, **query_kwargs) - - if len(df) == 0: - return meta - else: - return df - - -def _extract_table_name_from_query(query: str) -> str: - # XXX:igor for now this works for queries using one table - # tags and multiple tables are not supported yet - - logger.debug('Extracting table name from query: "%s"', query) - match = re.search(table_pattern, query) - if match: - table_name = match.group(1) - logger.debug('Extracted table name: "%s"', table_name) - return table_name - else: - raise ValueError("Could not extract table name from query. ") - - -def _extract_range_from_query(conn, query: str, table_name: str) -> tuple: - """ - Extracts the range from the query, parses it to datetime and returns. - If no range is found in the query, it queries the table for the first and last timestamp. - """ - logger.debug('Extracting query range from: "%s"', query) - match = re.search(range_pattern, query) - # first we check try to extract "in range (start, end)" from query - # if we can't do it we will query first() and last() from the table - query_range = tuple() - if match: - start_str = match.group(1) - end_str = match.group(2) - logger.debug("Extracted strings: (%s, %s)", start_str, end_str) - parser_settings = { - "PREFER_DAY_OF_MONTH": "first", - "PREFER_MONTH_OF_YEAR": "first", - } - start_date = dateparser.parse(start_str, settings=parser_settings) - end_date = dateparser.parse(end_str, settings=parser_settings) - query_range = (start_date, end_date) - logger.debug("Parsed datetime: %s", query_range) - else: - logger.debug( - "No range found in query, querying table for first and last timestamp" - ) - range_query = f"SELECT first($timestamp), last($timestamp) FROM {table_name}" - df = qdbpd.query(conn, range_query) - if not df.empty: - df.loc[0, "last($timestamp)"] += datetime.timedelta(microseconds=1) - query_range += tuple(df.iloc[0]) - logger.debug("Extracted range from table: %s", query_range) - - return query_range - - -def _create_subrange_query( - query: str, query_range: tuple[datetime.datetime, datetime.datetime] -) -> str: - """ - Adds range to base query. - IF range is found in the query, it will be replaced with the new range. - IF no range is found, it will be added after the "FROM {table}" clause. 
- """ - new_query = query - range_match = re.search(range_pattern, query) - start_str = query_range[0].strftime("%Y-%m-%dT%H:%M:%S.%fZ") - end_str = query_range[1].strftime("%Y-%m-%dT%H:%M:%S.%fZ") - if range_match: - if len(range_match.groups()) == 2: - new_query = re.sub( - range_pattern, - f"IN RANGE ({start_str}, {end_str})", - query, - ) - logger.debug("Created subquery: %s", new_query) - return new_query - - table_match = re.search(table_pattern, query) - new_query = re.sub( - table_pattern, - f"FROM {table_match.group(1)} IN RANGE ({start_str}, {end_str})", - query, - ) - logger.debug("Created subquery: %s", new_query) - return new_query - - -def _get_subqueries(conn, query: str, table_name: str) -> list[str]: - # this will be moved to c++ functions in the future - shard_size = conn.table(table_name.replace('"', "")).get_shard_size() - start, end = _extract_range_from_query(conn, query, table_name) - ranges_to_query = conn.split_query_range(start, end, shard_size) - - subqueries = [] - for rng in ranges_to_query: - subqueries.append(_create_subrange_query(query, rng)) - return subqueries - - -def _get_meta(conn, query: str, query_kwargs: dict) -> pd.DataFrame: - """ - Returns empty dataframe with the expected schema of the query result. - """ - np_res = conn.validate_query(query) - col_dtypes = {} - for id, column in enumerate(np_res): - col_dtypes[column[0]] = pd.Series(dtype=column[1].dtype) - - df = pd.DataFrame(col_dtypes) - if query_kwargs["index"]: - df.set_index(query_kwargs["index"], inplace=True) - return df - - -def query( - query: str, - uri: str, - *, - user_name: str = "", - user_private_key: str = "", - cluster_public_key: str = "", - user_security_file: str = "", - cluster_public_key_file: str = "", - timeout: datetime.timedelta = datetime.timedelta(seconds=60), - do_version_check: bool = False, - enable_encryption: bool = False, - client_max_parallelism: int = 0, - index=None, - blobs: bool = False, - numpy: bool = False, -): - if not re.match(general_select_pattern, query): - raise NotImplementedError( - "Only SELECT queries are supported. Please refer to the documentation for more information." 
- ) - - conn_kwargs = { - "uri": uri, - "user_name": user_name, - "user_private_key": user_private_key, - "cluster_public_key": cluster_public_key, - "user_security_file": user_security_file, - "cluster_public_key_file": cluster_public_key_file, - "timeout": timeout, - "do_version_check": do_version_check, - "enable_encryption": enable_encryption, - "client_max_parallelism": client_max_parallelism, - } - - query_kwargs = { - "index": index, - "blobs": blobs, - "numpy": numpy, - } - - table_name = _extract_table_name_from_query(query) - with quasardb.Cluster(**conn_kwargs) as conn: - meta = _get_meta(conn, query, query_kwargs) - subqueries = _get_subqueries(conn, query, table_name) - - if len(subqueries) == 0: - logging.warning("No subqueries, returning empty dataframe") - return meta - - parts = [] - for subquery in subqueries: - parts.append( - delayed(_read_dataframe)(subquery, meta, conn_kwargs, query_kwargs) - ) - logger.debug("Assembled %d subqueries", len(parts)) - - return dd.from_delayed(parts, meta=meta) +from qdb_dask_integration import * diff --git a/setup.py b/setup.py index 17b4e9a3..fa2ac372 100644 --- a/setup.py +++ b/setup.py @@ -206,7 +206,7 @@ def run(self): extras_require={ "pandas": ["pandas"], "test": ["pytest"], - "dask": ["dask[dataframe]", "dask[delayed]", "pandas", "dateparser"], + "dask": ["qdb-dask-integration"], }, packages=packages, package_data={package_name: package_modules}, diff --git a/tests/test_dask.py b/tests/test_dask.py deleted file mode 100644 index 2ea96b1d..00000000 --- a/tests/test_dask.py +++ /dev/null @@ -1,253 +0,0 @@ -import math -import pytest -import quasardb -import quasardb.pandas as qdbpd -import quasardb.dask as qdbdsk -import numpy.ma as ma -import logging -import numpy as np -import pandas as pd -import dask.dataframe as dd -from dask.distributed import LocalCluster, Client -import conftest -from test_pandas import _assert_df_equal - -logger = logging.getLogger("test-dask") - - -def _prepare_query_test( - df_with_table, - qdbd_connection, - columns: str = "*", - query_range: tuple[pd.Timestamp, pd.Timestamp] = None, - group_by: str = None, -): - (_, _, df, table) = df_with_table - - qdbpd.write_dataframe(df, qdbd_connection, table, write_through=True) - table_name = table.get_name() - q = 'SELECT {} FROM "{}"'.format(columns, table_name) - - if query_range: - q += " IN RANGE({}, {})".format(query_range[0], query_range[1]) - - if group_by: - q += " GROUP BY {}".format(group_by) - - return (df, table, q) - - -def _get_subrange( - df: pd.DataFrame, slice_size: int = 0.1 -) -> tuple[pd.Timestamp, pd.Timestamp]: - """ - Returns slice of the Dataframe index to be used in the query. 
- """ - query_range = () - if slice_size != 1: - start_str = df.index[0].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") - end_row = int((len(df) - 1) * slice_size) - end_str = df.index[end_row].to_pydatetime().strftime("%Y-%m-%dT%H:%M:%S.%f") - query_range = (start_str, end_str) - return query_range - - -#### Dask integration tests - - -@conftest.override_cdtypes([np.dtype("float64")]) -@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) -@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) -def test_dask_query_meta_set(df_with_table, qdbd_connection, qdbd_settings): - """ - tests that the columns are set correctly in the dask DataFrame - """ - _, _, query = _prepare_query_test(df_with_table, qdbd_connection) - df = qdbpd.query(qdbd_connection, query) - ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - - dask_meta = ddf._meta_nonempty - pandas_cols = df.columns - - assert dask_meta.columns.names == pandas_cols.names, "column names do not match" - - for col_name in pandas_cols: - # treat string[pyarrow] as object - if dask_meta[col_name].dtype == "string[pyarrow]": - dask_meta[col_name] = dask_meta[col_name].astype("object") - - assert ( - dask_meta[col_name].dtype == df[col_name].dtype - ), f"dtype of column {col_name} does not match" - - -@conftest.override_cdtypes([np.dtype("float64")]) -@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) -@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) -def test_dask_query_lazy_evaluation(df_with_table, qdbd_connection, qdbd_settings): - """ - tests that the function is lazy and does not return a Dataframe immediately. - """ - - _, _, query = _prepare_query_test(df_with_table, qdbd_connection) - - ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - - assert isinstance(ddf, dd.DataFrame) - result = ddf.compute() - assert isinstance(result, pd.DataFrame) - - -@conftest.override_cdtypes([np.dtype("float64")]) -@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) -@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -def test_dask_query_parallelized(df_with_table, qdbd_connection, qdbd_settings): - _, _, df, table = df_with_table - shard_size = table.get_shard_size() - start, end = df.index[0], df.index[-1] - - _, _, query = _prepare_query_test(df_with_table, qdbd_connection) - - ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - - # value of npartitions determines number of delayed tasks - # delayed tasks can be executed in parallel - # currently tasks are created for each shard - expected_number_of_partitions = math.ceil( - (end - start).total_seconds() / shard_size.total_seconds() - ) - assert ddf.npartitions == expected_number_of_partitions - - -@conftest.override_cdtypes([np.dtype("float64")]) -@pytest.mark.parametrize("row_count", [224], ids=["row_count=224"], indirect=True) -@pytest.mark.parametrize("sparsify", [100], ids=["sparsify=none"], indirect=True) -def test_dask_compute_on_local_cluster(df_with_table, qdbd_connection, qdbd_settings): - _, _, query = _prepare_query_test(df_with_table, qdbd_connection) - - with LocalCluster(n_workers=2) as cluster: - with Client(cluster): - ddf = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - res = ddf.compute() - res.head() - - -### Query tests, we care about results of dask 
query matching those of pandas -# when using default index, it has to be reset to match pandas DataFrame. -# we neeed to check that each query is split into multiple dask partitions -# -# index for a Dask DataFrame will not be monotonically increasing from 0. -# Instead, it will restart at 0 for each partition (e.g. index1 = [0, ..., 10], index2 = [0, ...]). -# This is due to the inability to statically know the full length of the index. -# https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.reset_index.html - - -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize( - "range_slice", - [1, 0.5, 0.25], - ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25"], -) -@pytest.mark.parametrize( - "query_options", - [{"index": None}, {"index": "$timestamp"}], - ids=["index=None", "index=$timestamp"], -) -def test_dask_df_select_star_equals_pandas_df( - df_with_table, qdbd_connection, qdbd_settings, query_options, range_slice -): - _, _, df, _ = df_with_table - query_range = _get_subrange(df, range_slice) - _, _, query = _prepare_query_test(df_with_table, qdbd_connection, "*", query_range) - - pandas_df = qdbpd.query(qdbd_connection, query, **query_options) - dask_df = qdbdsk.query( - query, qdbd_settings.get("uri").get("insecure"), **query_options - ) - - assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" - dask_df = dask_df.compute() - - if query_options.get("index") is None: - dask_df = dask_df.reset_index(drop=True) - - _assert_df_equal(pandas_df, dask_df) - - -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize( - "range_slice", - [1, 0.5, 0.25], - ids=["range_slice=1", "range_slice=0.5", "range_slice=0.25"], -) -@pytest.mark.parametrize( - "use_alias", [False, True], ids=["use_alias=False", "use_alias=True"] -) -def test_dask_df_select_columns_equals_pandas_df( - df_with_table, qdbd_connection, qdbd_settings, use_alias, range_slice -): - _, _, df, _ = df_with_table - - columns = ", ".join( - [f"{col} as {col}_alias" if use_alias else f"{col}" for col in df.columns] - ) - - query_range = _get_subrange(df, range_slice) - _, _, query = _prepare_query_test( - df_with_table, qdbd_connection, columns, query_range - ) - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - - assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" - dask_df = dask_df.compute() - - dask_df = dask_df.reset_index(drop=True) - - _assert_df_equal(pandas_df, dask_df) - - -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -@pytest.mark.parametrize("group_by", ["1h", "1d"]) -def test_dask_df_select_agg_group_by_time_equals_pandas_df( - df_with_table, qdbd_connection, qdbd_settings, group_by -): - _, _, df, _ = df_with_table - columns = ", ".join([f"count({col})" for col in df.columns]) - df, _, query = _prepare_query_test( - df_with_table, - qdbd_connection, - columns=f"$timestamp, {columns}", - group_by=group_by, - ) - - pandas_df = qdbpd.query(qdbd_connection, query) - dask_df = qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) - - assert dask_df.npartitions > 1, "Dask DataFrame should have multiple partitions" - dask_df = dask_df.compute() - - dask_df = dask_df.reset_index(drop=True) - - _assert_df_equal(pandas_df, dask_df) - - -@pytest.mark.parametrize( - "query", - [ - "INSERT INTO test ($timestamp, x) VALUES (now(), 2)", - 
"DROP TABLE test", - "DELETE FROM test", - "CREATE TABLE test (x INT64)", - "SHOW TABLE test", - "ALTER TABLE test ADD COLUMN y INT64", - "SHOW DISK USAGE ON test", - ], -) -def test_dask_query_exception_on_non_select_query(qdbd_settings, query): - """ - Tests that a non-select query raises an exception - """ - with pytest.raises(NotImplementedError): - qdbdsk.query(query, qdbd_settings.get("uri").get("insecure")) From 7fe2ebe561191959787b2d2cb410063b356ed0c1 Mon Sep 17 00:00:00 2001 From: Igor Date: Mon, 26 May 2025 22:34:47 +0200 Subject: [PATCH 16/19] integration test --- tests/test_dask.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 tests/test_dask.py diff --git a/tests/test_dask.py b/tests/test_dask.py new file mode 100644 index 00000000..a7d45905 --- /dev/null +++ b/tests/test_dask.py @@ -0,0 +1,26 @@ +import pytest +import quasardb.pandas as qdbpd +import quasardb.dask as qdbdsk +import logging + +logger = logging.getLogger("test-dask") + + +def _prepare_query_test( + df_with_table, + qdbd_connection, +): + (_, _, df, table) = df_with_table + + qdbpd.write_dataframe(df, qdbd_connection, table, write_through=True) + return (df, table) + + +@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) +def test_can_query_from_dask_module( + df_with_table, qdbd_connection, qdbd_settings +): + df, table = _prepare_query_test(df_with_table, qdbd_connection) + query = f"SELECT * FROM {table.get_name()}" + + qdbdsk.query(query, cluster_uri=qdbd_settings.get("uri").get("insecure")).compute() From 456c587e1f2e61b628b0a5253b2f46dc7d7c5d1f Mon Sep 17 00:00:00 2001 From: Igor Date: Tue, 27 May 2025 08:34:15 +0200 Subject: [PATCH 17/19] adjust tests --- dev-requirements.txt | 3 --- quasardb/dask/__init__.py | 13 ++++++++++++- scripts/teamcity/20.test.sh | 1 + tests/test_dask.py | 8 +++++--- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 7f63ecab..801c0f52 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -28,9 +28,6 @@ pytest pytest-runner pytest-benchmark -dateparser # needed for dask -dask[complete] # needed for dask - # Seems like numpy>2 requires this in combination with pytest, # but is never set in the requirements. hypothesis diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index d8bc2fe7..5d6fcbb0 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -1 +1,12 @@ -from qdb_dask_integration import * +class QdbDaskIntegrationRequired(ImportError): + """ + Exception raised when trying to use QuasarDB dask integration, but + qdb_dask_integration has not been installed. + """ + + pass + +try: + from qdb_dask_connector import * +except ImportError: + raise QdbDaskIntegrationRequired("QuasarDB dask integration is not installed. 
Please qdb-dask-connector.") diff --git a/scripts/teamcity/20.test.sh b/scripts/teamcity/20.test.sh index ea3512f8..77e0bd7d 100755 --- a/scripts/teamcity/20.test.sh +++ b/scripts/teamcity/20.test.sh @@ -156,6 +156,7 @@ export QDB_TESTS_ENABLED=ON ${VENV_PYTHON} -m build -w ${VENV_PYTHON} -m pip install --no-deps --force-reinstall dist/quasardb-*.whl +${VENV_PYTHON} -m pip install --no-deps --force-reinstall qdb/qdb_dask_connector*.whl echo "Invoking pytest" diff --git a/tests/test_dask.py b/tests/test_dask.py index a7d45905..323354e3 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -20,7 +20,9 @@ def _prepare_query_test( def test_can_query_from_dask_module( df_with_table, qdbd_connection, qdbd_settings ): - df, table = _prepare_query_test(df_with_table, qdbd_connection) - query = f"SELECT * FROM {table.get_name()}" + _, table = _prepare_query_test(df_with_table, qdbd_connection) + query = f"SELECT * FROM \"{table.get_name()}\"" - qdbdsk.query(query, cluster_uri=qdbd_settings.get("uri").get("insecure")).compute() + ddf = qdbdsk.query(query, cluster_uri=qdbd_settings.get("uri").get("insecure")) + assert ddf.npartitions > 1 # we want to ensure that the query is distributed for this test + ddf.compute() \ No newline at end of file From 752d83dd31e60e3162be144620ab2c2ae6ad69c3 Mon Sep 17 00:00:00 2001 From: Igor Date: Tue, 27 May 2025 09:47:49 +0200 Subject: [PATCH 18/19] lint --- quasardb/dask/__init__.py | 7 +++++-- tests/test_dask.py | 12 ++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py index 5d6fcbb0..8a27f9be 100644 --- a/quasardb/dask/__init__.py +++ b/quasardb/dask/__init__.py @@ -1,12 +1,15 @@ class QdbDaskIntegrationRequired(ImportError): """ Exception raised when trying to use QuasarDB dask integration, but - qdb_dask_integration has not been installed. + qdb-dask-connector is not installed. """ pass + try: from qdb_dask_connector import * except ImportError: - raise QdbDaskIntegrationRequired("QuasarDB dask integration is not installed. Please qdb-dask-connector.") + raise QdbDaskIntegrationRequired( + "QuasarDB dask integration is not installed. Please qdb-dask-connector." 
+ ) diff --git a/tests/test_dask.py b/tests/test_dask.py index 323354e3..0a322ef6 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -17,12 +17,12 @@ def _prepare_query_test( @pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -def test_can_query_from_dask_module( - df_with_table, qdbd_connection, qdbd_settings -): +def test_can_query_from_dask_module(df_with_table, qdbd_connection, qdbd_settings): _, table = _prepare_query_test(df_with_table, qdbd_connection) - query = f"SELECT * FROM \"{table.get_name()}\"" + query = f'SELECT * FROM "{table.get_name()}"' ddf = qdbdsk.query(query, cluster_uri=qdbd_settings.get("uri").get("insecure")) - assert ddf.npartitions > 1 # we want to ensure that the query is distributed for this test - ddf.compute() \ No newline at end of file + assert ( + ddf.npartitions > 1 + ) # we want to ensure that the query is distributed for this test + ddf.compute() From 179cb19db1f28ee3931c4c6d6dab64e055f46fdc Mon Sep 17 00:00:00 2001 From: Igor Date: Tue, 27 May 2025 09:58:12 +0200 Subject: [PATCH 19/19] remove dask --- quasardb/dask/__init__.py | 15 --------------- scripts/teamcity/20.test.sh | 1 - setup.py | 2 -- tests/test_dask.py | 28 ---------------------------- 4 files changed, 46 deletions(-) delete mode 100644 quasardb/dask/__init__.py delete mode 100644 tests/test_dask.py diff --git a/quasardb/dask/__init__.py b/quasardb/dask/__init__.py deleted file mode 100644 index 8a27f9be..00000000 --- a/quasardb/dask/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -class QdbDaskIntegrationRequired(ImportError): - """ - Exception raised when trying to use QuasarDB dask integration, but - qdb-dask-connector is not installed. - """ - - pass - - -try: - from qdb_dask_connector import * -except ImportError: - raise QdbDaskIntegrationRequired( - "QuasarDB dask integration is not installed. Please qdb-dask-connector." 
- ) diff --git a/scripts/teamcity/20.test.sh b/scripts/teamcity/20.test.sh index 77e0bd7d..ea3512f8 100755 --- a/scripts/teamcity/20.test.sh +++ b/scripts/teamcity/20.test.sh @@ -156,7 +156,6 @@ export QDB_TESTS_ENABLED=ON ${VENV_PYTHON} -m build -w ${VENV_PYTHON} -m pip install --no-deps --force-reinstall dist/quasardb-*.whl -${VENV_PYTHON} -m pip install --no-deps --force-reinstall qdb/qdb_dask_connector*.whl echo "Invoking pytest" diff --git a/setup.py b/setup.py index fa2ac372..62054071 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,6 @@ package_name, "quasardb.pandas", "quasardb.numpy", - "quasardb.dask", "quasardb.extensions", ] @@ -206,7 +205,6 @@ def run(self): extras_require={ "pandas": ["pandas"], "test": ["pytest"], - "dask": ["qdb-dask-integration"], }, packages=packages, package_data={package_name: package_modules}, diff --git a/tests/test_dask.py b/tests/test_dask.py deleted file mode 100644 index 0a322ef6..00000000 --- a/tests/test_dask.py +++ /dev/null @@ -1,28 +0,0 @@ -import pytest -import quasardb.pandas as qdbpd -import quasardb.dask as qdbdsk -import logging - -logger = logging.getLogger("test-dask") - - -def _prepare_query_test( - df_with_table, - qdbd_connection, -): - (_, _, df, table) = df_with_table - - qdbpd.write_dataframe(df, qdbd_connection, table, write_through=True) - return (df, table) - - -@pytest.mark.parametrize("frequency", ["h"], ids=["frequency=H"], indirect=True) -def test_can_query_from_dask_module(df_with_table, qdbd_connection, qdbd_settings): - _, table = _prepare_query_test(df_with_table, qdbd_connection) - query = f'SELECT * FROM "{table.get_name()}"' - - ddf = qdbdsk.query(query, cluster_uri=qdbd_settings.get("uri").get("insecure")) - assert ( - ddf.npartitions > 1 - ) # we want to ensure that the query is distributed for this test - ddf.compute()
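Although the final patch moves the implementation out to the external qdb-dask-connector package, the partition-per-shard scheme that the earlier tests assert (npartitions == ceil(query_range / shard_size)) can be sketched as follows. This is a simplified illustration assuming dask and pandas are available; split_range and read_shard are hypothetical stand-ins, not the quasardb or qdb-dask-connector API (the real code takes the shard size from the table and splits the range via conn.split_query_range).

import math
import datetime

import pandas as pd
import dask.dataframe as dd
from dask import delayed


def split_range(start, end, shard_size):
    """Split [start, end) into consecutive sub-ranges of at most shard_size."""
    ranges = []
    cursor = start
    while cursor < end:
        ranges.append((cursor, min(cursor + shard_size, end)))
        cursor += shard_size
    return ranges


def read_shard(rng, meta):
    # Hypothetical stand-in for running one sub-range query; returning meta
    # keeps the schema for shards that contain no rows.
    return meta


start = datetime.datetime(2025, 1, 1)
end = datetime.datetime(2025, 1, 1, 9, 30)
shard_size = datetime.timedelta(hours=1)

meta = pd.DataFrame({"x": pd.Series(dtype="float64")})
parts = [delayed(read_shard)(rng, meta) for rng in split_range(start, end, shard_size)]
ddf = dd.from_delayed(parts, meta=meta)

# One delayed task per shard: 9.5 hours of data with a 1-hour shard size -> 10 partitions.
assert ddf.npartitions == math.ceil((end - start) / shard_size)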
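The meta frame passed to dd.from_delayed above is what lets dask know the result schema without executing any query; in the removed module, _get_meta built it from the validate_query output. A toy version with invented column names and dtypes, mirroring the index="$timestamp" query option:

import pandas as pd

# Empty, correctly-typed frame: zero rows, but the dtypes describe the expected result.
col_dtypes = {
    "$timestamp": pd.Series(dtype="datetime64[ns]"),
    "x": pd.Series(dtype="float64"),
}
meta = pd.DataFrame(col_dtypes)
meta = meta.set_index("$timestamp")  # mirrors query(..., index="$timestamp")

print(len(meta))     # 0
print(meta.dtypes)   # x    float64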
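Finally, the non-SELECT guard exercised by test_dask_query_exception_on_non_select_query amounts to a pattern check before any work is fanned out to dask workers. A minimal sketch using the same regular expression as general_select_pattern (the module raises NotImplementedError where this snippet merely prints):

import re

select_only = re.compile(r"(?i)^\s*SELECT\b")

for q in ("SELECT * FROM test", "DROP TABLE test"):
    if select_only.match(q):
        print(f"accepted: {q}")
    else:
        print(f"rejected: {q}")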