From 85cb7b353a56f931d9e16a3bafe7df9a85eb2d52 Mon Sep 17 00:00:00 2001 From: lentitude2tk Date: Fri, 21 Nov 2025 11:03:03 +0800 Subject: [PATCH] Breaking change: Rename Stage to Volume Signed-off-by: lentitude2tk --- ..._stage.py => example_bulkwriter_volume.py} | 38 +- .../bulk_import/example_stage_file_manager.py | 12 - examples/bulk_import/example_stage_manager.py | 20 - .../example_volume_file_manager.py | 12 + .../bulk_import/example_volume_manager.py | 20 + pymilvus/bulk_writer/bulk_import.py | 10 +- pymilvus/bulk_writer/stage_manager.py | 39 -- ...e_bulk_writer.py => volume_bulk_writer.py} | 24 +- ...file_manager.py => volume_file_manager.py} | 89 +-- pymilvus/bulk_writer/volume_manager.py | 37 ++ .../{stage_restful.py => volume_restful.py} | 42 +- pyproject.toml | 7 + tests/test_bulk_writer_stage.py | 626 ++++++++++++++++++ 13 files changed, 808 insertions(+), 168 deletions(-) rename examples/bulk_import/{example_bulkwriter_stage.py => example_bulkwriter_volume.py} (90%) delete mode 100644 examples/bulk_import/example_stage_file_manager.py delete mode 100644 examples/bulk_import/example_stage_manager.py create mode 100644 examples/bulk_import/example_volume_file_manager.py create mode 100644 examples/bulk_import/example_volume_manager.py delete mode 100644 pymilvus/bulk_writer/stage_manager.py rename pymilvus/bulk_writer/{stage_bulk_writer.py => volume_bulk_writer.py} (82%) rename pymilvus/bulk_writer/{stage_file_manager.py => volume_file_manager.py} (70%) create mode 100644 pymilvus/bulk_writer/volume_manager.py rename pymilvus/bulk_writer/{stage_restful.py => volume_restful.py} (86%) create mode 100644 tests/test_bulk_writer_stage.py diff --git a/examples/bulk_import/example_bulkwriter_stage.py b/examples/bulk_import/example_bulkwriter_volume.py similarity index 90% rename from examples/bulk_import/example_bulkwriter_stage.py rename to examples/bulk_import/example_bulkwriter_volume.py index e89072538..f63c64e8d 100644 --- a/examples/bulk_import/example_bulkwriter_stage.py +++ b/examples/bulk_import/example_bulkwriter_volume.py @@ -16,7 +16,7 @@ import numpy as np from examples.bulk_import.data_gengerator import * -from pymilvus.bulk_writer.stage_bulk_writer import StageBulkWriter +from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter from pymilvus.orm import utility logging.basicConfig(level=logging.INFO) @@ -46,7 +46,7 @@ API_KEY = "_api_key_for_cluster_org_" # This is currently a private preview feature. If you need to use it, please submit a request and contact us. 
-STAGE_NAME = "_stage_name_for_project_" +VOLUME_NAME = "_volume_name_for_project_" CLUSTER_ID = "_your_cloud_cluster_id_" DB_NAME = "" # If db_name is not specified, use "" @@ -93,12 +93,12 @@ def build_all_type_schema(): return schema -def example_collection_remote_stage(file_type: BulkFileType): +def example_collection_remote_volume(file_type: BulkFileType): schema = build_all_type_schema() print(f"\n===================== all field types ({file_type.name}) ====================") create_collection(schema, False) - stage_upload_result = stage_remote_writer(file_type, schema) - call_stage_import(stage_upload_result['stage_name'], stage_upload_result['path']) + volume_upload_result = volume_remote_writer(file_type, schema) + call_volume_import(volume_upload_result['volume_name'], volume_upload_result['path']) retrieve_imported_data() @@ -111,16 +111,16 @@ def create_collection(schema: CollectionSchema, drop_if_exist: bool): print(f"Collection '{collection.name}' created") -def stage_remote_writer(file_type, schema): - with StageBulkWriter( +def volume_remote_writer(file_type, schema): + with VolumeBulkWriter( schema=schema, remote_path="bulk_data", file_type=file_type, chunk_size=512 * 1024 * 1024, cloud_endpoint=CLOUD_ENDPOINT, api_key=API_KEY, - stage_name=STAGE_NAME, - ) as stage_bulk_writer: + volume_name=VOLUME_NAME, + ) as volume_bulk_writer: print("Append rows") batch_count = 10000 for i in range(batch_count): @@ -146,12 +146,12 @@ def stage_remote_writer(file_type, schema): "array_int": [k for k in range(10)], "sparse_vector": gen_sparse_vector(False), } - stage_bulk_writer.append_row(row) + volume_bulk_writer.append_row(row) # append rows by numpy type for i in range(batch_count): id = i + batch_count - stage_bulk_writer.append_row({ + volume_bulk_writer.append_row({ "id": np.int64(id), "bool": True if i % 3 == 0 else False, "int8": np.int8(id % 128), @@ -174,12 +174,12 @@ def stage_remote_writer(file_type, schema): "sparse_vector": gen_sparse_vector(True), }) - print(f"{stage_bulk_writer.total_row_count} rows appends") - print(f"{stage_bulk_writer.buffer_row_count} rows in buffer not flushed") + print(f"{volume_bulk_writer.total_row_count} rows appends") + print(f"{volume_bulk_writer.buffer_row_count} rows in buffer not flushed") print("Generate data files...") - stage_bulk_writer.commit() - print(f"Data files have been uploaded: {stage_bulk_writer.batch_files}") - return stage_bulk_writer.get_stage_upload_result() + volume_bulk_writer.commit() + print(f"Data files have been uploaded: {volume_bulk_writer.batch_files}") + return volume_bulk_writer.get_volume_upload_result() def retrieve_imported_data(): @@ -217,7 +217,7 @@ def retrieve_imported_data(): print(item) -def call_stage_import(stage_name: str, path: str): +def call_volume_import(volume_name: str, path: str): print(f"\n===================== import files to cluster ====================") resp = bulk_import( url=CLOUD_ENDPOINT, @@ -225,7 +225,7 @@ def call_stage_import(stage_name: str, path: str): cluster_id=CLUSTER_ID, db_name=DB_NAME, collection_name=COLLECTION_NAME, - stage_name=stage_name, + volume_name=volume_name, data_paths=[[path]] ) print(resp.json()) @@ -270,4 +270,4 @@ def call_stage_import(stage_name: str, path: str): if __name__ == '__main__': create_connection() - example_collection_remote_stage(file_type=BulkFileType.PARQUET) + example_collection_remote_volume(file_type=BulkFileType.PARQUET) diff --git a/examples/bulk_import/example_stage_file_manager.py b/examples/bulk_import/example_stage_file_manager.py 
deleted file mode 100644 index f8f0163ec..000000000 --- a/examples/bulk_import/example_stage_file_manager.py +++ /dev/null @@ -1,12 +0,0 @@ -from pymilvus.bulk_writer.constants import ConnectType -from pymilvus.bulk_writer.stage_file_manager import StageFileManager - -if __name__ == "__main__": - stage_file_manager = StageFileManager( - cloud_endpoint='https://api.cloud.zilliz.com', - api_key='_api_key_for_cluster_org_', - stage_name='_stage_name_for_project_', - connect_type=ConnectType.AUTO, - ) - result = stage_file_manager.upload_file_to_stage("/Users/zilliz/data/", "data/") - print(f"\nuploadFileToStage results: {result}") diff --git a/examples/bulk_import/example_stage_manager.py b/examples/bulk_import/example_stage_manager.py deleted file mode 100644 index da51c221d..000000000 --- a/examples/bulk_import/example_stage_manager.py +++ /dev/null @@ -1,20 +0,0 @@ -from pymilvus.bulk_writer.stage_manager import StageManager - -PROJECT_ID = "_id_for_project_" -REGION_ID = "_id_for_region_" -STAGE_NAME = "_stage_name_for_project_" - -if __name__ == "__main__": - stage_manager = StageManager( - cloud_endpoint="https://api.cloud.zilliz.com", - api_key="_api_key_for_cluster_org_", - ) - - stage_manager.create_stage(PROJECT_ID, REGION_ID, STAGE_NAME) - print(f"\nStage {STAGE_NAME} created") - - stage_list = stage_manager.list_stages(PROJECT_ID, 1, 10) - print(f"\nlistStages results: ", stage_list.json()['data']) - - stage_manager.delete_stage(STAGE_NAME) - print(f"\nStage {STAGE_NAME} deleted") diff --git a/examples/bulk_import/example_volume_file_manager.py b/examples/bulk_import/example_volume_file_manager.py new file mode 100644 index 000000000..ebc157de8 --- /dev/null +++ b/examples/bulk_import/example_volume_file_manager.py @@ -0,0 +1,12 @@ +from pymilvus.bulk_writer.constants import ConnectType +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager + +if __name__ == "__main__": + volume_file_manager = VolumeFileManager( + cloud_endpoint='https://api.cloud.zilliz.com', + api_key='_api_key_for_cluster_org_', + volume_name='_volume_name_for_project_', + connect_type=ConnectType.AUTO, + ) + result = volume_file_manager.upload_file_to_volume("/Users/zilliz/data/", "data/") + print(f"\nuploadFileToVolume results: {result}") diff --git a/examples/bulk_import/example_volume_manager.py b/examples/bulk_import/example_volume_manager.py new file mode 100644 index 000000000..e36255cb7 --- /dev/null +++ b/examples/bulk_import/example_volume_manager.py @@ -0,0 +1,20 @@ +from pymilvus.bulk_writer.volume_manager import VolumeManager + +PROJECT_ID = "_id_for_project_" +REGION_ID = "_id_for_region_" +VOLUME_NAME = "_volume_name_for_project_" + +if __name__ == "__main__": + volume_manager = VolumeManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="_api_key_for_cluster_org_", + ) + + volume_manager.create_volume(PROJECT_ID, REGION_ID, VOLUME_NAME) + print(f"\nVolume {VOLUME_NAME} created") + + volume_list = volume_manager.list_volumes(PROJECT_ID, 1, 10) + print(f"\nlistVolumes results: ", volume_list.json()['data']) + + volume_manager.delete_volume(VOLUME_NAME) + print(f"\nVolume {VOLUME_NAME} deleted") diff --git a/pymilvus/bulk_writer/bulk_import.py b/pymilvus/bulk_writer/bulk_import.py index eb51f9469..d00dba28d 100644 --- a/pymilvus/bulk_writer/bulk_import.py +++ b/pymilvus/bulk_writer/bulk_import.py @@ -114,7 +114,7 @@ def bulk_import( access_key: str = "", secret_key: str = "", token: str = "", - stage_name: str = "", + volume_name: str = "", data_paths: 
Optional[List[List[str]]] = None,
    verify: Optional[Union[bool, str]] = True,
    cert: Optional[Union[str, tuple]] = None,
@@ -139,7 +139,7 @@
         secret_key (str): secret key to access the object storage(cloud)
         token (str): access token to access the object storage(cloud)
-        stage_name (str): name of the stage to import(cloud)
+        volume_name (str): name of the volume to import(cloud)
         data_paths (list of list of str): The paths of files that contain the data to import(cloud)
         verify (bool, str, optional): Either a boolean, to verify the server's TLS certificate
             or a string, which must be server's certificate path. Defaults to `True`.
@@ -181,7 +181,7 @@
         ...     token="your-token" # for short-term credentials, also include `token`
         ... )
 
