
Commit a1f2287

Breaking change: Rename Stage to Volume
Signed-off-by: lentitude2tk <xushuang.hu@zilliz.com>
1 parent afff26c

12 files changed, +638 -358 lines
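For orientation, here is a minimal before/after sketch of the renamed `bulk_import` parameter, assembled from the diff hunks below; the endpoint, cluster ID, collection name, and data paths are placeholder values taken from the docstring example, not new API surface:

from pymilvus.bulk_writer import bulk_import

# Before this commit (Stage naming):
resp = bulk_import(
    url="https://api.cloud.zilliz.com",
    api_key="YOUR_API_KEY",
    cluster_id="in0x-xxx",
    collection_name="my_collection",
    stage_name="my_stage",  # old keyword removed by this commit, sent as "stageName" in the request body
    data_paths=[["parquet-folder/1.parquet"]],
)

# After this commit (Volume naming):
resp = bulk_import(
    url="https://api.cloud.zilliz.com",
    api_key="YOUR_API_KEY",
    cluster_id="in0x-xxx",
    collection_name="my_collection",
    volume_name="my_volume",  # renamed keyword, sent as "volumeName" in the request body
    data_paths=[["parquet-folder/1.parquet"]],
)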
Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@
# Copyright (C) 2019-2023 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.

import json
import logging
import time
import numpy as np

from examples.bulk_import.data_gengerator import *
from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter
from pymilvus.orm import utility

logging.basicConfig(level=logging.INFO)

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

from pymilvus.bulk_writer import (
    BulkFileType,
    list_import_jobs,
    bulk_import,
    get_import_progress,
)

# zilliz cluster
PUBLIC_ENDPOINT = "cluster.endpoint"
USER_NAME = "user.name"
PASSWORD = "password"

# The value of the URL is fixed.
# For overseas regions, it is: https://api.cloud.zilliz.com
# For regions in China, it is: https://api.cloud.zilliz.com.cn
CLOUD_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY = "_api_key_for_cluster_org_"

# This is currently a private preview feature. If you need to use it, please submit a request and contact us.
VOLUME_NAME = "_volume_name_for_project_"

CLUSTER_ID = "_your_cloud_cluster_id_"
DB_NAME = ""  # If db_name is not specified, use ""
COLLECTION_NAME = "_collection_name_on_the_db_"
PARTITION_NAME = ""  # If partition_name is not specified, use ""
DIM = 512


def create_connection():
    print(f"\nCreate connection...")
    connections.connect(uri=PUBLIC_ENDPOINT, user=USER_NAME, password=PASSWORD)
    print(f"\nConnected")


def build_all_type_schema():
    print(f"\n===================== build all types schema ====================")
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="bool", dtype=DataType.BOOL),
        FieldSchema(name="int8", dtype=DataType.INT8),
        FieldSchema(name="int16", dtype=DataType.INT16),
        FieldSchema(name="int32", dtype=DataType.INT32),
        FieldSchema(name="int64", dtype=DataType.INT64),
        FieldSchema(name="float", dtype=DataType.FLOAT),
        FieldSchema(name="double", dtype=DataType.DOUBLE),
        FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="json", dtype=DataType.JSON),
        # from 2.4.0, milvus supports multiple vector fields in one collection
        # FieldSchema(name="float_vector", dtype=DataType.FLOAT_VECTOR, dim=DIM),
        FieldSchema(name="binary_vector", dtype=DataType.BINARY_VECTOR, dim=DIM),
        FieldSchema(name="float16_vector", dtype=DataType.FLOAT16_VECTOR, dim=DIM),
        FieldSchema(name="bfloat16_vector", dtype=DataType.BFLOAT16_VECTOR, dim=DIM),
    ]

    # milvus doesn't support parsing array/sparse_vector from numpy file
    fields.append(
        FieldSchema(name="array_str", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.VARCHAR,
                    max_length=128))
    fields.append(
        FieldSchema(name="array_int", dtype=DataType.ARRAY, max_capacity=100, element_type=DataType.INT64))
    fields.append(FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR))

    schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
    return schema


def example_collection_remote_volume(file_type: BulkFileType):
    schema = build_all_type_schema()
    print(f"\n===================== all field types ({file_type.name}) ====================")
    create_collection(schema, False)
    volume_upload_result = volume_remote_writer(file_type, schema)
    call_volume_import(volume_upload_result['volume_name'], volume_upload_result['path'])
    retrieve_imported_data()


def create_collection(schema: CollectionSchema, drop_if_exist: bool):
    if utility.has_collection(COLLECTION_NAME):
        if drop_if_exist:
            utility.drop_collection(COLLECTION_NAME)
    else:
        collection = Collection(name=COLLECTION_NAME, schema=schema)
        print(f"Collection '{collection.name}' created")


def volume_remote_writer(file_type, schema):
    with VolumeBulkWriter(
            schema=schema,
            remote_path="bulk_data",
            file_type=file_type,
            chunk_size=512 * 1024 * 1024,
            cloud_endpoint=CLOUD_ENDPOINT,
            api_key=API_KEY,
            volume_name=VOLUME_NAME,
    ) as volume_bulk_writer:
        print("Append rows")
        batch_count = 10000
        for i in range(batch_count):
            row = {
                "id": i,
                "bool": True if i % 5 == 0 else False,
                "int8": i % 128,
                "int16": i % 1000,
                "int32": i % 100000,
                "int64": i,
                "float": i / 3,
                "double": i / 7,
                "varchar": f"varchar_{i}",
                "json": {"dummy": i, "ok": f"name_{i}"},
                # "float_vector": gen_float_vector(False),
                "binary_vector": gen_binary_vector(False, DIM),
                "float16_vector": gen_fp16_vector(False, DIM),
                "bfloat16_vector": gen_bf16_vector(False, DIM),
                f"dynamic_{i}": i,
                # bulk insert doesn't support importing npy files with array fields or sparse vectors;
                # if file_type is numpy, the values below will be stored in the dynamic field
                "array_str": [f"str_{k}" for k in range(5)],
                "array_int": [k for k in range(10)],
                "sparse_vector": gen_sparse_vector(False),
            }
            volume_bulk_writer.append_row(row)

        # append rows by numpy type
        for i in range(batch_count):
            id = i + batch_count
            volume_bulk_writer.append_row({
                "id": np.int64(id),
                "bool": True if i % 3 == 0 else False,
                "int8": np.int8(id % 128),
                "int16": np.int16(id % 1000),
                "int32": np.int32(id % 100000),
                "int64": np.int64(id),
                "float": np.float32(id / 3),
                "double": np.float64(id / 7),
                "varchar": f"varchar_{id}",
                "json": json.dumps({"dummy": id, "ok": f"name_{id}"}),
                # "float_vector": gen_float_vector(True),
                "binary_vector": gen_binary_vector(True, DIM),
                "float16_vector": gen_fp16_vector(True, DIM),
                "bfloat16_vector": gen_bf16_vector(True, DIM),
                f"dynamic_{id}": id,
                # bulk insert doesn't support importing npy files with array fields or sparse vectors;
                # if file_type is numpy, the values below will be stored in the dynamic field
                "array_str": np.array([f"str_{k}" for k in range(5)], np.dtype("str")),
                "array_int": np.array([k for k in range(10)], np.dtype("int64")),
                "sparse_vector": gen_sparse_vector(True),
            })

        print(f"{volume_bulk_writer.total_row_count} rows appended")
        print(f"{volume_bulk_writer.buffer_row_count} rows in buffer not flushed")
        print("Generate data files...")
        volume_bulk_writer.commit()
        print(f"Data files have been uploaded: {volume_bulk_writer.batch_files}")
        return volume_bulk_writer.get_volume_upload_result()


def retrieve_imported_data():
    collection = Collection(name=COLLECTION_NAME)
    print("Create index...")
    for field in collection.schema.fields:
        if (field.dtype == DataType.FLOAT_VECTOR or field.dtype == DataType.FLOAT16_VECTOR
                or field.dtype == DataType.BFLOAT16_VECTOR):
            collection.create_index(field_name=field.name, index_params={
                "index_type": "FLAT",
                "params": {},
                "metric_type": "L2"
            })
        elif field.dtype == DataType.BINARY_VECTOR:
            collection.create_index(field_name=field.name, index_params={
                "index_type": "BIN_FLAT",
                "params": {},
                "metric_type": "HAMMING"
            })
        elif field.dtype == DataType.SPARSE_FLOAT_VECTOR:
            collection.create_index(field_name=field.name, index_params={
                "index_type": "SPARSE_INVERTED_INDEX",
                "metric_type": "IP",
                "params": {"drop_ratio_build": 0.2}
            })

    ids = [100, 15000]
    print(f"Load collection and query items {ids}")
    collection.load()
    expr = f"id in {ids}"
    print(expr)
    results = collection.query(expr=expr, output_fields=["*", "vector"])
    print("Query results:")
    for item in results:
        print(item)


def call_volume_import(volume_name: str, path: str):
    print(f"\n===================== import files to cluster ====================")
    resp = bulk_import(
        url=CLOUD_ENDPOINT,
        api_key=API_KEY,
        cluster_id=CLUSTER_ID,
        db_name=DB_NAME,
        collection_name=COLLECTION_NAME,
        volume_name=volume_name,
        data_paths=[[path]]
    )
    print(resp.json())
    job_id = resp.json()['data']['jobId']
    print(f"Created a cloudImport job, job id: {job_id}")

    print(f"\n===================== list import jobs ====================")
    resp = list_import_jobs(
        url=CLOUD_ENDPOINT,
        cluster_id=CLUSTER_ID,
        api_key=API_KEY,
        page_size=10,
        current_page=1,
    )
    print(resp.json())

    while True:
        print("Wait 5 seconds to check cloudImport job state...")
        time.sleep(5)

        print(f"\n===================== get import job progress ====================")
        resp = get_import_progress(
            url=CLOUD_ENDPOINT,
            cluster_id=CLUSTER_ID,
            api_key=API_KEY,
            job_id=job_id,
        )

        state = resp.json()['data']['state']
        progress = resp.json()['data']['progress']
        if state == "Importing":
            print(f"The job {job_id} is importing... {progress}%")
            continue
        if state == "Failed":
            reason = resp.json()['data']['reason']
            print(f"The job {job_id} failed, reason: {reason}")
            break
        if state == "Completed" and progress == 100:
            print(f"The job {job_id} completed")
            break


if __name__ == '__main__':
    create_connection()
    example_collection_remote_volume(file_type=BulkFileType.PARQUET)

examples/bulk_import/example_stage_file_manager.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

examples/bulk_import/example_stage_manager.py

Lines changed: 0 additions & 20 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
from pymilvus.bulk_writer.constants import ConnectType
from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager

if __name__ == "__main__":
    volume_file_manager = VolumeFileManager(
        cloud_endpoint='https://api.cloud.zilliz.com',
        api_key='_api_key_for_cluster_org_',
        volume_name='_volume_name_for_project_',
        connect_type=ConnectType.AUTO,
    )
    result = volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/")
    print(f"\nuploadFileToVolume results: {result}")
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
from pymilvus.bulk_writer.volume_manager import VolumeManager

PROJECT_ID = "_id_for_project_"
REGION_ID = "_id_for_region_"
VOLUME_NAME = "_volume_name_for_project_"

if __name__ == "__main__":
    volume_manager = VolumeManager(
        cloud_endpoint="https://api.cloud.zilliz.com",
        api_key="_api_key_for_cluster_org_",
    )

    volume_manager.create_volume(PROJECT_ID, REGION_ID, VOLUME_NAME)
    print(f"\nVolume {VOLUME_NAME} created")

    volume_list = volume_manager.list_volumes(PROJECT_ID, 1, 10)
    print(f"\nlistVolumes results: ", volume_list.json()['data'])

    volume_manager.delete_volume(VOLUME_NAME)
    print(f"\nVolume {VOLUME_NAME} deleted")

pymilvus/bulk_writer/bulk_import.py

Lines changed: 5 additions & 5 deletions
@@ -114,7 +114,7 @@ def bulk_import(
     access_key: str = "",
     secret_key: str = "",
     token: str = "",
-    stage_name: str = "",
+    volume_name: str = "",
     data_paths: [List[List[str]]] = None,
     verify: Optional[Union[bool, str]] = True,
     cert: Optional[Union[str, tuple]] = None,
@@ -139,7 +139,7 @@ def bulk_import(
         secret_key (str): secret key to access the object storage(cloud)
         token (str): access token to access the object storage(cloud)

-        stage_name (str): name of the stage to import(cloud)
+        volume_name (str): name of the volume to import(cloud)
         data_paths (list of list of str): The paths of files that contain the data to import(cloud)
         verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
             or a string, which must be server's certificate path. Defaults to `True`.
@@ -181,15 +181,15 @@ def bulk_import(
         ...     token="your-token" # for short-term credentials, also include `token`
         ... )

-        >>> # 3. Import multiple files or folders from a Zilliz Stage into a Zilliz Cloud instance
+        >>> # 3. Import multiple files or folders from a Zilliz Volume into a Zilliz Cloud instance
         >>> bulk_import(
         ...     url="https://api.cloud.zilliz.com", # If regions in China, it is: https://api.cloud.zilliz.com.cn
         ...     api_key="YOUR_API_KEY",
         ...     cluster_id="in0x-xxx",
         ...     db_name="", # Only For Dedicated deployments: this parameter can be specified.
         ...     collection_name="my_collection",
         ...     partition_name="", # If Collection not enable partitionKey, can be specified.
-        ...     stage_name="my_stage",
+        ...     volume_name="my_volume",
         ...     data_paths=[
         ...         ["parquet-folder/1.parquet"],
         ...         ["parquet-folder-2/"]
@@ -210,7 +210,7 @@
         "accessKey": access_key,
         "secretKey": secret_key,
         "token": token,
-        "stageName": stage_name,
+        "volumeName": volume_name,
         "dataPaths": data_paths,
     }

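Taken together, the renamed classes and parameter compose into the end-to-end flow below. This is a hedged sketch assembled only from calls shown in the diffs above; the project ID, region ID, cluster ID, collection name, and local path are placeholders, and the assumption that the uploaded remote path ("data/") can be passed directly to data_paths is not confirmed by this commit.

from pymilvus.bulk_writer import bulk_import
from pymilvus.bulk_writer.constants import ConnectType
from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager
from pymilvus.bulk_writer.volume_manager import VolumeManager

CLOUD_ENDPOINT = "https://api.cloud.zilliz.com"
API_KEY = "_api_key_for_cluster_org_"
VOLUME_NAME = "_volume_name_for_project_"

# 1. Create a volume in the project (IDs are placeholders).
volume_manager = VolumeManager(cloud_endpoint=CLOUD_ENDPOINT, api_key=API_KEY)
volume_manager.create_volume("_id_for_project_", "_id_for_region_", VOLUME_NAME)

# 2. Upload local files into the volume.
volume_file_manager = VolumeFileManager(
    cloud_endpoint=CLOUD_ENDPOINT,
    api_key=API_KEY,
    volume_name=VOLUME_NAME,
    connect_type=ConnectType.AUTO,
)
volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/")

# 3. Import the uploaded folder into a collection using the renamed keyword.
#    "data/" is assumed to match the remote path used in step 2.
resp = bulk_import(
    url=CLOUD_ENDPOINT,
    api_key=API_KEY,
    cluster_id="_your_cloud_cluster_id_",
    collection_name="_collection_name_on_the_db_",
    volume_name=VOLUME_NAME,
    data_paths=[["data/"]],
)
print(resp.json())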