-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_streaming.py
More file actions
219 lines (176 loc) · 6.26 KB
/
example_streaming.py
File metadata and controls
219 lines (176 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
"""
Example: Streaming Binary Data for Memory-Efficient Processing

This example demonstrates how to use the streaming API to handle large
binary bulk data without loading everything into memory at once.

Key Benefits:
- Memory efficient: the stream yields one chunk at a time (Example 2 writes
  each chunk straight to disk; Examples 1 and 3 buffer the chunks because
  they decode the complete payload afterwards)
- Progress tracking: Can report download progress
- Early termination: Can stop streaming if needed
- Large dataset friendly: Handles multi-GB datasets
"""
import os
import tempfile

from dotenv import load_dotenv
from dsis_model_sdk.models.common import HorizonData3D, LogCurve, SeismicDataSet3D
from dsis_model_sdk.protobuf import (
    decode_horizon_data,
    decode_log_curves,
    decode_seismic_float_data,
)

from dsis_client import DSISClient, DSISConfig, Environment, QueryBuilder
# Load environment variables before any of the lookups below run.
load_dotenv()

# Connection settings for the DEV environment. Every credential is looked up
# in the process environment and is None when the variable is not set.
config = DSISConfig(
    environment=Environment.DEV,
    tenant_id=os.environ.get("TENANT_ID"),
    client_id=os.environ.get("CLIENT_ID"),
    client_secret=os.environ.get("CLIENT_SECRET"),
    username=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
    subscription_key=os.environ.get("SUBSCRIPTION_KEY"),
)

# One client instance is shared by all three examples.
client = DSISClient(config)

# District and project that scope every query built below.
district_id, project = os.environ.get("DISTRICT_ID"), os.environ.get("PROJECT")

# Title banner: rule, heading, rule - one per line.
print("=" * 80, "DSIS Python Client - Streaming Binary Data", "=" * 80, sep="\n")
# Example 1: Stream with Progress Tracking
print("\n\nExample 1: Stream Seismic Data with Progress Tracking")
print("-" * 80)

# Metadata query: selects only the dataset name and its native_uid.
query = (
    QueryBuilder(model_name="OW5000", district_id=district_id, project=project)
    .schema(SeismicDataSet3D)
    .select("seismic_dataset_name,native_uid")
)
seismic_datasets = list(client.execute_query(query, cast=True, max_pages=1))

if seismic_datasets:
    # Only the first dataset returned is streamed.
    seismic = seismic_datasets[0]
    print(f"Seismic Dataset: {seismic.seismic_dataset_name}")
    print("Streaming binary data in 10MB chunks (DSIS recommended)...")

    chunks = []
    total_bytes = chunk_count = 0

    # enumerate() supplies the 1-based chunk counter shown in the progress
    # line; chunk_count stays 0 when the stream yields nothing.
    for chunk_count, chunk in enumerate(
        client.get_bulk_data_stream(
            schema=SeismicDataSet3D,
            native_uid=seismic,  # the entity object itself is passed
            query=query,
            chunk_size=10 * 1024 * 1024,  # 10MB chunks (DSIS recommended)
        ),
        start=1,
    ):
        chunks.append(chunk)
        total_bytes += len(chunk)
        print(
            f" Chunk {chunk_count}: {len(chunk):,} bytes (Total: {total_bytes / 1024 / 1024:.2f} MB)"
        )

    if not chunks:
        print("⚠ No bulk data available")
    else:
        print(
            f"\n✓ Downloaded {chunk_count} chunks, total: {total_bytes / 1024 / 1024:.2f} MB"
        )
        # Concatenate the buffered chunks and decode them as one payload.
        print("Decoding protobuf data...")
        binary_data = b"".join(chunks)
        decoded = decode_seismic_float_data(binary_data)
        print(
            f"✓ Decoded successfully: {decoded.length.i} x {decoded.length.j} x {decoded.length.k}"
        )
# Example 2: Stream to File (Save Without Loading to Memory)
print("\n\nExample 2: Stream Directly to File")
print("-" * 80)

# Metadata query: selects only the horizon name and its native_uid.
query = (
    QueryBuilder(
        model_name="OW5000",
        district_id=district_id,
        project=project,
    )
    .schema(HorizonData3D)
    .select("horizon_name,native_uid")
)
horizons = list(client.execute_query(query, cast=True, max_pages=1))

if horizons:
    # Only the first horizon returned is streamed.
    horizon = horizons[0]
    print(f"Horizon: {horizon.horizon_name}")
    print("Streaming to file...")

    # Ask the platform for its temp directory instead of assuming "/tmp",
    # which does not exist on every operating system (e.g. Windows).
    output_file = os.path.join(tempfile.gettempdir(), "horizon_data.bin")
    total_bytes = 0

    # Each chunk is written as soon as it arrives, so only the current chunk
    # is held by this loop.
    with open(output_file, "wb") as f:
        for chunk in client.get_bulk_data_stream(
            schema=HorizonData3D,  # Type-safe!
            native_uid=horizon,  # Pass entity directly
            query=query,
            chunk_size=10 * 1024 * 1024,  # 10MB chunks (DSIS recommended)
        ):
            f.write(chunk)
            total_bytes += len(chunk)

    if total_bytes > 0:
        print(f"✓ Saved {total_bytes:,} bytes to {output_file}")

        # Now read and decode from file
        print("Reading from file and decoding...")
        with open(output_file, "rb") as f:
            binary_data = f.read()
        decoded = decode_horizon_data(binary_data)
        print(f"✓ Decoded: {decoded.numberOfRows} x {decoded.numberOfColumns} grid")
    else:
        print("⚠ No bulk data available")
# Example 3: Conditional Streaming (Early Termination)
print("\n\nExample 3: Conditional Streaming with Size Limit")
print("-" * 80)

# Metadata query: selects only the log curve name and its native_uid.
query = (
    QueryBuilder(
        model_name="OW5000",
        district_id=district_id,
        project=project,
    )
    .schema(LogCurve)
    .select("log_curve_name,native_uid")
)
log_curves = list(client.execute_query(query, cast=True, max_pages=1))

if log_curves:
    # Only the first log curve returned is streamed.
    log_curve = log_curves[0]
    print(f"Log Curve: {log_curve.log_curve_name}")

    max_size = 100 * 1024 * 1024  # 100MB limit
    chunks = []
    total_bytes = 0
    # Records whether the loop was abandoned, so the outcome below can tell
    # "stream was empty" apart from "stream was cut off".
    limit_exceeded = False
    print(f"Streaming with {max_size / 1024 / 1024:.1f}MB size limit...")

    for chunk in client.get_bulk_data_stream(
        schema=LogCurve,  # Type-safe!
        native_uid=log_curve,  # Pass entity directly
        query=query,
        chunk_size=10 * 1024 * 1024,  # 10MB chunks (DSIS recommended)
    ):
        chunks.append(chunk)
        total_bytes += len(chunk)

        # Stop if we exceed size limit
        if total_bytes > max_size:
            print(
                f"⚠ Size limit exceeded! Downloaded: {total_bytes / 1024 / 1024:.2f} MB"
            )
            print(" Stopping stream early...")
            limit_exceeded = True
            break

    if limit_exceeded:
        # The partial payload is not decoded; drop the references so the
        # buffered chunks can be reclaimed instead of lingering at module
        # scope for the rest of the run.
        chunks.clear()
        print("⚠ Partial download discarded (size limit exceeded)")
    elif chunks:
        print(f"✓ Complete download: {total_bytes:,} bytes")
        binary_data = b"".join(chunks)
        decoded = decode_log_curves(binary_data)
        print(f"✓ Decoded: {decoded.index.number_of_index} samples")
    else:
        # Same wording as the other examples use for an empty stream.
        print("⚠ No bulk data available")
# Closing summary: a single print call emits every line, separated by newlines.
print(
    "\n" + "=" * 80,
    "Streaming Examples Complete!",
    "=" * 80,
    "\nKey Advantages of Streaming:",
    "1. Memory efficient - only one chunk in memory at a time",
    "2. Progress tracking - see download progress in real-time",
    "3. Direct to file - save without loading into memory",
    "4. Early termination - stop if data exceeds limits",
    "5. Large datasets - handle multi-GB files without memory issues",
    "=" * 80,
    sep="\n",
)