-        >>> # 3. Import multiple files or folders from a Zilliz Stage into a Zilliz Cloud instance
+        >>> # 3. Import multiple files or folders from a Zilliz Volume into a Zilliz Cloud instance
         >>> bulk_import(
         ...     url="https://api.cloud.zilliz.com", # If regions in China, it is: https://api.cloud.zilliz.com.cn
         ...     api_key="YOUR_API_KEY",
@@ -189,7 +189,7 @@
         ...     db_name="", # Only For Dedicated deployments: this parameter can be specified.
         ...     collection_name="my_collection",
         ...     partition_name="", # If Collection not enable partitionKey, can be specified.
-        ...     stage_name="my_stage",
+        ...     volume_name="my_volume",
         ...     data_paths=[
         ...         ["parquet-folder/1.parquet"],
         ...         ["parquet-folder-2/"]
         ...
@@ -210,7 +210,7 @@
         "accessKey": access_key,
         "secretKey": secret_key,
         "token": token,
-        "stageName": stage_name,
+        "volumeName": volume_name,
         "dataPaths": data_paths,
     }
 
diff --git a/pymilvus/bulk_writer/stage_manager.py b/pymilvus/bulk_writer/stage_manager.py
deleted file mode 100644
index bd02a766d..000000000
--- a/pymilvus/bulk_writer/stage_manager.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import logging
-
-from pymilvus.bulk_writer.stage_restful import create_stage, delete_stage, list_stages
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-class StageManager:
-    def __init__(self, cloud_endpoint: str, api_key: str):
-        """
-        private preview feature. Please submit a request and contact us if you need it.
-
-        Args:
-            cloud_endpoint (str): The fixed cloud endpoint URL.
-                - For international regions: https://api.cloud.zilliz.com
-                - For regions in China: https://api.cloud.zilliz.com.cn
-            api_key (str): The API key associated with your organization or cluster.
-        """
-        self.cloud_endpoint = cloud_endpoint
-        self.api_key = api_key
-
-    def create_stage(self, project_id: str, region_id: str, stage_name: str):
-        """
-        Create a stage under the specified project and regionId.
-        """
-        create_stage(self.cloud_endpoint, self.api_key, project_id, region_id, stage_name)
-
-    def delete_stage(self, stage_name: str):
-        """
-        Delete a stage.
-        """
-        delete_stage(self.cloud_endpoint, self.api_key, stage_name)
-
-    def list_stages(self, project_id: str, current_page: int = 1, page_size: int = 10):
-        """
-        Paginated query of the stage list under a specified projectId.
- """ - return list_stages(self.cloud_endpoint, self.api_key, project_id, current_page, page_size) diff --git a/pymilvus/bulk_writer/stage_bulk_writer.py b/pymilvus/bulk_writer/volume_bulk_writer.py similarity index 82% rename from pymilvus/bulk_writer/stage_bulk_writer.py rename to pymilvus/bulk_writer/volume_bulk_writer.py index bfbde6c25..462cb9562 100644 --- a/pymilvus/bulk_writer/stage_bulk_writer.py +++ b/pymilvus/bulk_writer/volume_bulk_writer.py @@ -3,7 +3,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional -from pymilvus.bulk_writer.stage_file_manager import StageFileManager +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager from pymilvus.orm.schema import CollectionSchema from .constants import MB, BulkFileType, ConnectType @@ -12,8 +12,8 @@ logger = logging.getLogger(__name__) -class StageBulkWriter(LocalBulkWriter): - """StageBulkWriter handles writing local bulk files to a remote stage.""" +class VolumeBulkWriter(LocalBulkWriter): + """VolumeBulkWriter handles writing local bulk files to a remote volume.""" def __init__( self, @@ -21,7 +21,7 @@ def __init__( remote_path: str, cloud_endpoint: str, api_key: str, - stage_name: str, + volume_name: str, chunk_size: int = 1024 * MB, file_type: BulkFileType = BulkFileType.PARQUET, config: Optional[dict] = None, @@ -33,11 +33,11 @@ def __init__( remote_dir_path = Path(remote_path) / super().uuid self._remote_path = str(remote_dir_path) + "/" self._remote_files: List[List[str]] = [] - self._stage_name = stage_name - self._stage_file_manager = StageFileManager( + self._volume_name = volume_name + self._volume_file_manager = VolumeFileManager( cloud_endpoint=cloud_endpoint, api_key=api_key, - stage_name=stage_name, + volume_name=volume_name, connect_type=ConnectType.AUTO, ) @@ -50,7 +50,7 @@ def append_row(self, row: Dict[str, Any], **kwargs): super().append_row(row, **kwargs) def commit(self, **kwargs): - """Commit local bulk files and upload to remote stage.""" + """Commit local bulk files and upload to remote volume.""" super().commit(call_back=self._upload) @property @@ -61,8 +61,8 @@ def data_path(self) -> str: def batch_files(self) -> List[List[str]]: return self._remote_files - def get_stage_upload_result(self) -> Dict[str, str]: - return {"stage_name": self._stage_name, "path": str(self._remote_path)} + def get_volume_upload_result(self) -> Dict[str, str]: + return {"volume_name": self._volume_name, "path": str(self._remote_path)} def __exit__(self, exc_type: object, exc_val: object, exc_tb: object): super().__exit__(exc_type, exc_val, exc_tb) @@ -84,7 +84,7 @@ def _local_rm(self, file_path: str): logger.warning(f"Failed to delete local file: {file_path}") def _upload(self, file_list: List[str]) -> List[str]: - """Upload files to remote stage and remove local copies.""" + """Upload files to remote volume and remove local copies.""" uploaded_files: List[str] = [] for file_path in file_list: @@ -105,5 +105,5 @@ def _upload(self, file_list: List[str]) -> List[str]: def _upload_object(self, file_path: str, object_name: str): logger.info(f"Prepare to upload '{file_path}' to '{object_name}'") - self._stage_file_manager.upload_file_to_stage(file_path, self._remote_path) + self._volume_file_manager.upload_file_to_volume(file_path, self._remote_path) logger.info(f"Uploaded file '{file_path}' to '{object_name}'") diff --git a/pymilvus/bulk_writer/stage_file_manager.py b/pymilvus/bulk_writer/volume_file_manager.py similarity index 70% rename from pymilvus/bulk_writer/stage_file_manager.py rename 
to pymilvus/bulk_writer/volume_file_manager.py
index f5b474e32..8320c1152 100644
--- a/pymilvus/bulk_writer/stage_file_manager.py
+++ b/pymilvus/bulk_writer/volume_file_manager.py
@@ -15,7 +15,7 @@
 from pymilvus.bulk_writer.constants import ConnectType
 from pymilvus.bulk_writer.endpoint_resolver import EndpointResolver
 from pymilvus.bulk_writer.file_utils import FileUtils
-from pymilvus.bulk_writer.stage_restful import apply_stage
+from pymilvus.bulk_writer.volume_restful import apply_volume
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -56,23 +56,21 @@ def _url_open(
     )
 
 
-class StageFileManager:
+class VolumeFileManager:
     def __init__(
         self,
         cloud_endpoint: str,
         api_key: str,
-        stage_name: str,
+        volume_name: str,
         connect_type: ConnectType = ConnectType.AUTO,
     ):
         """
-        private preview feature. Please submit a request and contact us if you need it.
-
         Args:
             cloud_endpoint (str): The fixed cloud endpoint URL.
                 - For international regions: https://api.cloud.zilliz.com
                 - For regions in China: https://api.cloud.zilliz.com.cn
             api_key (str): The API key associated with your organization
-            stage_name (str): The name of the Stage.
+            volume_name (str): The name of the Volume.
             connect_type: Current value is mainly for Aliyun OSS buckets, default is Auto.
                 - Default case, if the OSS bucket is reachable via the internal endpoint,
                   the internal endpoint will be used
@@ -81,11 +79,11 @@
         """
         self.cloud_endpoint = cloud_endpoint
         self.api_key = api_key
-        self.stage_name = stage_name
+        self.volume_name = volume_name
         self.connect_type = connect_type
         self.local_file_paths = []
         self.total_bytes = 0
-        self.stage_info = {}
+        self.volume_info = {}
         self._client = None
 
     def _convert_dir_path(self, input_path: str):
@@ -95,19 +93,19 @@
             return input_path
         return input_path + "/"
 
-    def _refresh_stage_and_client(self, path: str):
-        logger.info("refreshing stage info...")
-        response = apply_stage(self.cloud_endpoint, self.api_key, self.stage_name, path)
-        self.stage_info = response.json()["data"]
-        logger.info("stage info refreshed.")
+    def _refresh_volume_and_client(self, path: str):
+        logger.info("refreshing volume info...")
+        response = apply_volume(self.cloud_endpoint, self.api_key, self.volume_name, path)
+        self.volume_info = response.json()["data"]
+        logger.info("volume info refreshed.")
 
-        creds = self.stage_info["credentials"]
+        creds = self.volume_info["credentials"]
         http_client = urllib3.PoolManager(maxsize=100)
 
-        cloud = self.stage_info["cloud"]
-        region = self.stage_info["region"]
+        cloud = self.volume_info["cloud"]
+        region = self.volume_info["region"]
         endpoint = EndpointResolver.resolve_endpoint(
-            self.stage_info["endpoint"],
+            self.volume_info["endpoint"],
             cloud,
             region,
             self.connect_type,
@@ -136,34 +134,45 @@ def _refresh_stage_and_client(self, path: str):
 
     def _validate_size(self):
         file_size_total = self.total_bytes
-        file_size_limit = self.stage_info["condition"]["maxContentLength"]
+        file_size_limit = self.volume_info["condition"]["maxContentLength"]
         if file_size_total > file_size_limit:
             error_message = (
                 f"localFileTotalSize {file_size_total} exceeds "
                 f"the maximum contentLength limit {file_size_limit} defined in the condition."
-                f"If you want to upload larger files, please contact us to lift the restriction."
+                f" If you are using the free tier, "
+                f"you may switch to the pay-as-you-go volume plan to support uploading larger files."
+            )
+            raise ValueError(error_message)
+
+        file_number_limit = self.volume_info["condition"].get("maxFileNumber")
+        if file_number_limit is not None and len(self.local_file_paths) > file_number_limit:
+            error_message = (
+                f"localFileTotalNumber {len(self.local_file_paths)} exceeds "
+                f"the maximum fileNumber limit {file_number_limit} defined in the condition."
+                f" If you are using the free tier, "
+                f"you may switch to the pay-as-you-go volume plan to support uploading more files."
             )
             raise ValueError(error_message)
 
-    def upload_file_to_stage(self, source_file_path: str, target_stage_path: str):
+    def upload_file_to_volume(self, source_file_path: str, target_volume_path: str):
         """
-        uploads a local file or directory to the specified path within the Stage.
+        Uploads a local file or directory to the specified path within the Volume.
 
         Args:
             source_file_path: the source local file or directory path
-            target_stage_path: the target directory path in the Stage
+            target_volume_path: the target directory path in the Volume
 
         Raises:
             Exception: If an error occurs during the upload process.
         """
         self.local_file_paths, self.total_bytes = FileUtils.process_local_path(source_file_path)
-        stage_path = self._convert_dir_path(target_stage_path)
-        self._refresh_stage_and_client(stage_path)
+        volume_path = self._convert_dir_path(target_volume_path)
+        self._refresh_volume_and_client(volume_path)
         self._validate_size()
 
         file_count = len(self.local_file_paths)
         logger.info(
-            f"begin to upload file to stage, localDirOrFilePath:{source_file_path}, fileCount:{file_count} to stageName:{self.stage_name}, stagePath:{stage_path}"
+            f"begin to upload file to volume, localDirOrFilePath:{source_file_path}, fileCount:{file_count} to volumeName:{self.volume_name}, volumePath:{volume_path}"
         )
 
         start_time = time.time()
@@ -172,7 +181,7 @@ def upload_file_to_stage(self, source_file_path: str, target_stage_path: str):
         root_path = Path(source_file_path).resolve()
         uploaded_bytes_lock = threading.Lock()
 
-        def _upload_task(file_path: str, root_path: Path, stage_path: str):
+        def _upload_task(file_path: str, root_path: Path, volume_path: str):
             nonlocal uploaded_bytes
             nonlocal uploaded_count
             path_obj = Path(file_path).resolve()
@@ -181,13 +190,13 @@ def _upload_task(file_path: str, root_path: Path, stage_path: str):
             else:
                 relative_path = path_obj.relative_to(root_path).as_posix()
 
-            stage_prefix = f"{self.stage_info['stagePrefix']}"
+            volume_prefix = f"{self.volume_info['volumePrefix']}"
             file_start_time = time.time()
             try:
                 size = Path(file_path).stat().st_size
                 logger.info(f"uploading file, fileName:{file_path}, size:{size} bytes")
-                remote_file_path = stage_prefix + stage_path + relative_path
-                self._put_object(file_path, remote_file_path, stage_path)
+                remote_file_path = volume_prefix + volume_path + relative_path
+                self._put_object(file_path, remote_file_path, volume_path)
                 with uploaded_bytes_lock:
                     uploaded_bytes += size
                     uploaded_count += 1
@@ -203,36 +212,36 @@ def _upload_task(file_path: str, root_path: Path, stage_path: str):
         with ThreadPoolExecutor(max_workers=20) as executor:
             futures = []
             for _, file_path in enumerate(self.local_file_paths):
-                futures.append(executor.submit(_upload_task, file_path, root_path, stage_path))
+                futures.append(executor.submit(_upload_task, file_path, root_path, volume_path))
             for f in futures:
                 f.result()  # wait for all
 
         total_elapsed = time.time() - start_time
         logger.info(
-            f"All files in {source_file_path} uploaded to stage, "
-            f"stageName:{self.stage_info['stageName']}, stagePath: {stage_path}, "
+
f"All files in {source_file_path} uploaded to volume, " + f"volumeName:{self.volume_info['volumeName']}, volumePath: {volume_path}, " f"totalFileCount:{file_count}, totalFileSize:{self.total_bytes}, cost time:{total_elapsed}s" ) - return {"stageName": self.stage_info["stageName"], "path": stage_path} + return {"volumeName": self.volume_info["volumeName"], "path": volume_path} - def _put_object(self, file_path: str, remote_file_path: str, stage_path: str): - expire_time_str = self.stage_info["credentials"]["expireTime"] + def _put_object(self, file_path: str, remote_file_path: str, volume_path: str): + expire_time_str = self.volume_info["credentials"]["expireTime"] expire_time = datetime.fromisoformat(expire_time_str.replace("Z", "+00:00")) now = datetime.now(timezone.utc) if now > expire_time: - self._refresh_stage_and_client(stage_path) + self._refresh_volume_and_client(volume_path) - self._upload_with_retry(file_path, remote_file_path, stage_path) + self._upload_with_retry(file_path, remote_file_path, volume_path) def _upload_with_retry( - self, file_path: str, object_name: str, stage_path: str, max_retries: int = 5 + self, file_path: str, object_name: str, volume_path: str, max_retries: int = 5 ): attempt = 0 while attempt < max_retries: try: self._client.fput_object( - bucket_name=self.stage_info["bucketName"], + bucket_name=self.volume_info["bucketName"], object_name=object_name, file_path=file_path, ) @@ -240,7 +249,7 @@ def _upload_with_retry( except Exception as e: attempt += 1 logger.warning(f"Attempt {attempt} failed to upload {file_path}: {e}") - self._refresh_stage_and_client(stage_path) + self._refresh_volume_and_client(volume_path) if attempt == max_retries: error_message = f"Upload failed after {max_retries} attempts" diff --git a/pymilvus/bulk_writer/volume_manager.py b/pymilvus/bulk_writer/volume_manager.py new file mode 100644 index 000000000..4a94f6220 --- /dev/null +++ b/pymilvus/bulk_writer/volume_manager.py @@ -0,0 +1,37 @@ +import logging + +from pymilvus.bulk_writer.volume_restful import create_volume, delete_volume, list_volumes + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class VolumeManager: + def __init__(self, cloud_endpoint: str, api_key: str): + """ + Args: + cloud_endpoint (str): The fixed cloud endpoint URL. + - For international regions: https://api.cloud.zilliz.com + - For regions in China: https://api.cloud.zilliz.com.cn + api_key (str): The API key associated with your organization or cluster. + """ + self.cloud_endpoint = cloud_endpoint + self.api_key = api_key + + def create_volume(self, project_id: str, region_id: str, volume_name: str): + """ + Create a volume under the specified project and regionId. + """ + create_volume(self.cloud_endpoint, self.api_key, project_id, region_id, volume_name) + + def delete_volume(self, volume_name: str): + """ + Delete a volume. + """ + delete_volume(self.cloud_endpoint, self.api_key, volume_name) + + def list_volumes(self, project_id: str, current_page: int = 1, page_size: int = 10): + """ + Paginated query of the volume list under a specified projectId. 
+ """ + return list_volumes(self.cloud_endpoint, self.api_key, project_id, current_page, page_size) diff --git a/pymilvus/bulk_writer/stage_restful.py b/pymilvus/bulk_writer/volume_restful.py similarity index 86% rename from pymilvus/bulk_writer/stage_restful.py rename to pymilvus/bulk_writer/volume_restful.py index 08d0558eb..77e5fc6d2 100644 --- a/pymilvus/bulk_writer/stage_restful.py +++ b/pymilvus/bulk_writer/volume_restful.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -def list_stages( +def list_volumes( url: str, api_key: str, project_id: str, @@ -16,7 +16,7 @@ def list_stages( page_size: int = 10, **kwargs, ) -> requests.Response: - """call listStages restful interface to list stages of project + """call listVolumes restful interface to list volumes of project Args: url (str): url of the server @@ -28,7 +28,7 @@ def list_stages( Returns: response of the restful interface """ - request_url = url + "/v2/stages" + request_url = url + "/v2/volumes" params = {"projectId": project_id, "currentPage": current_page, "pageSize": page_size} @@ -37,32 +37,32 @@ def list_stages( return resp -def create_stage( +def create_volume( url: str, api_key: str, project_id: str, region_id: str, - stage_name: str, + volume_name: str, **kwargs, ) -> requests.Response: - """call createStage restful interface to create new stage + """call createVolume restful interface to create new volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. project_id (str): id of the project region_id (str): id of the region - stage_name (str): name of the stage + volume_name (str): name of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/create" + request_url = url + "/v2/volumes/create" params = { "projectId": project_id, "regionId": region_id, - "stageName": stage_name, + "volumeName": volume_name, } resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs) @@ -70,50 +70,50 @@ def create_stage( return resp -def delete_stage( +def delete_volume( url: str, api_key: str, - stage_name: str, + volume_name: str, **kwargs, ) -> requests.Response: - """call deleteStage restful interface to create stage + """call deleteVolume restful interface to delete volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. - stage_name (str): name of the stage + volume_name (str): name of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/" + stage_name + request_url = url + "/v2/volumes/" + volume_name resp = _delete_request(url=request_url, api_key=api_key, **kwargs) _handle_response(request_url, resp.json()) return resp -def apply_stage( +def apply_volume( url: str, api_key: str, - stage_name: str, + volume_name: str, path: str, **kwargs, ) -> requests.Response: - """call applyStage restful interface to apply cred of stage + """call applyVolume restful interface to apply cred of volume Args: url (str): url of the server api_key (str): API key to authenticate your requests. 
- stage_name (str): name of the stage - path(str): path of the stage + volume_name (str): name of the volume + path(str): path of the volume Returns: response of the restful interface """ - request_url = url + "/v2/stages/apply" + request_url = url + "/v2/volumes/apply" - params = {"stageName": stage_name, "path": path} + params = {"volumeName": volume_name, "path": path} resp = _post_request(url=request_url, api_key=api_key, params=params, **kwargs) _handle_response(request_url, resp.json()) diff --git a/pyproject.toml b/pyproject.toml index 755e2efd4..6b4e9d4cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,13 @@ dev = [ "pytest-asyncio", "ruff>=0.12.9,<1", "black", + # develop bulk_writer + "requests", + "minio>=7.0.0", + "pyarrow>=12.0.0", + "azure-storage-blob", + "urllib3", + "scipy", ] [tool.setuptools.dynamic] diff --git a/tests/test_bulk_writer_stage.py b/tests/test_bulk_writer_stage.py new file mode 100644 index 000000000..4bd5021a8 --- /dev/null +++ b/tests/test_bulk_writer_stage.py @@ -0,0 +1,626 @@ +import tempfile +from pathlib import Path +from typing import Any, Dict +from unittest.mock import MagicMock, Mock, patch + +import pytest +import requests +from pymilvus.bulk_writer.constants import BulkFileType, ConnectType +from pymilvus.bulk_writer.volume_bulk_writer import VolumeBulkWriter +from pymilvus.bulk_writer.volume_file_manager import VolumeFileManager +from pymilvus.bulk_writer.volume_manager import VolumeManager +from pymilvus.bulk_writer.volume_restful import ( + apply_volume, + create_volume, + delete_volume, + list_volumes, +) +from pymilvus.client.types import DataType +from pymilvus.exceptions import MilvusException +from pymilvus.orm.schema import CollectionSchema, FieldSchema + + +class TestVolumeRestful: + """Test volume RESTful API functions.""" + + @pytest.fixture + def mock_response(self) -> Mock: + """Create a mock response object.""" + response = Mock(spec=requests.Response) + response.status_code = 200 + response.json.return_value = {"code": 0, "message": "success", "data": {}} + return response + + @pytest.fixture + def api_params(self) -> Dict[str, str]: + """Common API parameters.""" + return { + "url": "https://api.cloud.zilliz.com", + "api_key": "test_api_key", + } + + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_list_volumes_success( + self, mock_get: Mock, mock_response: Mock, api_params: Dict[str, str] + ) -> None: + """Test successful list_volumes call.""" + mock_get.return_value = mock_response + mock_response.json.return_value = { + "code": 0, + "message": "success", + "data": {"volumes": ["volume1", "volume2"]}, + } + + response = list_volumes( + **api_params, + project_id="test_project", + current_page=1, + page_size=10, + ) + + assert response.status_code == 200 + assert response.json()["data"]["volumes"] == ["volume1", "volume2"] + mock_get.assert_called_once() + + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_list_volumes_failure( + self, mock_get: Mock, mock_response: Mock, api_params: Dict[str, str] + ) -> None: + """Test failed list_volumes call.""" + mock_response.json.return_value = { + "code": 1001, + "message": "Invalid API key", + "data": {}, + } + mock_get.return_value = mock_response + + with pytest.raises(MilvusException, match="Invalid API key"): + list_volumes(**api_params, project_id="test_project") + + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + def test_create_volume_success( + self, mock_post: Mock, mock_response: Mock, api_params: 
Dict[str, str] + ) -> None: + """Test successful create_volume call.""" + mock_post.return_value = mock_response + mock_response.json.return_value = { + "code": 0, + "message": "success", + "data": {"volumeId": "volume123"}, + } + + response = create_volume( + **api_params, + project_id="test_project", + region_id="us-west-2", + volume_name="test_volume", + ) + + assert response.status_code == 200 + assert response.json()["data"]["volumeId"] == "volume123" + mock_post.assert_called_once() + + @patch("pymilvus.bulk_writer.volume_restful.requests.delete") + def test_delete_volume_success( + self, mock_delete: Mock, mock_response: Mock, api_params: Dict[str, str] + ) -> None: + """Test successful delete_volume call.""" + mock_delete.return_value = mock_response + + response = delete_volume(**api_params, volume_name="test_volume") + + assert response.status_code == 200 + mock_delete.assert_called_once() + + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + def test_apply_volume_success( + self, mock_post: Mock, mock_response: Mock, api_params: Dict[str, str] + ) -> None: + """Test successful apply_volume call.""" + mock_post.return_value = mock_response + mock_response.json.return_value = { + "code": 0, + "message": "success", + "data": { + "volumeName": "test_volume", + "volumePrefix": "prefix/", + "endpoint": "s3.amazonaws.com", + "bucketName": "test-bucket", + "region": "us-west-2", + "cloud": "aws", + "condition": {"maxContentLength": 1073741824}, + "credentials": { + "tmpAK": "test_access_key", + "tmpSK": "test_secret_key", + "sessionToken": "test_token", + "expireTime": "2024-12-31T23:59:59Z", + }, + }, + } + + response = apply_volume( + **api_params, + volume_name="test_volume", + path="data/", + ) + + assert response.status_code == 200 + data = response.json()["data"] + assert data["volumeName"] == "test_volume" + assert data["endpoint"] == "s3.amazonaws.com" + mock_post.assert_called_once() + + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_http_error_handling( + self, mock_get: Mock, api_params: Dict[str, str] + ) -> None: + """Test HTTP error handling.""" + mock_get.return_value.status_code = 404 + + with pytest.raises(MilvusException, match="status code: 404"): + list_volumes(**api_params, project_id="test_project") + + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_network_error_handling( + self, mock_get: Mock, api_params: Dict[str, str] + ) -> None: + """Test network error handling.""" + mock_get.side_effect = requests.exceptions.ConnectionError("Network error") + + with pytest.raises(MilvusException, match="Network error"): + list_volumes(**api_params, project_id="test_project") + + +class TestVolumeManager: + """Test VolumeManager class.""" + + @pytest.fixture + def volume_manager(self) -> VolumeManager: + """Create a VolumeManager instance.""" + return VolumeManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + ) + + @patch("pymilvus.bulk_writer.volume_manager.create_volume") + def test_create_volume(self, mock_create: Mock, volume_manager: VolumeManager) -> None: + """Test creating a volume.""" + volume_manager.create_volume( + project_id="test_project", + region_id="us-west-2", + volume_name="test_volume", + ) + + mock_create.assert_called_once_with( + volume_manager.cloud_endpoint, + volume_manager.api_key, + "test_project", + "us-west-2", + "test_volume", + ) + + @patch("pymilvus.bulk_writer.volume_manager.delete_volume") + def test_delete_volume(self, mock_delete: Mock, 
volume_manager: VolumeManager) -> None: + """Test deleting a volume.""" + volume_manager.delete_volume(volume_name="test_volume") + + mock_delete.assert_called_once_with( + volume_manager.cloud_endpoint, + volume_manager.api_key, + "test_volume", + ) + + @patch("pymilvus.bulk_writer.volume_manager.list_volumes") + def test_list_volumes(self, mock_list: Mock, volume_manager: VolumeManager) -> None: + """Test listing volumes.""" + mock_response = Mock() + mock_response.json.return_value = {"data": {"volumes": ["volume1", "volume2"]}} + mock_list.return_value = mock_response + + result = volume_manager.list_volumes(project_id="test_project", current_page=1, page_size=10) + + assert result.json()["data"]["volumes"] == ["volume1", "volume2"] + mock_list.assert_called_once_with( + volume_manager.cloud_endpoint, + volume_manager.api_key, + "test_project", + 1, + 10, + ) + + +class TestVolumeFileManager: + """Test VolumeFileManager class.""" + + @pytest.fixture + def volume_file_manager(self) -> VolumeFileManager: + """Create a VolumeFileManager instance.""" + return VolumeFileManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + volume_name="test_volume", + connect_type=ConnectType.AUTO, + ) + + @pytest.fixture + def mock_volume_info(self) -> Dict[str, Any]: + """Mock volume information.""" + return { + "volumeName": "test_volume", + "volumePrefix": "prefix/", + "endpoint": "s3.amazonaws.com", + "bucketName": "test-bucket", + "region": "us-west-2", + "cloud": "aws", + "condition": {"maxContentLength": 1073741824}, + "credentials": { + "tmpAK": "test_access_key", + "tmpSK": "test_secret_key", + "sessionToken": "test_token", + "expireTime": "2099-12-31T23:59:59Z", + }, + } + + def test_convert_dir_path(self, volume_file_manager: VolumeFileManager) -> None: + """Test directory path conversion.""" + assert volume_file_manager._convert_dir_path("") == "" + assert volume_file_manager._convert_dir_path("/") == "" + assert volume_file_manager._convert_dir_path("data") == "data/" + assert volume_file_manager._convert_dir_path("data/") == "data/" + + @patch("pymilvus.bulk_writer.volume_file_manager.apply_volume") + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") + def test_refresh_volume_and_client( + self, + mock_minio: Mock, + mock_apply: Mock, + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], + ) -> None: + """Test refreshing volume info and client.""" + mock_response = Mock() + mock_response.json.return_value = {"data": mock_volume_info} + mock_apply.return_value = mock_response + + volume_file_manager._refresh_volume_and_client("data/") + + assert volume_file_manager.volume_info == mock_volume_info + mock_apply.assert_called_once() + mock_minio.assert_called_once() + + def test_validate_size_success( + self, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] + ) -> None: + """Test successful size validation.""" + volume_file_manager.volume_info = mock_volume_info + volume_file_manager.total_bytes = 1000000 # 1MB + + # Should not raise any exception + volume_file_manager._validate_size() + + def test_validate_size_failure( + self, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] + ) -> None: + """Test size validation failure.""" + volume_file_manager.volume_info = mock_volume_info + volume_file_manager.total_bytes = 2147483648 # 2GB + + with pytest.raises(ValueError, match="exceeds the maximum contentLength limit"): + volume_file_manager._validate_size() + + 
@patch("pymilvus.bulk_writer.volume_file_manager.FileUtils.process_local_path") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") + @patch.object(VolumeFileManager, "_validate_size") + @patch.object(VolumeFileManager, "_put_object") + def test_upload_file_to_volume( + self, + mock_put_object: Mock, + mock_validate: Mock, + mock_refresh: Mock, + mock_process: Mock, + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], + ) -> None: + """Test uploading file to volume.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create test files + test_file = Path(temp_dir) / "test.txt" + test_file.write_text("test content") + + mock_process.return_value = ([str(test_file)], 12) + volume_file_manager.volume_info = mock_volume_info + + result = volume_file_manager.upload_file_to_volume(str(test_file), "data/") + + assert result["volumeName"] == "test_volume" + assert result["path"] == "data/" + mock_refresh.assert_called_once_with("data/") + mock_validate.assert_called_once() + + @patch.object(VolumeFileManager, "_upload_with_retry") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") + def test_put_object_refresh_on_expiry( + self, + mock_refresh: Mock, + mock_upload: Mock, + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], + ) -> None: + """Test that credentials are refreshed when expired.""" + # Set expired credentials + expired_info = mock_volume_info.copy() + expired_info["credentials"]["expireTime"] = "2020-01-01T00:00:00Z" + volume_file_manager.volume_info = expired_info + + volume_file_manager._put_object("test.txt", "remote/test.txt", "data/") + + mock_refresh.assert_called_once_with("data/") + mock_upload.assert_called_once() + + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") + def test_upload_with_retry_success( + self, mock_minio: Mock, volume_file_manager: VolumeFileManager, mock_volume_info: Dict[str, Any] + ) -> None: + """Test successful upload with retry.""" + volume_file_manager.volume_info = mock_volume_info + volume_file_manager._client = mock_minio.return_value + + volume_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/") + + volume_file_manager._client.fput_object.assert_called_once_with( + bucket_name="test-bucket", + object_name="remote/test.txt", + file_path="test.txt", + ) + + @patch("pymilvus.bulk_writer.volume_file_manager.Minio") + @patch.object(VolumeFileManager, "_refresh_volume_and_client") + def test_upload_with_retry_failure( + self, + mock_refresh: Mock, + mock_minio: Mock, + volume_file_manager: VolumeFileManager, + mock_volume_info: Dict[str, Any], + ) -> None: + """Test upload failure after max retries.""" + volume_file_manager.volume_info = mock_volume_info + mock_client = mock_minio.return_value + mock_client.fput_object.side_effect = Exception("Upload failed") + volume_file_manager._client = mock_client + + with pytest.raises(RuntimeError, match="Upload failed after 2 attempts"): + volume_file_manager._upload_with_retry("test.txt", "remote/test.txt", "data/", max_retries=2) + + assert mock_client.fput_object.call_count == 2 + assert mock_refresh.call_count == 2 # Refreshed on each retry + + +class TestVolumeBulkWriter: + """Test VolumeBulkWriter class.""" + + @pytest.fixture + def simple_schema(self) -> CollectionSchema: + """Create a simple collection schema.""" + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128), + FieldSchema(name="text", 
dtype=DataType.VARCHAR, max_length=512), + ] + return CollectionSchema(fields=fields) + + @pytest.fixture + def volume_bulk_writer(self, simple_schema: CollectionSchema) -> VolumeBulkWriter: + """Create a VolumeBulkWriter instance.""" + with patch("pymilvus.bulk_writer.volume_bulk_writer.VolumeFileManager"): + return VolumeBulkWriter( + schema=simple_schema, + remote_path="test/data", + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + volume_name="test_volume", + chunk_size=1024, + file_type=BulkFileType.PARQUET, + ) + + def test_init(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test VolumeBulkWriter initialization.""" + assert volume_bulk_writer._remote_path.endswith("/") + assert volume_bulk_writer._volume_name == "test_volume" + assert isinstance(volume_bulk_writer._volume_file_manager, MagicMock) + + def test_append_row(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test appending a row.""" + row = { + "id": 1, + "vector": [1.0] * 128, + "text": "test text", + } + volume_bulk_writer.append_row(row) + assert volume_bulk_writer.total_row_count == 1 + + @patch.object(VolumeBulkWriter, "_upload") + def test_commit(self, mock_upload: Mock, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test committing data.""" + # Add some data + for i in range(10): + volume_bulk_writer.append_row({ + "id": i, + "vector": [float(i)] * 128, + "text": f"text_{i}", + }) + + # Mock the upload to return file paths + mock_upload.return_value = ["file1.parquet", "file2.parquet"] + + # Commit the data + volume_bulk_writer.commit() + + # Upload should have been called during commit + assert mock_upload.called + + def test_data_path_property(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test data_path property.""" + assert isinstance(volume_bulk_writer.data_path, str) + assert "/" in volume_bulk_writer.data_path + + def test_batch_files_property(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test batch_files property.""" + assert volume_bulk_writer.batch_files == [] + volume_bulk_writer._remote_files = [["file1.parquet"], ["file2.parquet"]] + assert volume_bulk_writer.batch_files == [["file1.parquet"], ["file2.parquet"]] + + def test_get_volume_upload_result(self, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test getting volume upload result.""" + result = volume_bulk_writer.get_volume_upload_result() + assert result["volume_name"] == "test_volume" + assert "path" in result + + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") + def test_local_rm(self, mock_path: Mock, volume_bulk_writer: VolumeBulkWriter) -> None: + """Test local file removal.""" + # Test successful removal + mock_file = mock_path.return_value + mock_file.parent.iterdir.return_value = [] + + volume_bulk_writer._local_rm("test_file.parquet") + + mock_file.unlink.assert_called_once() + + @patch.object(VolumeBulkWriter, "_upload_object") + @patch.object(VolumeBulkWriter, "_local_rm") + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") + def test_upload( + self, mock_path_class: Mock, mock_rm: Mock, mock_upload_object: Mock, volume_bulk_writer: VolumeBulkWriter + ) -> None: + """Test uploading files.""" + # Mock Path behavior + mock_path = Mock() + mock_path_class.return_value = mock_path + mock_path.relative_to.return_value = Path("test.parquet") + + file_list = ["test_file.parquet"] + result = volume_bulk_writer._upload(file_list) + + assert len(result) == 1 + mock_upload_object.assert_called_once() + # The actual call will be with the mock path object + 
assert mock_rm.called + + @patch.object(VolumeFileManager, "upload_file_to_volume") + def test_upload_object( + self, mock_upload_to_volume: Mock, volume_bulk_writer: VolumeBulkWriter + ) -> None: + """Test uploading a single object.""" + volume_bulk_writer._upload_object("local_file.parquet", "remote_file.parquet") + + volume_bulk_writer._volume_file_manager.upload_file_to_volume.assert_called_once_with( + "local_file.parquet", volume_bulk_writer._remote_path + ) + + def test_context_manager(self, simple_schema: CollectionSchema) -> None: + """Test VolumeBulkWriter as context manager.""" + with patch("pymilvus.bulk_writer.volume_bulk_writer.VolumeFileManager"), VolumeBulkWriter( + schema=simple_schema, + remote_path="test/data", + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + volume_name="test_volume", + ) as writer: + assert writer is not None + writer.append_row({ + "id": 1, + "vector": [1.0] * 128, + "text": "test", + }) + + @patch.object(VolumeBulkWriter, "_upload_object") + @patch("pymilvus.bulk_writer.volume_bulk_writer.Path") + def test_upload_error_handling( + self, mock_path_class: Mock, mock_upload_object: Mock, volume_bulk_writer: VolumeBulkWriter + ) -> None: + """Test error handling during upload.""" + mock_upload_object.side_effect = Exception("Upload error") + + # Mock Path behavior + mock_path = Mock() + mock_path_class.return_value = mock_path + mock_path.relative_to.return_value = Path("test.parquet") + + with pytest.raises(MilvusException, match="Failed to upload file"): + volume_bulk_writer._upload(["test_file.parquet"]) + + +class TestIntegration: + """Integration tests for volume operations.""" + + @pytest.fixture + def mock_server_responses(self) -> Dict[str, Any]: + """Mock server responses for integration testing.""" + return { + "apply_volume": { + "code": 0, + "message": "success", + "data": { + "volumeName": "test_volume", + "volumePrefix": "prefix/", + "endpoint": "s3.amazonaws.com", + "bucketName": "test-bucket", + "region": "us-west-2", + "cloud": "aws", + "condition": {"maxContentLength": 1073741824}, + "credentials": { + "tmpAK": "test_access_key", + "tmpSK": "test_secret_key", + "sessionToken": "test_token", + "expireTime": "2099-12-31T23:59:59Z", + }, + }, + }, + "list_volumes": { + "code": 0, + "message": "success", + "data": {"volumes": ["volume1", "volume2", "test_volume"]}, + }, + } + + @patch("pymilvus.bulk_writer.volume_restful.requests.post") + @patch("pymilvus.bulk_writer.volume_restful.requests.get") + def test_full_volume_workflow( + self, + mock_get: Mock, + mock_post: Mock, + mock_server_responses: Dict[str, Any], + ) -> None: + """Test complete volume workflow from creation to upload.""" + # Setup mock responses + mock_post_response = Mock() + mock_post_response.status_code = 200 + mock_post_response.json.return_value = mock_server_responses["apply_volume"] + mock_post.return_value = mock_post_response + + mock_get_response = Mock() + mock_get_response.status_code = 200 + mock_get_response.json.return_value = mock_server_responses["list_volumes"] + mock_get.return_value = mock_get_response + + # Create volume manager + volume_manager = VolumeManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + ) + + # List volumes + result = volume_manager.list_volumes(project_id="test_project") + assert "test_volume" in result.json()["data"]["volumes"] + + # Create volume file manager + file_manager = VolumeFileManager( + cloud_endpoint="https://api.cloud.zilliz.com", + api_key="test_api_key", + 
volume_name="test_volume", + connect_type=ConnectType.AUTO, + ) + + # Verify volume info can be refreshed + file_manager._refresh_volume_and_client("data/") + assert file_manager.volume_info["volumeName"] == "test_volume"