Merged
25 changes: 23 additions & 2 deletions docs/api/index.md
@@ -86,6 +86,25 @@ client = DSISClient(cfg)
data = client.get("OW5000", "<record-id>")
```

## Request Timeout

All request methods (`execute_query()`, `get()`, `get_bulk_data()`, `get_bulk_data_stream()`) accept an optional `timeout` parameter:

```python
# Single value: both connect and read timeout (seconds)
items = list(client.execute_query(query, timeout=300))

# Tuple: (connect_timeout, read_timeout)
items = list(client.execute_query(query, timeout=(5, 300)))

# Also on get()
data = client.get("OW5000", "5000107", schema="Well", timeout=60)
```

- `None` (default): no timeout
- `float`: both connect and read timeout
- `(float, float)`: separate connect and read timeouts
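These forms follow the `requests` timeout convention. As a sketch of the semantics (the `normalize_timeout` helper below is illustrative, not part of the client), the three accepted values expand like this:

```python
from typing import Optional, Tuple, Union

# Same shape as the client's timeout annotation
Timeout = Optional[Union[float, Tuple[float, float]]]

def normalize_timeout(timeout: Timeout) -> Optional[Tuple[float, float]]:
    """Expand a requests-style timeout into an explicit (connect, read) pair."""
    if timeout is None:
        return None  # wait indefinitely
    if isinstance(timeout, (int, float)):
        value = float(timeout)
        return (value, value)  # one value covers both phases
    connect, read = timeout  # must be a 2-tuple
    return (float(connect), float(read))
```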

## Error Handling Hint

Treat non-200 responses as exceptions; inspect the error message for status cues (401/403/404) and refresh tokens on auth failures.
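A minimal sketch of that pattern, assuming errors surface as `DSISAPIError` with the status code embedded in the message (the `DSISAPIError` stand-in and the helper below are illustrative, not part of the client's API):

```python
class DSISAPIError(Exception):
    """Stand-in for dsis_client.api.exceptions.DSISAPIError (illustrative only)."""

def get_with_auth_retry(client, schema, record_id, refresh_tokens):
    """Retry a read once after refreshing tokens on a 401/403-flavoured error.

    Assumes the error message embeds the HTTP status code, as the message
    assembled in _http.py does.
    """
    try:
        return client.get(schema, record_id)
    except DSISAPIError as exc:
        if any(code in str(exc) for code in ("401", "403")):
            refresh_tokens()  # e.g. re-authenticate, then retry once
            return client.get(schema, record_id)
        raise
```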
@@ -96,13 +115,14 @@ Headers assembled internally include both tokens + subscription key; pass only e

## Binary Data Methods

### `get_bulk_data(query, *, accept="application/json")`
### `get_bulk_data(query, *, accept="application/json", timeout=None)`

Fetch binary bulk data (protobuf) for an entity. Loads entire response into memory.

**Parameters:**
- `query`: QueryBuilder instance configured with `.schema()` and `.entity()` calls
- `accept`: Accept header value (default: `"application/json"`). Use `"application/octet-stream"` for raw binary endpoints (e.g., SurfaceGrid/$value)
- `timeout`: Request timeout in seconds. `float` for both connect/read, `(float, float)` tuple for separate connect/read timeouts, or `None` for no timeout (default)

**Returns:** `Optional[bytes]` - Binary protobuf data or None if no data

@@ -124,14 +144,15 @@ bulk_query = query.entity(grids[0]["native_uid"], data_field="$value")
binary_data = client.get_bulk_data(bulk_query, accept="application/octet-stream")
```

### `get_bulk_data_stream(query, *, chunk_size=10*1024*1024, accept="application/json")`
### `get_bulk_data_stream(query, *, chunk_size=10*1024*1024, accept="application/json", timeout=None)`

Stream binary bulk data in chunks for memory-efficient processing.

**Parameters:**
- `query`: QueryBuilder instance configured with `.schema()` and `.entity()` calls
- `chunk_size`: Size of chunks to yield (default: 10MB, DSIS recommended)
- `accept`: Accept header value (default: `"application/json"`)
- `timeout`: Request timeout in seconds. `float` for both connect/read, `(float, float)` tuple for separate connect/read timeouts, or `None` for no timeout (default)

**Yields:** Binary data chunks as bytes

30 changes: 30 additions & 0 deletions docs/guides/query-builder.md
@@ -265,6 +265,35 @@ print(f"First two pages: {len(two_pages_items)} wells")
- `1`: You only need a sample, or want to implement custom pagination
- `N>1`: You want to process data in page-sized chunks

## Request Timeout

You can set an optional `timeout` parameter on `execute_query()`, `get()`, `get_bulk_data()`, and `get_bulk_data_stream()` to control how long each HTTP request waits before raising an error. By default, no timeout is applied.

```python
# Single timeout value (seconds) — applies to both connect and read
for item in client.execute_query(query, timeout=300):
process(item)

# Tuple timeout — (connect_timeout, read_timeout) in seconds
for item in client.execute_query(query, timeout=(5, 300)):
process(item)

# Also works with get(), get_bulk_data(), and get_bulk_data_stream()
data = client.get("OW5000", "5000107", "123", "SNORRE", schema="Well", timeout=60)
binary = client.get_bulk_data(bulk_query, timeout=600)
```

**timeout Parameter:**

- `timeout=None` (default): No timeout — wait indefinitely
- `timeout=300`: Both connect and read timeout set to 300 seconds
- `timeout=(5, 300)`: Connect timeout of 5 seconds, read timeout of 300 seconds

The timeout applies to **each individual HTTP request**, including pagination requests. If a query fetches multiple pages, each page request uses the same timeout.

!!! tip
For large paginated queries, use a generous read timeout (e.g., `timeout=(5, 300)`) to allow time for the server to process each page while still failing fast on connection issues.
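Because a timeout on any page aborts the whole generator, one hedged pattern is to retry the entire query (sketch only; it assumes timeouts propagate as `requests.exceptions.Timeout`, and note it restarts from the first page rather than resuming):

```python
import time

import requests

def query_with_retry(client, query, *, timeout=(5, 300), attempts=3, backoff=2.0):
    """Run execute_query, retrying the whole query if a request times out.

    Note: because results stream lazily, this restarts from the first page
    on failure; it is a sketch, not a resume-from-last-page implementation.
    """
    for attempt in range(1, attempts + 1):
        try:
            # list() forces all pages, so a timeout on any page surfaces here
            return list(client.execute_query(query, timeout=timeout))
        except requests.exceptions.Timeout:
            if attempt == attempts:
                raise
            time.sleep(backoff * attempt)  # simple linear backoff
```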

## Execution Patterns

### ⚠️ Critical: Schema Requirement for `cast=True`
@@ -498,6 +527,7 @@ print(f"First two pages: {len(two_pages_wells)} wells")
4. **Reuse QueryBuilder**: Use `.reset()` to clear and rebuild queries instead of creating new instances
5. **Enable auto-casting**: Use `cast=True` with model classes for type-safe results
6. **Test connection first**: Call `client.test_connection()` when setting up to see if credentials are correct
7. **Set timeouts for production**: Use `timeout` to prevent requests from hanging indefinitely (e.g., `timeout=300` for 5 minutes)

## See Also

17 changes: 17 additions & 0 deletions docs/guides/working-with-binary-data.md
@@ -245,6 +245,23 @@ for i, el in enumerate(lgc.elements[:5]): # Show first 5

## Important Notes

### Request Timeout

Both `get_bulk_data()` and `get_bulk_data_stream()` accept an optional `timeout` parameter to control how long each HTTP request waits:

```python
# Set a 10-minute timeout for large binary downloads
binary_data = client.get_bulk_data(bulk_query, timeout=600)

# Separate connect and read timeouts
for chunk in client.get_bulk_data_stream(bulk_query, timeout=(5, 600)):
chunks.append(chunk)
```

- `timeout=None` (default): No timeout — wait indefinitely
- `timeout=600`: Both connect and read timeout set to 600 seconds
- `timeout=(5, 600)`: Connect timeout of 5s, read timeout of 600s
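Putting this together, a streaming download to disk might look like the sketch below (the `download_bulk` helper is illustrative, not part of the client). Note that with a streamed response, `requests` applies the read timeout to each socket read rather than to the download as a whole, so a long transfer stays alive as long as bytes keep arriving within the window:

```python
def download_bulk(client, bulk_query, path, *, timeout=(5, 600)):
    """Stream bulk data to disk with a (connect, read) timeout.

    Returns the total number of bytes written.
    """
    written = 0
    with open(path, "wb") as fh:
        for chunk in client.get_bulk_data_stream(bulk_query, timeout=timeout):
            fh.write(chunk)
            written += len(chunk)
    return written
```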

### Memory Management

- **Small data (< 100MB)**: Use `get_bulk_data()` - simpler, loads everything at once
15 changes: 12 additions & 3 deletions src/dsis_client/api/client/_base.py
@@ -5,7 +5,7 @@
access without runtime overhead.
"""

from typing import TYPE_CHECKING, Any, Dict, Generator, Optional
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Union

if TYPE_CHECKING:
from ..config import DSISConfig
@@ -18,7 +18,10 @@ class _RequestBase:
config: "DSISConfig"

def _request(
self, endpoint: str, params: Optional[Dict[str, Any]] = None
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Dict[str, Any]: ...


@@ -28,7 +31,11 @@ class _PaginationBase(_RequestBase):
if TYPE_CHECKING:

def _yield_nextlink_pages(
self, response: Dict[str, Any], endpoint: str, max_pages: int = -1
self,
response: Dict[str, Any],
endpoint: str,
max_pages: int = -1,
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Generator[Dict[str, Any], None, None]: ...

def _extract_nextlink_from_text(self, response_text: str) -> Optional[str]: ...
@@ -45,6 +52,7 @@ def _request_binary(
endpoint: str,
params: Optional[Dict[str, Any]] = None,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Optional[bytes]: ...

def _request_binary_stream(
@@ -53,4 +61,5 @@ def _request_binary_stream(
params: Optional[Dict[str, Any]] = None,
chunk_size: int = 10 * 1024 * 1024,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Generator[bytes, None, None]: ...
14 changes: 11 additions & 3 deletions src/dsis_client/api/client/_bulk_data.py
@@ -4,7 +4,7 @@
"""

import logging
from typing import TYPE_CHECKING, Generator, Optional
from typing import TYPE_CHECKING, Generator, Optional, Union

from ._base import _BinaryRequestBase

@@ -26,6 +26,7 @@ def get_bulk_data(
query: "QueryBuilder",
*,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Optional[bytes]:
"""Fetch binary bulk data (protobuf) for a specific entity.

@@ -43,6 +44,9 @@
accept: Accept header value for the HTTP request
(default: ``"application/json"``). Use ``"application/octet-stream"``
for endpoints that serve raw binary data (e.g., SurfaceGrid/$value).
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Returns:
Binary protobuf data as bytes, or None if the entity has no bulk data
@@ -80,14 +84,15 @@ def get_bulk_data(

endpoint = query.build_endpoint()
logger.info(f"Fetching bulk data from: {endpoint}")
return self._request_binary(endpoint, accept=accept)
return self._request_binary(endpoint, accept=accept, timeout=timeout)

def get_bulk_data_stream(
self,
query: "QueryBuilder",
*,
chunk_size: int = 10 * 1024 * 1024,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Generator[bytes, None, None]:
"""Stream binary bulk data (protobuf) in chunks for memory-efficient processing.

@@ -106,6 +111,9 @@
accept: Accept header value for the HTTP request
(default: ``"application/json"``). Use ``"application/octet-stream"``
for endpoints that serve raw binary data (e.g., SurfaceGrid/$value).
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Yields:
Binary data chunks as bytes. Returns immediately if no bulk data (404).
@@ -140,5 +148,5 @@ def get_bulk_data_stream(
endpoint = query.build_endpoint()
logger.info(f"Streaming bulk data from: {endpoint} (chunk_size={chunk_size})")
yield from self._request_binary_stream(
endpoint, chunk_size=chunk_size, accept=accept
endpoint, chunk_size=chunk_size, accept=accept, timeout=timeout
)
32 changes: 27 additions & 5 deletions src/dsis_client/api/client/_http.py
@@ -5,7 +5,7 @@

import json
import logging
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional
from typing import TYPE_CHECKING, Any, Dict, Generator, Optional, Union
from urllib.parse import urljoin

from ..exceptions import DSISAPIError, DSISJSONParseError
@@ -40,6 +40,7 @@ def _make_request_with_retry(
extra_headers: Optional[Dict[str, str]] = None,
stream: bool = False,
request_type: str = "standard",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> "requests.Response":
"""Make an HTTP GET request with automatic token refresh retry.

@@ -52,6 +53,9 @@
extra_headers: Additional headers to merge with auth headers
stream: Whether to stream the response
request_type: Description for logging (e.g., "binary", "streaming")
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Returns:
The HTTP response object (after potential retry)
@@ -60,7 +64,9 @@
if extra_headers:
headers.update(extra_headers)

response = self._session.get(url, headers=headers, params=params, stream=stream)
response = self._session.get(
url, headers=headers, params=params, stream=stream, timeout=timeout
)

if response.status_code in _RETRY_STATUS_CODES:
logger.warning(
@@ -74,13 +80,16 @@
if extra_headers:
headers.update(extra_headers)
response = self._session.get(
url, headers=headers, params=params, stream=stream
url, headers=headers, params=params, stream=stream, timeout=timeout
)

return response

def _request(
self, endpoint: str, params: Optional[Dict[str, Any]] = None
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Dict[str, Any]:
"""Make an authenticated GET request to the DSIS API.

@@ -91,6 +100,9 @@
Args:
endpoint: API endpoint path
params: Query parameters
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Returns:
Parsed JSON response as dictionary
@@ -100,7 +112,7 @@
"""
url = urljoin(f"{self.config.data_endpoint}/", endpoint)
logger.info(f"Making request to {url}")
response = self._make_request_with_retry(url, params)
response = self._make_request_with_retry(url, params, timeout=timeout)

if response.status_code != 200:
error_msg = (
@@ -135,6 +147,7 @@
endpoint: str,
params: Optional[Dict[str, Any]] = None,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Optional[bytes]:
"""Make an authenticated GET request for binary data.

@@ -145,6 +158,9 @@
endpoint: API endpoint path
params: Query parameters
accept: Accept header value (default: "application/json")
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Returns:
Binary response content, or None if the entity has no bulk data (404)
@@ -159,6 +175,7 @@
params,
extra_headers={"Accept": accept},
request_type="binary",
timeout=timeout,
)

if response.status_code == 404:
@@ -181,6 +198,7 @@
params: Optional[Dict[str, Any]] = None,
chunk_size: int = 10 * 1024 * 1024,
accept: str = "application/json",
timeout: Optional[Union[float, tuple[float, float]]] = None,
) -> Generator[bytes, None, None]:
"""Stream binary data in chunks to avoid loading large datasets into memory.

@@ -192,6 +210,9 @@
params: Query parameters
chunk_size: Size of chunks to yield (default: 10MB, recommended by DSIS)
accept: Accept header value (default: "application/json")
timeout: Request timeout in seconds. Can be a single float for both
connect and read timeouts, or a (connect, read) tuple.
None means no timeout (default).

Yields:
Binary data chunks as bytes
@@ -208,6 +229,7 @@
extra_headers={"Accept": accept},
stream=True,
request_type="streaming",
timeout=timeout,
)

if response.status_code == 404: