diff --git a/docs/api/index.md b/docs/api/index.md index 66d8f2a..9a2180f 100644 --- a/docs/api/index.md +++ b/docs/api/index.md @@ -86,6 +86,25 @@ client = DSISClient(cfg) data = client.get("OW5000", "") ``` +## Request Timeout + +All request methods (`execute_query()`, `get()`, `get_bulk_data()`, `get_bulk_data_stream()`) accept an optional `timeout` parameter: + +```python +# Single value: both connect and read timeout (seconds) +items = list(client.execute_query(query, timeout=300)) + +# Tuple: (connect_timeout, read_timeout) +items = list(client.execute_query(query, timeout=(5, 300))) + +# Also on get() +data = client.get("OW5000", "5000107", schema="Well", timeout=60) +``` + +- `None` (default): no timeout +- `float`: both connect and read timeout +- `(float, float)`: separate connect and read timeouts + ## Error Handling Hint Treat non-200 responses as exceptions; inspect message for status cues (401/403/404). Refresh tokens on auth failures. @@ -96,13 +115,14 @@ Headers assembled internally include both tokens + subscription key; pass only e ## Binary Data Methods -### `get_bulk_data(query, *, accept="application/json")` +### `get_bulk_data(query, *, accept="application/json", timeout=None)` Fetch binary bulk data (protobuf) for an entity. Loads entire response into memory. **Parameters:** - `query`: QueryBuilder instance configured with `.schema()` and `.entity()` calls - `accept`: Accept header value (default: `"application/json"`). Use `"application/octet-stream"` for raw binary endpoints (e.g., SurfaceGrid/$value) +- `timeout`: Request timeout in seconds. `float` for both connect/read, `(float, float)` tuple for separate connect/read timeouts, or `None` for no timeout (default) **Returns:** `Optional[bytes]` - Binary protobuf data or None if no data @@ -124,7 +144,7 @@ bulk_query = query.entity(grids[0]["native_uid"], data_field="$value") binary_data = client.get_bulk_data(bulk_query, accept="application/octet-stream") ``` -### `get_bulk_data_stream(query, *, chunk_size=10*1024*1024, accept="application/json")` +### `get_bulk_data_stream(query, *, chunk_size=10*1024*1024, accept="application/json", timeout=None)` Stream binary bulk data in chunks for memory-efficient processing. @@ -132,6 +152,7 @@ Stream binary bulk data in chunks for memory-efficient processing. - `query`: QueryBuilder instance configured with `.schema()` and `.entity()` calls - `chunk_size`: Size of chunks to yield (default: 10MB, DSIS recommended) - `accept`: Accept header value (default: `"application/json"`) +- `timeout`: Request timeout in seconds. `float` for both connect/read, `(float, float)` tuple for separate connect/read timeouts, or `None` for no timeout (default) **Yields:** Binary data chunks as bytes diff --git a/docs/guides/query-builder.md b/docs/guides/query-builder.md index 96da121..f5ba424 100644 --- a/docs/guides/query-builder.md +++ b/docs/guides/query-builder.md @@ -265,6 +265,35 @@ print(f"First two pages: {len(two_pages_items)} wells") - `1`: You only need a sample, or want to implement custom pagination - `N>1`: You want to process data in page-sized chunks +## Request Timeout + +You can set an optional `timeout` parameter on `execute_query()`, `get()`, `get_bulk_data()`, and `get_bulk_data_stream()` to control how long each HTTP request waits before raising an error. By default, no timeout is applied. + +```python +# Single timeout value (seconds) — applies to both connect and read +for item in client.execute_query(query, timeout=300): + process(item) + +# Tuple timeout — (connect_timeout, read_timeout) in seconds +for item in client.execute_query(query, timeout=(5, 300)): + process(item) + +# Also works with get(), get_bulk_data(), and get_bulk_data_stream() +data = client.get("OW5000", "5000107", "123", "SNORRE", schema="Well", timeout=60) +binary = client.get_bulk_data(bulk_query, timeout=600) +``` + +**timeout Parameter:** + +- `timeout=None` (default): No timeout — wait indefinitely +- `timeout=300`: Both connect and read timeout set to 300 seconds +- `timeout=(5, 300)`: Connect timeout of 5 seconds, read timeout of 300 seconds + +The timeout applies to **each individual HTTP request**, including pagination requests. If a query fetches multiple pages, each page request uses the same timeout. + +!!! tip + For large paginated queries, use a generous read timeout (e.g., `timeout=(5, 300)`) to allow time for the server to process each page while still failing fast on connection issues. + ## Execution Patterns ### ⚠️ Critical: Schema Requirement for `cast=True` @@ -498,6 +527,7 @@ print(f"First two pages: {len(two_pages_wells)} wells") 4. **Reuse QueryBuilder**: Use `.reset()` to clear and rebuild queries instead of creating new instances 5. **Enable auto-casting**: Use `cast=True` with model classes for type-safe results 6. **Test connection first**: Call `client.test_connection()` when setting up to see if credentials are correct +7. **Set timeouts for production**: Use `timeout` to prevent requests from hanging indefinitely (e.g., `timeout=300` for 5 minutes) ## See Also diff --git a/docs/guides/working-with-binary-data.md b/docs/guides/working-with-binary-data.md index 82c6d8f..2b4b07a 100644 --- a/docs/guides/working-with-binary-data.md +++ b/docs/guides/working-with-binary-data.md @@ -245,6 +245,23 @@ for i, el in enumerate(lgc.elements[:5]): # Show first 5 ## Important Notes +### Request Timeout + +Both `get_bulk_data()` and `get_bulk_data_stream()` accept an optional `timeout` parameter to control how long each HTTP request waits: + +```python +# Set a 10-minute timeout for large binary downloads +binary_data = client.get_bulk_data(bulk_query, timeout=600) + +# Separate connect and read timeouts +for chunk in client.get_bulk_data_stream(bulk_query, timeout=(5, 600)): + chunks.append(chunk) +``` + +- `timeout=None` (default): No timeout — wait indefinitely +- `timeout=600`: Both connect and read timeout set to 600 seconds +- `timeout=(5, 600)`: Connect timeout of 5s, read timeout of 600s + ### Memory Management - **Small data (< 100MB)**: Use `get_bulk_data()` - simpler, loads everything at once diff --git a/src/dsis_client/api/client/_base.py b/src/dsis_client/api/client/_base.py index 178ded5..f64bfb8 100644 --- a/src/dsis_client/api/client/_base.py +++ b/src/dsis_client/api/client/_base.py @@ -5,7 +5,7 @@ access without runtime overhead. """ -from typing import TYPE_CHECKING, Any, Dict, Generator, Optional +from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Union if TYPE_CHECKING: from ..config import DSISConfig @@ -18,7 +18,10 @@ class _RequestBase: config: "DSISConfig" def _request( - self, endpoint: str, params: Optional[Dict[str, Any]] = None + self, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Dict[str, Any]: ... @@ -28,7 +31,11 @@ class _PaginationBase(_RequestBase): if TYPE_CHECKING: def _yield_nextlink_pages( - self, response: Dict[str, Any], endpoint: str, max_pages: int = -1 + self, + response: Dict[str, Any], + endpoint: str, + max_pages: int = -1, + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Generator[Dict[str, Any], None, None]: ... def _extract_nextlink_from_text(self, response_text: str) -> Optional[str]: ... @@ -45,6 +52,7 @@ def _request_binary( endpoint: str, params: Optional[Dict[str, Any]] = None, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Optional[bytes]: ... def _request_binary_stream( @@ -53,4 +61,5 @@ def _request_binary_stream( params: Optional[Dict[str, Any]] = None, chunk_size: int = 10 * 1024 * 1024, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Generator[bytes, None, None]: ... diff --git a/src/dsis_client/api/client/_bulk_data.py b/src/dsis_client/api/client/_bulk_data.py index cfeaf47..fea9552 100644 --- a/src/dsis_client/api/client/_bulk_data.py +++ b/src/dsis_client/api/client/_bulk_data.py @@ -4,7 +4,7 @@ """ import logging -from typing import TYPE_CHECKING, Generator, Optional +from typing import TYPE_CHECKING, Generator, Optional, Union from ._base import _BinaryRequestBase @@ -26,6 +26,7 @@ def get_bulk_data( query: "QueryBuilder", *, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Optional[bytes]: """Fetch binary bulk data (protobuf) for a specific entity. @@ -43,6 +44,9 @@ def get_bulk_data( accept: Accept header value for the HTTP request (default: ``"application/json"``). Use ``"application/octet-stream"`` for endpoints that serve raw binary data (e.g., SurfaceGrid/$value). + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Returns: Binary protobuf data as bytes, or None if the entity has no bulk data @@ -80,7 +84,7 @@ def get_bulk_data( endpoint = query.build_endpoint() logger.info(f"Fetching bulk data from: {endpoint}") - return self._request_binary(endpoint, accept=accept) + return self._request_binary(endpoint, accept=accept, timeout=timeout) def get_bulk_data_stream( self, @@ -88,6 +92,7 @@ def get_bulk_data_stream( *, chunk_size: int = 10 * 1024 * 1024, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Generator[bytes, None, None]: """Stream binary bulk data (protobuf) in chunks for memory-efficient processing. @@ -106,6 +111,9 @@ def get_bulk_data_stream( accept: Accept header value for the HTTP request (default: ``"application/json"``). Use ``"application/octet-stream"`` for endpoints that serve raw binary data (e.g., SurfaceGrid/$value). + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Yields: Binary data chunks as bytes. Returns immediately if no bulk data (404). @@ -140,5 +148,5 @@ def get_bulk_data_stream( endpoint = query.build_endpoint() logger.info(f"Streaming bulk data from: {endpoint} (chunk_size={chunk_size})") yield from self._request_binary_stream( - endpoint, chunk_size=chunk_size, accept=accept + endpoint, chunk_size=chunk_size, accept=accept, timeout=timeout ) diff --git a/src/dsis_client/api/client/_http.py b/src/dsis_client/api/client/_http.py index 6561b10..25c6562 100644 --- a/src/dsis_client/api/client/_http.py +++ b/src/dsis_client/api/client/_http.py @@ -5,7 +5,7 @@ import json import logging -from typing import TYPE_CHECKING, Any, Dict, Generator, Optional +from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Union from urllib.parse import urljoin from ..exceptions import DSISAPIError, DSISJSONParseError @@ -40,6 +40,7 @@ def _make_request_with_retry( extra_headers: Optional[Dict[str, str]] = None, stream: bool = False, request_type: str = "standard", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> "requests.Response": """Make an HTTP GET request with automatic token refresh retry. @@ -52,6 +53,9 @@ def _make_request_with_retry( extra_headers: Additional headers to merge with auth headers stream: Whether to stream the response request_type: Description for logging (e.g., "binary", "streaming") + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Returns: The HTTP response object (after potential retry) @@ -60,7 +64,9 @@ def _make_request_with_retry( if extra_headers: headers.update(extra_headers) - response = self._session.get(url, headers=headers, params=params, stream=stream) + response = self._session.get( + url, headers=headers, params=params, stream=stream, timeout=timeout + ) if response.status_code in _RETRY_STATUS_CODES: logger.warning( @@ -74,13 +80,16 @@ def _make_request_with_retry( if extra_headers: headers.update(extra_headers) response = self._session.get( - url, headers=headers, params=params, stream=stream + url, headers=headers, params=params, stream=stream, timeout=timeout ) return response def _request( - self, endpoint: str, params: Optional[Dict[str, Any]] = None + self, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Dict[str, Any]: """Make an authenticated GET request to the DSIS API. @@ -91,6 +100,9 @@ def _request( Args: endpoint: API endpoint path params: Query parameters + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Returns: Parsed JSON response as dictionary @@ -100,7 +112,7 @@ def _request( """ url = urljoin(f"{self.config.data_endpoint}/", endpoint) logger.info(f"Making request to {url}") - response = self._make_request_with_retry(url, params) + response = self._make_request_with_retry(url, params, timeout=timeout) if response.status_code != 200: error_msg = ( @@ -135,6 +147,7 @@ def _request_binary( endpoint: str, params: Optional[Dict[str, Any]] = None, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Optional[bytes]: """Make an authenticated GET request for binary data. @@ -145,6 +158,9 @@ def _request_binary( endpoint: API endpoint path params: Query parameters accept: Accept header value (default: "application/json") + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Returns: Binary response content, or None if the entity has no bulk data (404) @@ -159,6 +175,7 @@ def _request_binary( params, extra_headers={"Accept": accept}, request_type="binary", + timeout=timeout, ) if response.status_code == 404: @@ -181,6 +198,7 @@ def _request_binary_stream( params: Optional[Dict[str, Any]] = None, chunk_size: int = 10 * 1024 * 1024, accept: str = "application/json", + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> Generator[bytes, None, None]: """Stream binary data in chunks to avoid loading large datasets into memory. @@ -192,6 +210,9 @@ def _request_binary_stream( params: Query parameters chunk_size: Size of chunks to yield (default: 10MB, recommended by DSIS) accept: Accept header value (default: "application/json") + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Yields: Binary data chunks as bytes @@ -208,6 +229,7 @@ def _request_binary_stream( extra_headers={"Accept": accept}, stream=True, request_type="streaming", + timeout=timeout, ) if response.status_code == 404: diff --git a/src/dsis_client/api/client/_pagination.py b/src/dsis_client/api/client/_pagination.py index 387ce9c..4db0714 100644 --- a/src/dsis_client/api/client/_pagination.py +++ b/src/dsis_client/api/client/_pagination.py @@ -5,7 +5,7 @@ import logging import re -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from ..exceptions import DSISJSONParseError from ._base import _RequestBase @@ -114,7 +114,10 @@ def _build_nextlink_endpoint(self, endpoint: str, next_link: str) -> str: return next_link def _fetch_next_page( - self, endpoint: str, next_key: str + self, + endpoint: str, + next_key: str, + timeout: Optional[Union[float, tuple[float, float]]] = None, ) -> tuple[list[Any], Optional[str]]: """Fetch the next page of results with fallback handling. @@ -124,6 +127,9 @@ def _fetch_next_page( Args: endpoint: The full endpoint path for the request. next_key: The key to look for the next link in the response. + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Returns: A tuple of (items, next_link). Items may be empty if JSON parsing @@ -134,7 +140,7 @@ def _fetch_next_page( also fails. """ try: - response = self._request(endpoint, params=None) + response = self._request(endpoint, params=None, timeout=timeout) items = response.get("value", []) next_link = response.get(next_key) return items, next_link @@ -162,7 +168,11 @@ def _fetch_next_page( raise def _yield_nextlink_pages( - self, response: Dict[str, Any], endpoint: str, max_pages: int = -1 + self, + response: Dict[str, Any], + endpoint: str, + max_pages: int = -1, + timeout: Optional[Union[float, tuple[float, float]]] = None, ): """Generator that yields items from pages following OData nextLinks. @@ -172,6 +182,9 @@ def _yield_nextlink_pages( response: Initial API response dict endpoint: Full endpoint path from initial request (without query params) max_pages: Maximum number of pages to yield. -1 means unlimited (all pages). + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Yields: Individual items from each page's 'value' array @@ -196,7 +209,9 @@ def _yield_nextlink_pages( logger.info(f"Following nextLink: {next_link}") temp_endpoint = self._build_nextlink_endpoint(endpoint, next_link) - items, next_link = self._fetch_next_page(temp_endpoint, next_key) + items, next_link = self._fetch_next_page( + temp_endpoint, next_key, timeout=timeout + ) for item in items: yield item diff --git a/src/dsis_client/api/client/_query.py b/src/dsis_client/api/client/_query.py index bf89ea7..fe71c81 100644 --- a/src/dsis_client/api/client/_query.py +++ b/src/dsis_client/api/client/_query.py @@ -5,7 +5,7 @@ import json import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from ..exceptions import DSISJSONParseError from ..models import cast_results as _cast_results @@ -102,7 +102,11 @@ def _extract_value_array_from_text( return [], None def execute_query( - self, query: "QueryBuilder", cast: bool = False, max_pages: int = -1 + self, + query: "QueryBuilder", + cast: bool = False, + max_pages: int = -1, + timeout: Optional[Union[float, tuple[float, float]]] = None, ): """Execute a DSIS query. @@ -112,6 +116,9 @@ def execute_query( to model instances max_pages: Maximum number of pages to fetch. -1 (default) fetches all pages. Use 1 for a single page, 2 for two pages, etc. + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). Yields: Items from the result pages (or model instances if cast=True) @@ -133,6 +140,9 @@ def execute_query( >>> >>> # Fetch two pages >>> two_pages = list(client.execute_query(query, max_pages=2)) + >>> + >>> # With a 5 minute timeout per request + >>> items = list(client.execute_query(query, timeout=300)) """ # Import here to avoid circular imports from ..query import QueryBuilder @@ -150,7 +160,7 @@ def execute_query( # Try to make the request and handle JSON parsing errors try: - response = self._request(endpoint, params) + response = self._request(endpoint, params, timeout=timeout) except DSISJSONParseError as e: logger.warning( "JSON parsing failed. Attempting fallback: extracting data from raw text." @@ -177,10 +187,14 @@ def execute_query( "Cannot cast results: query has no schema class. " "Use .schema(ModelClass) when building the query." ) - for item in self._yield_nextlink_pages(response, endpoint, max_pages): + for item in self._yield_nextlink_pages( + response, endpoint, max_pages, timeout=timeout + ): yield query._schema_class(**item) else: - for item in self._yield_nextlink_pages(response, endpoint, max_pages): + for item in self._yield_nextlink_pages( + response, endpoint, max_pages, timeout=timeout + ): yield item def cast_results(self, results: List[Dict[str, Any]], schema_class) -> List[Any]: diff --git a/src/dsis_client/api/client/base_client.py b/src/dsis_client/api/client/base_client.py index ed74b2f..77724bf 100644 --- a/src/dsis_client/api/client/base_client.py +++ b/src/dsis_client/api/client/base_client.py @@ -93,6 +93,7 @@ def get( expand: Optional[str] = None, filter: Optional[str] = None, validate_schema: bool = True, + timeout: Optional[Union[float, tuple[float, float]]] = None, **extra_query: Any, ) -> Dict[str, Any]: """Make a GET request to the DSIS OData API. @@ -115,6 +116,9 @@ def get( filter: OData $filter parameter for filtering (OData filter expression) validate_schema: If True, validates that schema is a known model (default: True) + timeout: Request timeout in seconds. Can be a single float for both + connect and read timeouts, or a (connect, read) tuple. + None means no timeout (default). **extra_query: Additional OData query parameters Returns: @@ -163,4 +167,4 @@ def get( if extra_query: query.update(extra_query) - return self._request(endpoint, query) + return self._request(endpoint, query, timeout=timeout) diff --git a/tests/test_dsis_client_execute_query.py b/tests/test_dsis_client_execute_query.py index 798f6cb..9319c0d 100644 --- a/tests/test_dsis_client_execute_query.py +++ b/tests/test_dsis_client_execute_query.py @@ -64,7 +64,7 @@ def _make_client_and_patch(monkeypatch, district, project, response): pages = response if isinstance(response, list) else [response] state = {"idx": 0} - def fake_request(endpoint, params=None): + def fake_request(endpoint, params=None, timeout=None): i = state["idx"] if i < len(pages): page = pages[i]