From 9e985835f2f3f1be75d94750469a4fe7d835c96a Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Mon, 9 Jun 2025 16:43:33 -0500 Subject: [PATCH 1/8] Add initial accounting endpoints POST/GET --- cwms/projects/water_supply/accounting.py | 150 +++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 cwms/projects/water_supply/accounting.py diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py new file mode 100644 index 00000000..89adc693 --- /dev/null +++ b/cwms/projects/water_supply/accounting.py @@ -0,0 +1,150 @@ +# Copyright (c) 2024 +# United States Army Corps of Engineers - Hydrologic Engineering Center (USACE/HEC) +# All Rights Reserved. USACE PROPRIETARY/CONFIDENTIAL. +# Source may not be released without written approval from HEC +from typing import Optional + +import cwms.api as api +from cwms.cwms_types import JSON, Data + + +def get_pump_accounting( + office_id: str, + project_id: str, + water_user: str, + contract_name: str, + start: str, + end: str, + timezone: str = "UTC", + unit: str = "cms", + start_time_inclusive: bool = True, + end_time_inclusive: bool = True, + ascending: bool = True, + row_limit: int = 0, +) -> Data: + """ + Retrieves pump accounting entries associated with a water supply contract. + + Parameters + ---------- + office_id : str + The office ID the pump accounting is associated with. (Path) + project_id : str + The project ID the pump accounting is associated with. (Path) + water_user : str + The water user the pump accounting is associated with. (Path) + contract_name : str + The name of the contract associated with the pump accounting. (Path) + start : str + The start time of the time window for pump accounting entries to retrieve. + Format: ISO 8601 extended, with optional offset and timezone. (Query) + end : str + The end time of the time window for pump accounting entries to retrieve. + Format: ISO 8601 extended, with optional offset and timezone. (Query) + timezone : str, optional + The default timezone to use if `start` or `end` lacks offset/timezone info. + Defaults to "UTC". (Query) + unit : str, optional + Unit of flow rate for accounting entries. Defaults to "cms". (Query) + start_time_inclusive : bool, optional + Whether the start time is inclusive. Defaults to True. (Query) + end_time_inclusive : bool, optional + Whether the end time is inclusive. Defaults to True. (Query) + ascending : bool, optional + Whether entries should be returned in ascending order. Defaults to True. (Query) + row_limit : int, optional + Maximum number of rows to return. Defaults to 0, meaning no limit. (Query) + + Returns + ------- + Data + The JSON response from CWMS Data API wrapped in a Data object. + + Raises + ------ + ValueError + If any required path parameters are None. + ClientError + If a 400-level error occurs. + NoDataFoundError + If a 404-level error occurs. + ServerError + If a 500-level error occurs. + """ + if not all([office_id, project_id, water_user, contract_name, start, end]): + raise ValueError("All required parameters must be provided.") + + endpoint = f"projects/{office_id}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting" + + params: dict[str, str | int] = { + "start": start, + "end": end, + "timezone": timezone, + "unit": unit, + "start-time-inclusive": str(start_time_inclusive).lower(), + "end-time-inclusive": str(end_time_inclusive).lower(), + "ascending": str(ascending).lower(), + "row-limit": row_limit, + } + + response = api.get(endpoint, params, api_version=1) + return Data(response) + + +def store_pump_accounting( + office: str, + project_id: str, + water_user: str, + contract_name: str, + data: JSON, +) -> None: + """ + Creates a new pump accounting entry associated with a water supply contract. + + Parameters + ---------- + office : str + The office ID the accounting is associated with. (Path) + project_id : str + The project ID the accounting is associated with. (Path) + water_user : str + The water user the accounting is associated with. (Path) + contract_name : str + The name of the contract associated with the accounting. (Path) + data : dict + A dictionary representing the JSON data to be stored. This should match the + WaterSupplyAccounting structure as defined by the API. + fail_if_exists : bool, optional + Whether the request should fail if the accounting entry already exists. + Defaults to True. (Query) + + Returns + ------- + None + + Raises + ------ + ValueError + If any required argument is missing. + ClientError + If a 400 range error code response is returned from the server. + NoDataFoundError + If a 404 range error code response is returned from the server. + ServerError + If a 500 range error code response is returned from the server. + """ + if not all([office, project_id, water_user, contract_name]): + raise ValueError( + "Office, project_id, water_user, and contract_name must be provided." + ) + if not data: + raise ValueError("Data must be provided and cannot be empty.") + + endpoint = f"projects/{office}/{project_id}/water-user/{water_user}/contracts/{contract_name}/accounting" + params = { + "office": office, + "project-id": project_id, + "water-user": water_user, + "contract-name": contract_name, + } + api.post(endpoint, data, params) From 4bd46c5a70e627f9c24b25c18083c7ac37e320c6 Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Mon, 9 Jun 2025 16:44:54 -0500 Subject: [PATCH 2/8] No fail if exists param --- cwms/projects/water_supply/accounting.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py index 89adc693..b73bb99c 100644 --- a/cwms/projects/water_supply/accounting.py +++ b/cwms/projects/water_supply/accounting.py @@ -114,9 +114,6 @@ def store_pump_accounting( data : dict A dictionary representing the JSON data to be stored. This should match the WaterSupplyAccounting structure as defined by the API. - fail_if_exists : bool, optional - Whether the request should fail if the accounting entry already exists. - Defaults to True. (Query) Returns ------- From 2be0a5f5aa3828d1dfc8d076ba41846b7c03551d Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Tue, 17 Jun 2025 16:26:45 -0500 Subject: [PATCH 3/8] Add cwms import --- cwms/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cwms/__init__.py b/cwms/__init__.py index 0411b68b..77800045 100644 --- a/cwms/__init__.py +++ b/cwms/__init__.py @@ -17,6 +17,7 @@ from cwms.projects.project_lock_rights import * from cwms.projects.project_locks import * from cwms.projects.projects import * +from cwms.projects.water_supply.accounting import * from cwms.ratings.ratings import * from cwms.ratings.ratings_spec import * from cwms.ratings.ratings_template import * From 9004895fa52d65091192d19690100fe92b58d610 Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Tue, 17 Jun 2025 16:27:21 -0500 Subject: [PATCH 4/8] Remove unused imports --- cwms/projects/water_supply/accounting.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cwms/projects/water_supply/accounting.py b/cwms/projects/water_supply/accounting.py index b73bb99c..89fd4668 100644 --- a/cwms/projects/water_supply/accounting.py +++ b/cwms/projects/water_supply/accounting.py @@ -2,8 +2,6 @@ # United States Army Corps of Engineers - Hydrologic Engineering Center (USACE/HEC) # All Rights Reserved. USACE PROPRIETARY/CONFIDENTIAL. # Source may not be released without written approval from HEC -from typing import Optional - import cwms.api as api from cwms.cwms_types import JSON, Data From be5465abbcb01c82f33102242bab5c8e05e7bfc5 Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Tue, 17 Jun 2025 16:28:16 -0500 Subject: [PATCH 5/8] Create pump accounting tests --- .../projects/water_supply/accounting_test.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 tests/projects/water_supply/accounting_test.py diff --git a/tests/projects/water_supply/accounting_test.py b/tests/projects/water_supply/accounting_test.py new file mode 100644 index 00000000..424aaa5f --- /dev/null +++ b/tests/projects/water_supply/accounting_test.py @@ -0,0 +1,56 @@ +import pytest + +import cwms.api +from cwms.cwms_types import Data +from cwms.projects.water_supply.accounting import ( + get_pump_accounting, + store_pump_accounting, +) + + +@pytest.fixture(autouse=True) +def init_session(): + cwms.api.init_session(api_root="https://mockwebserver.cwms.gov") + + +def test_get_pump_accounting(requests_mock): + endpoint = ( + "https://mockwebserver.cwms.gov/" + "projects/SWT/KEYS/water-user/TEST_USER/contracts/TEST_CONTRACT/accounting" + ) + expected_json = {"entries": [{"timestamp": "2024-01-01T00:00:00Z", "value": 1.23}]} + + requests_mock.get(endpoint, json=expected_json) + + data = get_pump_accounting( + office_id="SWT", + project_id="KEYS", + water_user="TEST_USER", + contract_name="TEST_CONTRACT", + start="2024-01-01T00:00:00Z", + end="2024-01-02T00:00:00Z", + ) + + assert isinstance(data, Data) + assert data.json == expected_json + + +def test_store_pump_accounting(requests_mock): + endpoint = ( + "https://mockwebserver.cwms.gov/" + "projects/SWT/KEYS/water-user/TEST_USER/contracts/TEST_CONTRACT/accounting" + ) + mock_data = {"entries": [{"timestamp": "2024-01-01T00:00:00Z", "value": 1.23}]} + + requests_mock.post(endpoint, status_code=200) + + store_pump_accounting( + office="SWT", + project_id="KEYS", + water_user="TEST_USER", + contract_name="TEST_CONTRACT", + data=mock_data, + ) + + assert requests_mock.called + assert requests_mock.call_count == 1 From 542138e2f181305e12843c5e50e5a734a044431a Mon Sep 17 00:00:00 2001 From: Eric Novotny Date: Mon, 16 Jun 2025 09:45:16 -0500 Subject: [PATCH 6/8] add multi timeseries store --- cwms/timeseries/timeseries.py | 59 +++++++++++++++++++++++++++++++---- pyproject.toml | 3 +- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 82f8b52c..1d6911f1 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -58,13 +58,12 @@ def get_multi_timeseries_df( """ def get_ts_ids(ts_id: str) -> Any: - - if ":" in ts_id: - ts_id, version_date = ts_id.split(":", 1) - version_date_dt = pd.to_datetime(version_date) - else: - version_date_dt = None try: + if ":" in ts_id: + ts_id, version_date = ts_id.split(":", 1) + version_date_dt = pd.to_datetime(version_date) + else: + version_date_dt = None data = get_timeseries( ts_id=ts_id, office_id=office_id, @@ -260,6 +259,54 @@ def timeseries_df_to_json( return ts_dict +def store_multi_timeseries_df( + ts_data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30 +) -> None: + + def store_ts_ids( + data: pd.DataFrame, + ts_id: str, + office_id: str, + version_date: Optional[datetime] = None, + ) -> None: + units = data["units"].iloc[0] + data_json = timeseries_df_to_json( + data=data, + ts_id=ts_id, + units=units, + office_id=office_id, + version_date=version_date, + ) + store_timeseries(data=data_json) + return None + + unique_tsids = ( + ts_data["ts_id"].astype(str) + ":" + ts_data["version_date"].astype(str) + ).unique() + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + for ts_id_all in unique_tsids: + try: + ts_id, version_date = ts_id_all.split(":", 1) + if version_date != "NaT": + version_date_dt = pd.to_datetime(version_date) + data = ts_data[ + (ts_data["ts_id"] == ts_id) + & (ts_data["version_date"] == version_date_dt) + ] + else: + version_date_dt = None + data = ts_data[ + (ts_data["ts_id"] == ts_id) & ts_data["version_date"].isna() + ] + if not data.empty: + executor.submit( + store_ts_ids, data, ts_id, office_id, version_date_dt + ) + except Exception as e: + print(f"Error processing {ts_id}: {e}") + + def store_timeseries( data: JSON, create_as_ltrs: Optional[bool] = False, diff --git a/pyproject.toml b/pyproject.toml index 93b5b813..798ed736 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,7 @@ [tool.poetry] name = "cwms-python" repository = "https://github.com/HydrologicEngineeringCenter/cwms-python" - -version = "0.6.5" +version = "0.7.2" packages = [ From 3d8e1a88fadf9843779ffb9819755afed98cabb8 Mon Sep 17 00:00:00 2001 From: Eric Novotny Date: Tue, 17 Jun 2025 12:10:52 -0700 Subject: [PATCH 7/8] fix to save mutitimeseries call --- cwms/cwms_types.py | 4 +- cwms/timeseries/timeseries.py | 73 +++++++++++++++++++---------------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cwms/cwms_types.py b/cwms/cwms_types.py index 6eb18e62..701edfb4 100644 --- a/cwms/cwms_types.py +++ b/cwms/cwms_types.py @@ -62,8 +62,8 @@ def get_df_data(data: JSON, selector: str) -> JSON: def rating_type(data: JSON) -> DataFrame: # grab the correct point values for a rating table df = DataFrame(data["point"]) if data["point"] else DataFrame() - df = df.apply(to_numeric) - return df + df_numeric = df.apply(to_numeric, axis=0, result_type="expand") + return DataFrame(df_numeric) def timeseries_type(orig_json: JSON, value_json: JSON) -> DataFrame: # if timeseries values are present then grab the values and put into diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py index 1d6911f1..4d9a6eb0 100644 --- a/cwms/timeseries/timeseries.py +++ b/cwms/timeseries/timeseries.py @@ -247,64 +247,69 @@ def timeseries_df_to_json( df = df.reindex(columns=["date-time", "value", "quality-code"]) if df.isnull().values.any(): raise ValueError("Null/NaN data must be removed from the dataframe") - + if version_date: + version_date_iso = version_date.isoformat() + else: + version_date_iso = None ts_dict = { "name": ts_id, "office-id": office_id, "units": units, "values": df.values.tolist(), - "version-date": version_date, + "version-date": version_date_iso, } return ts_dict def store_multi_timeseries_df( - ts_data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30 + data: pd.DataFrame, office_id: str, max_workers: Optional[int] = 30 ) -> None: - def store_ts_ids( data: pd.DataFrame, ts_id: str, office_id: str, version_date: Optional[datetime] = None, ) -> None: - units = data["units"].iloc[0] - data_json = timeseries_df_to_json( - data=data, - ts_id=ts_id, - units=units, - office_id=office_id, - version_date=version_date, - ) - store_timeseries(data=data_json) + try: + units = data["units"].iloc[0] + data_json = timeseries_df_to_json( + data=data, + ts_id=ts_id, + units=units, + office_id=office_id, + version_date=version_date, + ) + store_timeseries(data=data_json) + except Exception as e: + print(f"Error processing {ts_id}: {e}") return None + ts_data_all = data.copy() + if "version_date" not in ts_data_all.columns: + ts_data_all = ts_data_all.assign(version_date=pd.to_datetime(pd.Series([]))) unique_tsids = ( - ts_data["ts_id"].astype(str) + ":" + ts_data["version_date"].astype(str) + ts_data_all["ts_id"].astype(str) + ":" + ts_data_all["version_date"].astype(str) ).unique() with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - for ts_id_all in unique_tsids: - try: - ts_id, version_date = ts_id_all.split(":", 1) - if version_date != "NaT": - version_date_dt = pd.to_datetime(version_date) - data = ts_data[ - (ts_data["ts_id"] == ts_id) - & (ts_data["version_date"] == version_date_dt) - ] - else: - version_date_dt = None - data = ts_data[ - (ts_data["ts_id"] == ts_id) & ts_data["version_date"].isna() - ] - if not data.empty: - executor.submit( - store_ts_ids, data, ts_id, office_id, version_date_dt - ) - except Exception as e: - print(f"Error processing {ts_id}: {e}") + for unique_tsid in unique_tsids: + ts_id, version_date = unique_tsid.split(":", 1) + if version_date != "NaT": + version_date_dt = pd.to_datetime(version_date) + ts_data = ts_data_all[ + (ts_data_all["ts_id"] == ts_id) + & (ts_data_all["version_date"] == version_date_dt) + ] + else: + version_date_dt = None + ts_data = ts_data_all[ + (ts_data_all["ts_id"] == ts_id) & ts_data_all["version_date"].isna() + ] + if not data.empty: + executor.submit( + store_ts_ids, ts_data, ts_id, office_id, version_date_dt + ) def store_timeseries( From 50da64331b1783d25ba9be9a7f9e353a7ae40d03 Mon Sep 17 00:00:00 2001 From: Charles Graham SWT Date: Tue, 17 Jun 2025 16:34:22 -0500 Subject: [PATCH 8/8] Bump to 0.7.3 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 798ed736..c8927b94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cwms-python" repository = "https://github.com/HydrologicEngineeringCenter/cwms-python" -version = "0.7.2" +version = "0.7.3" packages = [