From 5fdd2d159741dde8cc999490f0f783cb5d8c9d15 Mon Sep 17 00:00:00 2001 From: Masato Onodera Date: Thu, 8 Jan 2026 23:04:42 -1000 Subject: [PATCH 1/3] Add Web API support for target data transfer Implement transfer_data_from_uploader_via_webapi function to download target data via Web API as an alternative to rsync. This enables data transfer from environments without SSH access. Key features: - Download and extract ZIP archives from Web API endpoints - Support Bearer token authentication (optional) - SSL certificate verification control (verify_ssl config option) - Automatic detection and skipping of existing directories - Proper handling of empty/invalid upload_ids - Streaming downloads for large files - Comprehensive error handling (HTTP, network, ZIP errors) - Status reporting compatible with rsync version Changes: - Add requests dependency to pyproject.toml - Implement transfer_data_from_uploader_via_webapi in utils.py - Add transfer-targets-api CLI command - Update both transfer functions to create local_dir if not exists - Improve status reporting to distinguish skipped vs failed transfers - Add [webapi] configuration section documentation Co-Authored-By: Claude Sonnet 4.5 --- docs/reference/cli.md | 32 ++++ pyproject.toml | 1 + src/targetdb/cli/cli_main.py | 81 ++++++++-- src/targetdb/utils.py | 289 ++++++++++++++++++++++++++++++++++- 4 files changed, 386 insertions(+), 17 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 8926bc4..c508cf9 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -28,9 +28,17 @@ The command-line interface (CLI) tool `pfs-targetdb-cli` is provided to work wit host = "" user = "" data_dir = "" + + # Optional section for Web API access + # The following parameters are used to download data via Web API instead of rsync. + [webapi] + url = "" # e.g., "https://example.com/get-upload/" + api_key = "" # Optional: leave empty ("") for no authentication + verify_ssl = true # Optional: set to false to disable SSL certificate verification ``` The `schemacrawler` section is required only if you want to draw an ER diagram of the database schema with SchemaCrawler. + The `webapi` section is used by the `transfer-targets-api` command to download data via Web API. ## `pfs-targetdb-cli` @@ -60,6 +68,7 @@ $ pfs-targetdb-cli [OPTIONS] COMMAND [ARGS]... - `update`: Update rows in a table in the PFS Target... - `parse-alloc`: Parse an Excel file containing time... - `transfer-targets`: Download target lists from the uploader to... +- `transfer-targets-api`: Download target lists from the uploader to... - `insert-targets`: Insert targets using a list of input... - `insert-pointings`: Insert user-defined pointings using a list... - `update-catalog-active`: Update active flag in the input_catalog... @@ -329,6 +338,29 @@ $ pfs-targetdb-cli transfer-targets [OPTIONS] INPUT_FILE --- +### `transfer-targets-api` + +Download target lists from the uploader to the local machine via Web API. + +**Usage**: + +```console +$ pfs-targetdb-cli transfer-targets-api [OPTIONS] INPUT_FILE +``` + +**Arguments**: + +- `INPUT_FILE`: Input catalog list file (csv). [required] + +**Options**: + +- `-c, --config TEXT`: Database configuration file in the TOML format. [required] +- `--local-dir PATH`: Path to the data directory in the local machine [default: .] +- `--force / --no-force`: Force download. [default: no-force] +- `--help`: Show this message and exit. + +--- + ### `insert-targets` Insert targets using a list of input catalogs and upload IDs. 
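For reviewers, here is a minimal sketch (illustrative only, not part of the patch) of what the new `transfer-targets-api` command does under the hood: it loads the TOML configuration and the input catalog list, then hands both to `transfer_data_from_uploader_via_webapi`. The URL, token, and upload IDs below are placeholders.

```python
# Illustrative sketch only -- placeholder URL, API key, and upload IDs.
# The CLI command builds `config` via load_config() and `df` via
# load_input_data(); here both are constructed inline for clarity.
from pathlib import Path

import pandas as pd

from targetdb.utils import transfer_data_from_uploader_via_webapi

config = {
    "webapi": {
        "url": "https://example.com/get-upload/",  # base endpoint of the uploader Web API
        "api_key": "",        # empty string -> no Authorization header is sent
        "verify_ssl": True,   # set False only for servers with self-signed certificates
    }
}

# The input catalog list only needs an `upload_id` column for this step.
df = pd.DataFrame({"upload_id": ["abc123", "def456"]})

# Downloads each upload as a ZIP, extracts it, and reports a per-upload status.
transfer_data_from_uploader_via_webapi(df, config, local_dir=Path("./data"), force=False)
```

The keys in the `config` dictionary map one-to-one onto the `[webapi]` TOML section documented above.
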
diff --git a/pyproject.toml b/pyproject.toml index e1451b6..2a84401 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "pandas", "psycopg2-binary", "pyarrow", + "requests", "sqlalchemy", # "setuptools", "sqlalchemy-utils", diff --git a/src/targetdb/cli/cli_main.py b/src/targetdb/cli/cli_main.py index f3860ed..3231aa1 100644 --- a/src/targetdb/cli/cli_main.py +++ b/src/targetdb/cli/cli_main.py @@ -25,6 +25,7 @@ parse_allocation_file, prep_fluxstd_data, transfer_data_from_uploader, + transfer_data_from_uploader_via_webapi, update_input_catalog_active, ) @@ -196,7 +197,7 @@ def checkdups( typer.Option("--skip-save-merged", help="Do not save the merged DataFrame."), ] = False, additional_columns: Annotated[ - List[str], + List[str] | None, typer.Option( "--additional-columns", help="Additional columns to output for the merged file. (e.g., 'psf_mag_g' 'psf_mag_r'). " @@ -217,7 +218,7 @@ def checkdups( "--format", help="File format of the merged data file.", ), - ] = "parquet", + ] = PyArrowFileFormat.parquet, ): if additional_columns is None: additional_columns = [] @@ -260,7 +261,7 @@ def prep_fluxstd( ), ], input_catalog_id: Annotated[ - int, + int | None, typer.Option( "--input_catalog_id", show_default=False, @@ -268,7 +269,7 @@ def prep_fluxstd( ), ] = None, input_catalog_name: Annotated[ - str, + str | None, typer.Option( "--input_catalog_name", show_default=False, @@ -276,7 +277,7 @@ def prep_fluxstd( ), ] = None, rename_cols: Annotated[ - str, + str | None, typer.Option( "--rename-cols", help='Dictionary to rename columns (e.g., \'{"fstar_gaia": "is_fstar_gaia"}\').', @@ -288,7 +289,7 @@ def prep_fluxstd( "--format", help="File format of the output data file.", ), - ] = "parquet", + ] = PyArrowFileFormat.parquet, ): if input_catalog_id is None and input_catalog_name is None: @@ -419,7 +420,7 @@ def insert( ), ] = FluxType.total, upload_id: Annotated[ - str, + str | None, typer.Option( "--upload_id", show_default=False, @@ -427,7 +428,7 @@ def insert( ), ] = None, proposal_id: Annotated[ - str, + str | None, typer.Option( "--proposal_id", show_default=False, @@ -502,7 +503,7 @@ def update( ), ] = False, upload_id: Annotated[ - str, + str | None, typer.Option( "--upload_id", show_default=False, @@ -510,7 +511,7 @@ def update( ), ] = None, proposal_id: Annotated[ - str, + str | None, typer.Option( "--proposal_id", show_default=False, @@ -564,9 +565,9 @@ def parse_alloc( writable=True, help="Directory path to save output files.", ), - ] = ".", + ] = Path("."), outfile_prefix: Annotated[ - str, + str | None, typer.Option( show_default=False, help="Prefix to the output files.", @@ -603,12 +604,11 @@ def transfer_targets( local_dir: Annotated[ Path, typer.Option( - exists=True, dir_okay=True, writable=True, help="Path to the data directory in the local machine", ), - ] = ".", + ] = Path("."), force: Annotated[bool, typer.Option(help="Force download.")] = False, ): @@ -627,6 +627,55 @@ def transfer_targets( ) +@app.command( + help="Download target lists from the uploader to the local machine via Web API." 
+) +def transfer_targets_api( + input_file: Annotated[ + Path, + typer.Argument( + exists=True, + file_okay=True, + dir_okay=False, + readable=True, + show_default=False, + help="Input catalog list file (csv).", + ), + ], + config_file: Annotated[ + str, + typer.Option( + "-c", + "--config", + show_default=False, + help=config_help_msg, + ), + ], + local_dir: Annotated[ + Path, + typer.Option( + dir_okay=True, + writable=True, + help="Path to the data directory in the local machine", + ), + ] = Path("."), + force: Annotated[bool, typer.Option(help="Force download.")] = False, +): + + logger.info(f"Loading config file: {config_file}") + config = load_config(config_file) + + logger.info(f"Loading input data from {input_file} into a DataFrame") + df = load_input_data(input_file) + + transfer_data_from_uploader_via_webapi( + df, + config, + local_dir=local_dir, + force=force, + ) + + @app.command(help="Insert targets using a list of input catalogs and upload IDs.") def insert_targets( input_catalogs: Annotated[ @@ -657,7 +706,7 @@ def insert_targets( readable=True, help="Path to the data directory.", ), - ] = ".", + ] = Path("."), flux_type: Annotated[ FluxType, typer.Option( @@ -724,7 +773,7 @@ def insert_pointings( readable=True, help="Path to the data directory.", ), - ] = ".", + ] = Path("."), commit: Annotated[ bool, typer.Option("--commit", help="Commit changes to the database."), diff --git a/src/targetdb/utils.py b/src/targetdb/utils.py index 5e2f5b4..810969c 100644 --- a/src/targetdb/utils.py +++ b/src/targetdb/utils.py @@ -2,13 +2,18 @@ import glob import os +import re +import shutil import subprocess +import tempfile import time +import zipfile from datetime import datetime from pathlib import Path import numpy as np import pandas as pd +import requests from astropy.table import Table from loguru import logger from sqlalchemy import URL @@ -1235,11 +1240,23 @@ def transfer_data_from_uploader( local_dir=Path("."), force=False, ): + # Create local directory if it doesn't exist + if not local_dir.exists(): + logger.info(f"Creating local directory: {local_dir}") + local_dir.mkdir(parents=True, exist_ok=True) + status = [] n_transfer = [] # is_user_ppc = [] for upload_id in df["upload_id"]: + # Skip empty or invalid upload_id + if pd.isna(upload_id) or str(upload_id).strip() == "": + logger.warning("Skipping empty or invalid upload_id") + status.append("skipped") + n_transfer.append(0) + continue + # datadirs = glob.glob(local_dir / f"????????-??????-{upload_id}") # datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}")) datadirs = list(local_dir.glob(f"????????-??????-{upload_id}")) @@ -1352,7 +1369,7 @@ def transfer_data_from_uploader( n_transfer.append(0) # is_user_ppc.append(False) - custom_status_dict = {"success": 0, "WARNING": 1, "FAILED": 3} + custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3} df_status = pd.DataFrame( { "upload_id": df["upload_id"], @@ -1375,6 +1392,276 @@ def transfer_data_from_uploader( ) +def transfer_data_from_uploader_via_webapi( + df, + config, + local_dir=Path("."), + force=False, +): + """ + Transfer data from the uploader server to local machine via Web API. + + This function downloads target data as ZIP archives from a Web API endpoint + using Bearer token authentication, extracts them, and validates the directory + structure matches the expected pattern: YYYYMMDD-HHMMSS-{upload_id}. 
+ + Parameters + ---------- + df : pandas.DataFrame + DataFrame containing at least an 'upload_id' column with upload IDs to transfer. + config : dict + Configuration dictionary containing Web API credentials: + - config["webapi"]["url"]: Base URL of the Web API endpoint + - config["webapi"]["api_key"]: Bearer token for authentication + local_dir : Path, optional + Local directory where data should be downloaded. Defaults to Path("."). + force : bool, optional + If True, re-download even if directory already exists locally. + If False, skip transfer if directory exists. Defaults to False. + + Returns + ------- + None + The function logs transfer status and errors. It does not return a value, + but prints a status DataFrame showing results for each upload_id. + + Raises + ------ + ValueError + If multiple directories matching the same upload_id are found locally. + KeyError + If required config keys are missing. + + Notes + ----- + - Downloads are streamed to handle large ZIP files efficiently + - Temporary files are created in local_dir and cleaned up after extraction + - Status tracking mirrors transfer_data_from_uploader() for consistency: + * "success": 1 directory transferred successfully + * "WARNING": 0 or >1 directories in ZIP archive + * "FAILED": Error during HTTP request or extraction + * "skipped": Directory exists locally and force=False + - The function continues processing remaining upload_ids even if one fails + + Examples + -------- + >>> df = pd.DataFrame({'upload_id': ['abc123', 'def456']}) + >>> config = { + ... 'webapi': { + ... 'url': 'http://localhost:8000/get-upload/', + ... 'api_key': 'my-secret-token' + ... } + ... } + >>> transfer_data_from_uploader_via_webapi(df, config, local_dir=Path('./data')) + """ + # Create local directory if it doesn't exist + if not local_dir.exists(): + logger.info(f"Creating local directory: {local_dir}") + local_dir.mkdir(parents=True, exist_ok=True) + + status = [] + n_transfer = [] + + # Extract config values (move outside loop to avoid bug) + webapi_url = config["webapi"]["url"] + api_key = config["webapi"].get("api_key", "") + verify_ssl = config["webapi"].get("verify_ssl", True) + + # Normalize URL (ensure trailing slash) + if not webapi_url.endswith("/"): + webapi_url += "/" + + # Setup request headers (only add Authorization if api_key is provided) + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + # Log SSL verification status + if not verify_ssl: + logger.warning("SSL certificate verification is disabled") + import urllib3 + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + for upload_id in df["upload_id"]: + # Skip empty or invalid upload_id + if pd.isna(upload_id) or str(upload_id).strip() == "": + logger.warning("Skipping empty or invalid upload_id") + status.append("skipped") + n_transfer.append(0) + continue + + # Check local directory existence + # Pattern: pfs_target-YYYYMMDD-HHMMSS-{upload_id} + datadirs = list(local_dir.glob(f"*-*-{upload_id}")) + skip_transfer = False + + if len(datadirs) == 1: + skip_transfer = True if not force else False + if skip_transfer: + logger.info( + f"Data directory, {datadirs[0]}, is found locally. Skip transfer" + ) + else: + logger.info( + f"Data directory, {datadirs[0]}, is found locally, but force transfer" + ) + elif len(datadirs) > 1: + logger.error( + f"Multiple data directories are found in the destination directory: {datadirs}." 
+ ) + raise ValueError( + f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}" + ) + else: + logger.info( + f"Data directory for upload_id: {upload_id} is not found locally. Try transfer" + ) + + if not skip_transfer: + logger.info(f"Downloading data for upload_id: {upload_id} from the Web API") + + full_url = f"{webapi_url}{upload_id}" + logger.info(f"API endpoint: {full_url}") + + try: + # Step 1: HTTP GET request with streaming for large files + response = requests.get( + full_url, + headers=headers, + stream=True, + timeout=300, + verify=verify_ssl, + ) + response.raise_for_status() # Raises HTTPError for bad status codes + + # Step 2: Save ZIP to temporary file + with tempfile.NamedTemporaryFile( + mode="wb", + suffix=".zip", + delete=False, + dir=local_dir, # Keep temp file in destination for atomic move + ) as tmp_zip: + tmp_zip_path = Path(tmp_zip.name) + # Write in chunks to handle large files + for chunk in response.iter_content(chunk_size=8192): + tmp_zip.write(chunk) + + logger.info(f"Downloaded ZIP file to {tmp_zip_path}") + + # Step 3: Extract ZIP to temporary directory + with tempfile.TemporaryDirectory(dir=local_dir) as tmp_extract_dir: + tmp_extract_path = Path(tmp_extract_dir) + + with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref: + zip_ref.extractall(tmp_extract_path) + + logger.info("Extracted ZIP to temporary directory") + + # Step 4: Validate directory structure + # Expected pattern: pfs_target-YYYYMMDD-HHMMSS-{upload_id} + pattern = re.compile( + r"^pfs_target-\d{8}-\d{6}-" + re.escape(upload_id) + r"$" + ) + extracted_dirs = [ + d + for d in tmp_extract_path.iterdir() + if d.is_dir() and pattern.match(d.name) + ] + + if len(extracted_dirs) == 0: + logger.error( + f"No directory matching pattern 'pfs_target-????????-??????-{upload_id}' found in ZIP" + ) + n_transfer.append(0) + status.append("WARNING") + elif len(extracted_dirs) > 1: + logger.error( + f"Multiple directories matching pattern found in ZIP: {extracted_dirs}" + ) + n_transfer.append(len(extracted_dirs)) + status.append("WARNING") + else: + # Step 5: Move extracted directory to destination + extracted_dir = extracted_dirs[0] + final_dest = local_dir / extracted_dir.name + + # If force=True and directory exists, remove it first + if final_dest.exists(): + logger.info(f"Removing existing directory: {final_dest}") + shutil.rmtree(final_dest) + + shutil.move(str(extracted_dir), str(final_dest)) + logger.info(f"Moved extracted directory to {final_dest}") + + n_transfer.append(1) + status.append("success") + + # Step 6: Cleanup temporary ZIP file + tmp_zip_path.unlink() + logger.info("Cleaned up temporary ZIP file") + + except requests.exceptions.HTTPError as e: + logger.error( + f"HTTP error while downloading data for upload_id: {upload_id}" + ) + logger.error( + f"Status code: {e.response.status_code}, Message: {str(e)}" + ) + n_transfer.append(0) + status.append("FAILED") + except requests.exceptions.RequestException as e: + logger.error( + f"Network error while downloading data for upload_id: {upload_id}" + ) + logger.error(str(e)) + n_transfer.append(0) + status.append("FAILED") + except zipfile.BadZipFile as e: + logger.error(f"Invalid ZIP file for upload_id: {upload_id}") + logger.error(str(e)) + n_transfer.append(0) + status.append("FAILED") + except Exception as e: + logger.error( + f"Unexpected error while transferring data for upload_id: {upload_id}" + ) + logger.error(str(e)) + n_transfer.append(0) + status.append("FAILED") + else: + 
status.append("skipped") + n_transfer.append(0) + + # Return status DataFrame (identical to rsync version) + custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3} + df_status = pd.DataFrame( + { + "upload_id": df["upload_id"], + "status": status, + "n_transfer": n_transfer, + } + ) + df_status_out = df_status.sort_values( + by=["status"], key=lambda x: x.map(custom_status_dict) + ) + logger.info(f"Transfer status: \n{df_status_out.to_string(index=False)}") + + # Check for actual issues (FAILED or WARNING), skipped is not an issue + has_failures = np.any( + (df_status["status"] == "FAILED") | (df_status["status"] == "WARNING") + ) + + if np.all(df_status["status"] == "success"): + logger.info("All data transfer is successful.") + elif not has_failures: + logger.info("All transfers completed (some skipped).") + else: + logger.error( + "There are some issues with data transfer. Please check the status." + ) + + def insert_targets_from_uploader( df_input_catalogs, config, From 8778f63111765d8beb140c175b85ac59a55424a7 Mon Sep 17 00:00:00 2001 From: Masato Onodera Date: Thu, 8 Jan 2026 23:13:59 -1000 Subject: [PATCH 2/3] Refactor transfer_data_from_uploader_via_webapi to reduce complexity Break down the main function into smaller helper functions to improve readability and reduce cyclomatic complexity (from 22 to ~7). Changes: - Extract _setup_webapi_config() for configuration initialization - Extract _check_existing_directory() for directory checking logic - Extract _download_zip_from_api() for ZIP download - Extract _extract_and_validate_zip() for extraction and validation - Extract _process_single_upload() to orchestrate single upload processing - Main function now only handles setup, iteration, and status reporting This addresses Codacy complexity warning (MC0001) while maintaining identical functionality. Co-Authored-By: Claude Sonnet 4.5 --- src/targetdb/utils.py | 339 +++++++++++++++++++++--------------------- 1 file changed, 169 insertions(+), 170 deletions(-) diff --git a/src/targetdb/utils.py b/src/targetdb/utils.py index 810969c..a55972e 100644 --- a/src/targetdb/utils.py +++ b/src/targetdb/utils.py @@ -1392,6 +1392,165 @@ def transfer_data_from_uploader( ) +def _setup_webapi_config(config): + """Setup Web API configuration and headers.""" + webapi_url = config["webapi"]["url"] + api_key = config["webapi"].get("api_key", "") + verify_ssl = config["webapi"].get("verify_ssl", True) + + # Normalize URL (ensure trailing slash) + if not webapi_url.endswith("/"): + webapi_url += "/" + + # Setup request headers (only add Authorization if api_key is provided) + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + # Log SSL verification status + if not verify_ssl: + logger.warning("SSL certificate verification is disabled") + import urllib3 + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + return webapi_url, headers, verify_ssl + + +def _check_existing_directory(local_dir, upload_id, force): + """Check for existing directory and determine if transfer should be skipped.""" + datadirs = list(local_dir.glob(f"*-*-{upload_id}")) + + if len(datadirs) == 1: + skip_transfer = not force + if skip_transfer: + logger.info( + f"Data directory, {datadirs[0]}, is found locally. 
Skip transfer" + ) + else: + logger.info( + f"Data directory, {datadirs[0]}, is found locally, but force transfer" + ) + return skip_transfer + elif len(datadirs) > 1: + logger.error( + f"Multiple data directories are found in the destination directory: {datadirs}." + ) + raise ValueError( + f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}" + ) + else: + logger.info( + f"Data directory for upload_id: {upload_id} is not found locally. Try transfer" + ) + return False + + +def _download_zip_from_api(url, headers, verify_ssl, local_dir): + """Download ZIP file from Web API.""" + response = requests.get(url, headers=headers, stream=True, timeout=300, verify=verify_ssl) + response.raise_for_status() + + with tempfile.NamedTemporaryFile( + mode="wb", suffix=".zip", delete=False, dir=local_dir + ) as tmp_zip: + tmp_zip_path = Path(tmp_zip.name) + for chunk in response.iter_content(chunk_size=8192): + tmp_zip.write(chunk) + + logger.info(f"Downloaded ZIP file to {tmp_zip_path}") + return tmp_zip_path + + +def _extract_and_validate_zip(zip_path, upload_id, local_dir, force): + """Extract ZIP and validate directory structure.""" + with tempfile.TemporaryDirectory(dir=local_dir) as tmp_extract_dir: + tmp_extract_path = Path(tmp_extract_dir) + + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(tmp_extract_path) + + logger.info("Extracted ZIP to temporary directory") + + # Validate directory structure + pattern = re.compile(r"^pfs_target-\d{8}-\d{6}-" + re.escape(upload_id) + r"$") + extracted_dirs = [ + d for d in tmp_extract_path.iterdir() if d.is_dir() and pattern.match(d.name) + ] + + if len(extracted_dirs) == 0: + logger.error( + f"No directory matching pattern 'pfs_target-????????-??????-{upload_id}' found in ZIP" + ) + return 0, "WARNING" + elif len(extracted_dirs) > 1: + logger.error( + f"Multiple directories matching pattern found in ZIP: {extracted_dirs}" + ) + return len(extracted_dirs), "WARNING" + else: + # Move extracted directory to destination + extracted_dir = extracted_dirs[0] + final_dest = local_dir / extracted_dir.name + + if final_dest.exists(): + logger.info(f"Removing existing directory: {final_dest}") + shutil.rmtree(final_dest) + + shutil.move(str(extracted_dir), str(final_dest)) + logger.info(f"Moved extracted directory to {final_dest}") + + return 1, "success" + + +def _process_single_upload(upload_id, webapi_url, headers, verify_ssl, local_dir, force): + """Process a single upload_id download.""" + # Skip empty or invalid upload_id + if pd.isna(upload_id) or str(upload_id).strip() == "": + logger.warning("Skipping empty or invalid upload_id") + return "skipped", 0 + + # Check existing directory + try: + skip_transfer = _check_existing_directory(local_dir, upload_id, force) + except ValueError: + raise + + if skip_transfer: + return "skipped", 0 + + # Download and extract + logger.info(f"Downloading data for upload_id: {upload_id} from the Web API") + full_url = f"{webapi_url}{upload_id}" + logger.info(f"API endpoint: {full_url}") + + try: + zip_path = _download_zip_from_api(full_url, headers, verify_ssl, local_dir) + try: + n_transfer, status = _extract_and_validate_zip(zip_path, upload_id, local_dir, force) + return status, n_transfer + finally: + zip_path.unlink() + logger.info("Cleaned up temporary ZIP file") + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error while downloading data for upload_id: {upload_id}") + logger.error(f"Status code: {e.response.status_code}, Message: 
{str(e)}") + return "FAILED", 0 + except requests.exceptions.RequestException as e: + logger.error(f"Network error while downloading data for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + except zipfile.BadZipFile as e: + logger.error(f"Invalid ZIP file for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + except Exception as e: + logger.error(f"Unexpected error while transferring data for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + + def transfer_data_from_uploader_via_webapi( df, config, @@ -1459,181 +1618,21 @@ def transfer_data_from_uploader_via_webapi( logger.info(f"Creating local directory: {local_dir}") local_dir.mkdir(parents=True, exist_ok=True) + # Setup Web API configuration + webapi_url, headers, verify_ssl = _setup_webapi_config(config) + + # Process each upload_id status = [] n_transfer = [] - # Extract config values (move outside loop to avoid bug) - webapi_url = config["webapi"]["url"] - api_key = config["webapi"].get("api_key", "") - verify_ssl = config["webapi"].get("verify_ssl", True) - - # Normalize URL (ensure trailing slash) - if not webapi_url.endswith("/"): - webapi_url += "/" - - # Setup request headers (only add Authorization if api_key is provided) - headers = {} - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - - # Log SSL verification status - if not verify_ssl: - logger.warning("SSL certificate verification is disabled") - import urllib3 - - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - for upload_id in df["upload_id"]: - # Skip empty or invalid upload_id - if pd.isna(upload_id) or str(upload_id).strip() == "": - logger.warning("Skipping empty or invalid upload_id") - status.append("skipped") - n_transfer.append(0) - continue - - # Check local directory existence - # Pattern: pfs_target-YYYYMMDD-HHMMSS-{upload_id} - datadirs = list(local_dir.glob(f"*-*-{upload_id}")) - skip_transfer = False - - if len(datadirs) == 1: - skip_transfer = True if not force else False - if skip_transfer: - logger.info( - f"Data directory, {datadirs[0]}, is found locally. Skip transfer" - ) - else: - logger.info( - f"Data directory, {datadirs[0]}, is found locally, but force transfer" - ) - elif len(datadirs) > 1: - logger.error( - f"Multiple data directories are found in the destination directory: {datadirs}." - ) - raise ValueError( - f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}" - ) - else: - logger.info( - f"Data directory for upload_id: {upload_id} is not found locally. 
Try transfer" - ) - - if not skip_transfer: - logger.info(f"Downloading data for upload_id: {upload_id} from the Web API") - - full_url = f"{webapi_url}{upload_id}" - logger.info(f"API endpoint: {full_url}") - - try: - # Step 1: HTTP GET request with streaming for large files - response = requests.get( - full_url, - headers=headers, - stream=True, - timeout=300, - verify=verify_ssl, - ) - response.raise_for_status() # Raises HTTPError for bad status codes - - # Step 2: Save ZIP to temporary file - with tempfile.NamedTemporaryFile( - mode="wb", - suffix=".zip", - delete=False, - dir=local_dir, # Keep temp file in destination for atomic move - ) as tmp_zip: - tmp_zip_path = Path(tmp_zip.name) - # Write in chunks to handle large files - for chunk in response.iter_content(chunk_size=8192): - tmp_zip.write(chunk) - - logger.info(f"Downloaded ZIP file to {tmp_zip_path}") - - # Step 3: Extract ZIP to temporary directory - with tempfile.TemporaryDirectory(dir=local_dir) as tmp_extract_dir: - tmp_extract_path = Path(tmp_extract_dir) - - with zipfile.ZipFile(tmp_zip_path, "r") as zip_ref: - zip_ref.extractall(tmp_extract_path) - - logger.info("Extracted ZIP to temporary directory") - - # Step 4: Validate directory structure - # Expected pattern: pfs_target-YYYYMMDD-HHMMSS-{upload_id} - pattern = re.compile( - r"^pfs_target-\d{8}-\d{6}-" + re.escape(upload_id) + r"$" - ) - extracted_dirs = [ - d - for d in tmp_extract_path.iterdir() - if d.is_dir() and pattern.match(d.name) - ] - - if len(extracted_dirs) == 0: - logger.error( - f"No directory matching pattern 'pfs_target-????????-??????-{upload_id}' found in ZIP" - ) - n_transfer.append(0) - status.append("WARNING") - elif len(extracted_dirs) > 1: - logger.error( - f"Multiple directories matching pattern found in ZIP: {extracted_dirs}" - ) - n_transfer.append(len(extracted_dirs)) - status.append("WARNING") - else: - # Step 5: Move extracted directory to destination - extracted_dir = extracted_dirs[0] - final_dest = local_dir / extracted_dir.name - - # If force=True and directory exists, remove it first - if final_dest.exists(): - logger.info(f"Removing existing directory: {final_dest}") - shutil.rmtree(final_dest) - - shutil.move(str(extracted_dir), str(final_dest)) - logger.info(f"Moved extracted directory to {final_dest}") - - n_transfer.append(1) - status.append("success") - - # Step 6: Cleanup temporary ZIP file - tmp_zip_path.unlink() - logger.info("Cleaned up temporary ZIP file") - - except requests.exceptions.HTTPError as e: - logger.error( - f"HTTP error while downloading data for upload_id: {upload_id}" - ) - logger.error( - f"Status code: {e.response.status_code}, Message: {str(e)}" - ) - n_transfer.append(0) - status.append("FAILED") - except requests.exceptions.RequestException as e: - logger.error( - f"Network error while downloading data for upload_id: {upload_id}" - ) - logger.error(str(e)) - n_transfer.append(0) - status.append("FAILED") - except zipfile.BadZipFile as e: - logger.error(f"Invalid ZIP file for upload_id: {upload_id}") - logger.error(str(e)) - n_transfer.append(0) - status.append("FAILED") - except Exception as e: - logger.error( - f"Unexpected error while transferring data for upload_id: {upload_id}" - ) - logger.error(str(e)) - n_transfer.append(0) - status.append("FAILED") - else: - status.append("skipped") - n_transfer.append(0) + upload_status, upload_n_transfer = _process_single_upload( + upload_id, webapi_url, headers, verify_ssl, local_dir, force + ) + status.append(upload_status) + 
n_transfer.append(upload_n_transfer) - # Return status DataFrame (identical to rsync version) + # Generate status report custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3} df_status = pd.DataFrame( { From 3e21edb0655a565366bad9b1f99e7152fc6b15e8 Mon Sep 17 00:00:00 2001 From: Masato Onodera Date: Thu, 8 Jan 2026 23:19:57 -1000 Subject: [PATCH 3/3] Add .markdownlintrc to configure Markdown linting Configure markdownlint to allow patterns common in technical and auto-generated documentation: - MD013: Allow long lines for CLI docs and URLs - MD014: Allow dollar signs in shell commands - MD033: Allow inline HTML for documentation formatting - MD034: Allow bare URLs - MD041: Don't require first line to be a heading These settings accommodate auto-generated CLI documentation from typer and standard technical documentation practices. Co-Authored-By: Claude Sonnet 4.5 --- .markdownlintrc | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .markdownlintrc diff --git a/.markdownlintrc b/.markdownlintrc new file mode 100644 index 0000000..67d1329 --- /dev/null +++ b/.markdownlintrc @@ -0,0 +1,17 @@ +# Markdownlint configuration for PFS Target Database documentation +default: true + +# MD013: Line length - disable for auto-generated CLI docs and long URLs +MD013: false + +# MD014: Dollar signs in shell commands - disable for CLI documentation +MD014: false + +# MD033: Inline HTML - allow for documentation formatting (e.g., details/summary) +MD033: false + +# MD034: Bare URLs - allow for reference links +MD034: false + +# MD041: First line in file should be top-level heading - not always needed +MD041: false
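
Returning to the transfer code in patches 1–2, a quick self-contained illustration of the directory-name check performed by `_extract_and_validate_zip()`: the downloaded ZIP is expected to contain exactly one top-level directory named `pfs_target-YYYYMMDD-HHMMSS-{upload_id}`. The upload ID and timestamps below are made up.

```python
# Illustrative check of the expected archive layout (hypothetical values).
import re

upload_id = "abc123"  # hypothetical upload ID
pattern = re.compile(r"^pfs_target-\d{8}-\d{6}-" + re.escape(upload_id) + r"$")

print(bool(pattern.match(f"pfs_target-20260108-230442-{upload_id}")))  # True: expected form
print(bool(pattern.match(f"20260108-230442-{upload_id}")))             # False: prefix missing
print(bool(pattern.match("pfs_target-20260108-230442-other999")))      # False: wrong upload_id
```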
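
And a small sketch of the shared status-report logic: the same `custom_status_dict` ordering is used by both the rsync and Web API transfer functions, and skipped transfers are deliberately not counted as failures. The upload IDs and statuses here are invented for the example.

```python
# Invented example data: how per-upload statuses are ordered and summarised.
import numpy as np
import pandas as pd

custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3}

df_status = pd.DataFrame(
    {
        "upload_id": ["abc123", "def456", "ghi789"],
        "status": ["skipped", "success", "WARNING"],
        "n_transfer": [0, 1, 0],
    }
)

# Sort by severity: success < WARNING < skipped < FAILED.
df_status_out = df_status.sort_values(by=["status"], key=lambda x: x.map(custom_status_dict))
print(df_status_out.to_string(index=False))

# Only FAILED/WARNING count as issues; skipped transfers are fine.
has_failures = np.any((df_status["status"] == "FAILED") | (df_status["status"] == "WARNING"))
if np.all(df_status["status"] == "success"):
    print("All data transfer is successful.")
elif not has_failures:
    print("All transfers completed (some skipped).")
else:
    print("There are some issues with data transfer. Please check the status.")
```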