From 9415da901a6a335af8954f9fabadcb03f6e466b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E7=9D=BF?= Date: Sat, 4 Apr 2026 07:07:27 +0800 Subject: [PATCH] feat(v2.2.4): Validation fix --- CHANGELOG.md | 23 ++ MIGRATIONS.md | 13 + README.md | 31 +- README_cn.md | 38 ++- docs/architecture.md | 6 +- docs/public-api.md | 10 + docs/releases/2.2.3.md | 16 +- examples/README.md | 89 ++++++ examples/annotated_schema.py | 10 +- examples/create_or_update_import.py | 144 +++++++++ examples/custom_storage.py | 2 - examples/date_and_range_fields.py | 54 ++++ examples/employee_import_workflow.py | 120 +++++++ examples/export_workflow.py | 50 +++ examples/fastapi_upload.py | 2 - examples/minio_storage.py | 42 +++ examples/selection_fields.py | 53 ++++ src/excelalchemy/__init__.py | 2 +- src/excelalchemy/_primitives/constants.py | 3 - src/excelalchemy/_primitives/identity.py | 6 +- src/excelalchemy/codecs/base.py | 49 ++- src/excelalchemy/codecs/boolean.py | 22 +- src/excelalchemy/codecs/date.py | 65 ++-- src/excelalchemy/codecs/date_range.py | 31 +- src/excelalchemy/codecs/money.py | 21 +- src/excelalchemy/codecs/multi_checkbox.py | 22 +- src/excelalchemy/codecs/number.py | 80 +++-- src/excelalchemy/codecs/number_range.py | 6 +- src/excelalchemy/codecs/organization.py | 20 +- src/excelalchemy/codecs/phone_number.py | 4 +- src/excelalchemy/codecs/radio.py | 41 ++- src/excelalchemy/codecs/staff.py | 22 +- src/excelalchemy/codecs/string.py | 30 +- src/excelalchemy/codecs/tree.py | 30 +- src/excelalchemy/codecs/url.py | 5 +- src/excelalchemy/core/alchemy.py | 34 +- src/excelalchemy/core/import_session.py | 30 +- src/excelalchemy/core/rows.py | 11 +- src/excelalchemy/core/schema.py | 36 ++- src/excelalchemy/core/storage.py | 4 +- src/excelalchemy/core/writer.py | 28 +- src/excelalchemy/exceptions.py | 8 +- src/excelalchemy/helper/pydantic.py | 31 +- src/excelalchemy/metadata.py | 295 ++++++++++++------ src/excelalchemy/util/file.py | 4 +- tests/integration/test_examples_smoke.py | 137 ++++++++ .../test_excelalchemy_workflows.py | 5 + tests/unit/test_field_metadata.py | 26 ++ 48 files changed, 1468 insertions(+), 343 deletions(-) create mode 100644 examples/README.md create mode 100644 examples/create_or_update_import.py create mode 100644 examples/date_and_range_fields.py create mode 100644 examples/employee_import_workflow.py create mode 100644 examples/export_workflow.py create mode 100644 examples/minio_storage.py create mode 100644 examples/selection_fields.py create mode 100644 tests/integration/test_examples_smoke.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 13280d8..4c4cd8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,23 @@ the Pydantic adapter layer. - Added a regression test for unsupported annotated declarations to prevent native Python annotations from slipping through the workbook schema path +### Changed + +- Clarified `FieldMetaInfo` as a compatibility facade over layered metadata + objects instead of treating it as the primary internal metadata model +- Moved more core consumers and built-in codecs onto the layered metadata + objects (`declared`, `runtime`, `presentation`, and `constraints`) +- Continued reducing the effective responsibility carried by the flat + `FieldMetaInfo` compatibility surface in the 2.x implementation +- Concentrated necessary dynamic typing boundaries into explicit aliases in the + codec and metadata layers instead of leaving ad hoc `Any` usage scattered + across the codebase +- Replaced a number of remaining loose `Any` annotations in the runtime path + with more explicit `object` or workbook-boundary aliases where the behavior + was already concrete +- Added smoke coverage for the repository examples so the annotated schema and + custom storage examples are exercised directly in tests + ### Compatibility Notes - No public import or export workflow API was removed in this release @@ -27,12 +44,18 @@ the Pydantic adapter layer. to work unchanged - Unsupported native annotations with `ExcelMeta(...)` now fail early with the intended `ProgrammaticError` +- `FieldMeta(...)` and `ExcelMeta(...)` remain the stable public metadata entry + points while internal metadata continues to consolidate behind them ### Release Summary - unsupported annotated declarations now fail with the intended error again - codec resolution is stricter and easier to reason about - the validation fix is protected by an explicit integration regression test +- metadata internals continue to move toward layered objects rather than a flat + central record +- runtime typing boundaries are more explicit without changing the public API +- repository examples now have direct smoke coverage in the test suite ## [2.2.2] - 2026-04-03 diff --git a/MIGRATIONS.md b/MIGRATIONS.md index d65bd6e..3c35f66 100644 --- a/MIGRATIONS.md +++ b/MIGRATIONS.md @@ -124,6 +124,19 @@ Additional top-level module guidance: - `excelalchemy.header_models` is internal and should not be imported in application code - `docs/public-api.md` summarizes stable public modules, compatibility modules, and internal modules +## Import Inspection Names + +The 2.2 line also clarifies the recommended names for inspecting import-run +state from the facade: + +- prefer `worksheet_table` over `df` +- prefer `header_table` over `header_df` +- prefer `cell_error_map` over `cell_errors` +- prefer `row_error_map` over `row_errors` + +The old names still work as compatibility aliases in the 2.x line, but new +code should use the clearer names above. + ## Recommended Upgrade Checklist 1. Upgrade your Python runtime to 3.12+. diff --git a/README.md b/README.md index 6fa3a18..c8b801b 100755 --- a/README.md +++ b/README.md @@ -178,8 +178,18 @@ pip install "ExcelAlchemy[minio]" Practical examples live in the repository: - [`examples/annotated_schema.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/annotated_schema.py) +- [`examples/employee_import_workflow.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/employee_import_workflow.py) +- [`examples/create_or_update_import.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/create_or_update_import.py) +- [`examples/date_and_range_fields.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/date_and_range_fields.py) +- [`examples/selection_fields.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/selection_fields.py) - [`examples/custom_storage.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/custom_storage.py) +- [`examples/export_workflow.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/export_workflow.py) +- [`examples/minio_storage.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/minio_storage.py) - [`examples/fastapi_upload.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/fastapi_upload.py) +- [`examples/README.md`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/README.md) + +If you want the recommended reading order, start with +[`examples/README.md`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/README.md). ## Public API Boundaries @@ -187,6 +197,25 @@ If you want to know which modules are stable public entry points versus compatibility shims or internal modules, see [`docs/public-api.md`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/docs/public-api.md). +## Import Inspection Names + +When you inspect import-run state from the facade, prefer the clearer 2.2 names: + +- `alchemy.worksheet_table` +- `alchemy.header_table` +- `alchemy.cell_error_map` +- `alchemy.row_error_map` + +The older aliases: + +- `alchemy.df` +- `alchemy.header_df` +- `alchemy.cell_errors` +- `alchemy.row_errors` + +still work in the 2.x line as compatibility paths, but new application code +should use the clearer names above. + ## Locale-Aware Workbook Output `locale` affects workbook-facing display text such as: @@ -316,7 +345,7 @@ The short version: | Topic | v1-style risk | Current v2 design | | --- | --- | --- | | Field access | Tight coupling to `__fields__` / `ModelField` | Adapter over `model_fields` | -| Metadata ownership | Excel metadata mixed with validation internals | `FieldMetaInfo` owns Excel metadata | +| Metadata ownership | Excel metadata mixed with validation internals | `FieldMetaInfo` is a compatibility facade over layered Excel metadata | | Validation integration | Deep reliance on internals | Adapter + explicit runtime validation | | Upgrade path | Brittle | Layered | diff --git a/README_cn.md b/README_cn.md index c974754..81af3ad 100644 --- a/README_cn.md +++ b/README_cn.md @@ -102,6 +102,24 @@ pip install ExcelAlchemy pip install "ExcelAlchemy[minio]" ``` +## 示例 + +仓库里有一组更贴近实际接入的示例: + +- [`examples/annotated_schema.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/annotated_schema.py) +- [`examples/employee_import_workflow.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/employee_import_workflow.py) +- [`examples/create_or_update_import.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/create_or_update_import.py) +- [`examples/date_and_range_fields.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/date_and_range_fields.py) +- [`examples/selection_fields.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/selection_fields.py) +- [`examples/custom_storage.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/custom_storage.py) +- [`examples/export_workflow.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/export_workflow.py) +- [`examples/minio_storage.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/minio_storage.py) +- [`examples/fastapi_upload.py`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/fastapi_upload.py) +- [`examples/README.md`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/README.md) + +如果你想按推荐顺序来阅读,建议先看 +[`examples/README.md`](https://github.com/RayCarterLab/ExcelAlchemy/blob/main/examples/README.md)。 + ## 快速开始 ```python @@ -192,6 +210,24 @@ alchemy = ExcelAlchemy(ExporterConfig(Importer, storage=InMemoryExcelStorage())) 如果你希望使用内置 Minio 实现,推荐显式传入 `storage=MinioStorageGateway(...)`,而不是再把 Minio 配置散落到门面层。 +## 导入结果状态查看命名 + +如果你需要从 facade 上查看一次导入后的中间状态,推荐使用 2.2 这套更清晰的命名: + +- `alchemy.worksheet_table` +- `alchemy.header_table` +- `alchemy.cell_error_map` +- `alchemy.row_error_map` + +旧别名: + +- `alchemy.df` +- `alchemy.header_df` +- `alchemy.cell_errors` +- `alchemy.row_errors` + +在 2.x 里仍然可用,用于兼容旧代码;但新代码建议统一使用前面这组更明确的名字。 + ## 为什么这样设计 ### 为什么去掉 pandas @@ -210,7 +246,7 @@ alchemy = ExcelAlchemy(ExporterConfig(Importer, storage=InMemoryExcelStorage())) Excel 元数据不应该深绑到 Pydantic 内部结构上。 所以现在的分层是: -- `FieldMetaInfo` 负责 Excel 元数据 +- `FieldMetaInfo` 是对外兼容 façade,内部再组合声明层、运行时绑定层、展示层和导入约束层 - `helper/pydantic.py` 只做适配 - 真正的业务校验仍然由 ExcelAlchemy 控制 diff --git a/docs/architecture.md b/docs/architecture.md index 011e8d4..bb6af66 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -99,8 +99,10 @@ flowchart LR `src/excelalchemy/metadata.py` -- owns Excel field metadata -- exposes workbook comment fragments +- exposes `FieldMeta(...)` / `ExcelMeta(...)` as the stable public entry points +- keeps `FieldMetaInfo` as a compatibility facade for the 2.x line +- splits the real metadata state into declaration, runtime binding, + workbook presentation, and import-constraint layers - keeps runtime metadata separate from validation backend internals ### Pydantic Integration diff --git a/docs/public-api.md b/docs/public-api.md index 3bf95ae..005b388 100644 --- a/docs/public-api.md +++ b/docs/public-api.md @@ -34,6 +34,9 @@ These modules are the recommended import paths for application code: The recommended backend configuration pattern in the 2.x line. - `ExcelArtifact` The recommended return shape when you need bytes, base64, or data URLs. +- import inspection names: + Prefer `worksheet_table`, `header_table`, `cell_error_map`, and + `row_error_map` when reading import-run state from the facade. ## Compatibility Modules In 2.x @@ -92,3 +95,10 @@ direction is: and `excelalchemy.codecs` - backend integration through `ExcelStorage` - internal orchestration and helper modules treated as implementation details + +For import-run state naming, the long-term direction is also: + +- clear facade inspection names such as `worksheet_table`, `header_table`, + `cell_error_map`, and `row_error_map` +- older aliases such as `df`, `header_df`, `cell_errors`, and `row_errors` + retained only as 2.x compatibility paths diff --git a/docs/releases/2.2.3.md b/docs/releases/2.2.3.md index d8525e9..197c37a 100644 --- a/docs/releases/2.2.3.md +++ b/docs/releases/2.2.3.md @@ -9,6 +9,8 @@ line. - present `2.2.3` as a focused validation-correctness release - restore the intended failure mode for unsupported annotated declarations - ship regression coverage for the declaration guard in the Pydantic adapter +- continue consolidating the remaining runtime typing gray areas +- add direct smoke coverage for repository examples ## Release Positioning @@ -19,6 +21,8 @@ line. - unsupported native annotations with `ExcelMeta(...)` now fail early with the intended `ProgrammaticError` - the validation path is safer and easier to reason about +- runtime typing boundaries are more explicit in core codec and metadata paths +- repository examples now have direct smoke coverage in tests ## Before Tagging @@ -28,6 +32,8 @@ line. `src/excelalchemy/helper/pydantic.py`. 4. Confirm the regression test in `tests/integration/test_excelalchemy_workflows.py`. +5. Confirm the example smoke tests in + `tests/integration/test_examples_smoke.py`. ## Local Verification @@ -36,9 +42,11 @@ Run these commands from the repository root: ```bash uv sync --extra development uv run ruff check src/excelalchemy/helper/pydantic.py \ - tests/integration/test_excelalchemy_workflows.py + tests/integration/test_excelalchemy_workflows.py \ + tests/integration/test_examples_smoke.py uv run pyright -uv run pytest tests/integration/test_excelalchemy_workflows.py -q +uv run pytest tests/integration/test_excelalchemy_workflows.py \ + tests/integration/test_examples_smoke.py -q rm -rf dist uv build uvx twine check dist/* @@ -60,6 +68,8 @@ themes clearly: - unsupported annotated declarations now fail with the intended error again - codec resolution in the Pydantic adapter is stricter and more explicit - the fix is protected by a regression test +- runtime typing boundaries are more explicit and less ad hoc +- repository examples are exercised directly in the test suite ## Recommended Release Messaging @@ -69,6 +79,8 @@ Prefer wording that emphasizes stability and correctness: - "restores the intended ProgrammaticError path" - "tightens codec resolution in the Pydantic adapter" - "adds regression coverage for unsupported annotated declarations" +- "continues tightening runtime typing boundaries without breaking the public API" +- "adds smoke coverage for repository examples" ## Done When diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..f91a038 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,89 @@ +# Examples + +These examples are organized as a recommended learning path rather than a flat list. + +## Recommended Reading Order + +1. `annotated_schema.py` + - Start here if you want to learn the declaration style first. + - Shows the modern `Annotated[..., Field(...), ExcelMeta(...)]` pattern. +2. `employee_import_workflow.py` + - Read this next if you want to understand the core import story. + - Shows template generation, workbook upload, import execution, and result reporting. +3. `create_or_update_import.py` + - Read this after the basic import flow. + - Shows `ImporterConfig.for_create_or_update(...)` with `is_data_exist`, `creator`, and `updater`. +4. `export_workflow.py` + - Read this once the import flow is clear. + - Shows artifact generation, export uploads, and a custom storage-backed export task. +5. `custom_storage.py` + - Read this when you want to implement your own `ExcelStorage`. + - Keeps the example minimal and focused on the protocol boundary. +6. `date_and_range_fields.py` + - Read this if you want to understand workbook-friendly date, date range, number range, and money fields. +7. `selection_fields.py` + - Read this if your domain uses approval forms, assignments, ownership trees, or selection-heavy templates. +8. `minio_storage.py` + - Read this if you need the built-in Minio path in the current 2.x line. + - This reflects the current 2.x compatibility-based Minio path rather than a future 3.x-only storage story. +9. `fastapi_upload.py` + - Read this last as an integration sketch. + - It is useful once the import and storage examples already make sense. + +## By Goal + +- Learn the declaration style: + - `annotated_schema.py` +- Learn the core import flow: + - `employee_import_workflow.py` + - `create_or_update_import.py` +- Learn export and storage integration: + - `export_workflow.py` + - `custom_storage.py` + - `minio_storage.py` +- Learn field families: + - `date_and_range_fields.py` + - `selection_fields.py` +- Learn web integration: + - `fastapi_upload.py` + +## Storage and Backend Integration + +- `custom_storage.py` + - Shows a minimal custom `ExcelStorage` implementation for export uploads. +- `export_workflow.py` + - Shows a realistic export flow with artifact generation and upload. +- `minio_storage.py` + - Shows the built-in Minio-backed storage path currently available in the 2.x line. +- `fastapi_upload.py` + - Shows a FastAPI integration sketch for template download and workbook import. + +## How To Run + +Run examples from the repository root: + +```bash +uv run python examples/annotated_schema.py +uv run python examples/employee_import_workflow.py +uv run python examples/create_or_update_import.py +uv run python examples/date_and_range_fields.py +uv run python examples/selection_fields.py +uv run python examples/custom_storage.py +uv run python examples/export_workflow.py +uv run python examples/minio_storage.py +``` + +If you want to try the FastAPI sketch, install FastAPI first and then run your +preferred ASGI server against `examples.fastapi_upload:app`. + +## Notes + +- The examples intentionally use in-memory storage so they stay self-contained. +- They are meant to show the recommended public API shape for the stable 2.x + line. +- If you want a production backend, prefer `storage=...` with + `MinioStorageGateway` or your own `ExcelStorage` implementation. +- The built-in `minio_storage.py` example reflects the current 2.x Minio path, + which still uses the compatibility configuration fields under the hood. +- The smoke tests in `tests/integration/test_examples_smoke.py` cover the main + example entry points directly. diff --git a/examples/annotated_schema.py b/examples/annotated_schema.py index 3c22970..78f5f45 100644 --- a/examples/annotated_schema.py +++ b/examples/annotated_schema.py @@ -1,16 +1,18 @@ """Minimal example that uses Annotated + ExcelMeta declarations.""" -from __future__ import annotations - from typing import Annotated from pydantic import BaseModel, Field -from excelalchemy import Email, ExcelAlchemy, ExcelMeta, ImporterConfig, Number +from excelalchemy import Email, ExcelAlchemy, ExcelMeta, ImporterConfig, Number, String class EmployeeImporter(BaseModel): - full_name: Annotated[str, Field(min_length=2), ExcelMeta(label='Full name', order=1, hint='Use the legal name')] + full_name: Annotated[ + String, + Field(min_length=2), + ExcelMeta(label='Full name', order=1, hint='Use the legal name'), + ] age: Annotated[Number, Field(ge=18), ExcelMeta(label='Age', order=2)] work_email: Annotated[ Email, diff --git a/examples/create_or_update_import.py b/examples/create_or_update_import.py new file mode 100644 index 0000000..a108b62 --- /dev/null +++ b/examples/create_or_update_import.py @@ -0,0 +1,144 @@ +"""Create-or-update import example with explicit existence checks.""" + +import asyncio +import io +from base64 import b64decode + +from openpyxl import load_workbook +from pydantic import BaseModel + +from excelalchemy import Email, ExcelAlchemy, ExcelStorage, FieldMeta, ImporterConfig, ImportResult, String, UrlStr +from excelalchemy.core.table import WorksheetTable + + +class CustomerImporter(BaseModel): + customer_name: String = FieldMeta(label='Customer name', order=1) + work_email: Email = FieldMeta(label='Work email', order=2) + team: String = FieldMeta(label='Team', order=3) + + +class InMemoryUpsertStorage(ExcelStorage): + def __init__(self) -> None: + self.fixtures: dict[str, bytes] = {} + self.uploaded: dict[str, bytes] = {} + + def read_excel_table(self, input_excel_name: str, *, skiprows: int, sheet_name: str) -> WorksheetTable: + workbook = load_workbook(io.BytesIO(self.fixtures[input_excel_name]), data_only=True) + try: + worksheet = workbook[sheet_name] + rows = [ + [None if value is None else str(value) for value in row] + for row in worksheet.iter_rows( + min_row=skiprows + 1, + max_row=worksheet.max_row, + max_col=worksheet.max_column, + values_only=True, + ) + ] + finally: + workbook.close() + + while rows and all(value is None for value in rows[-1]): + rows.pop() + + return WorksheetTable(rows=rows) + + def upload_excel(self, output_name: str, content_with_prefix: str) -> UrlStr: + _, payload = content_with_prefix.split(',', 1) + self.uploaded[output_name] = b64decode(payload) + return UrlStr(f'memory://{output_name}') + + +def _build_import_fixture(storage: InMemoryUpsertStorage, template_bytes: bytes) -> None: + workbook = load_workbook(io.BytesIO(template_bytes)) + try: + worksheet = workbook['Sheet1'] + worksheet['A3'] = 'TaylorChen' + worksheet['B3'] = 'taylor.chen@example.com' + worksheet['C3'] = 'Finance' + + worksheet['A4'] = 'AveryStone' + worksheet['B4'] = 'avery.stone@example.com' + worksheet['C4'] = 'Operations' + + buffer = io.BytesIO() + workbook.save(buffer) + storage.fixtures['customer-import.xlsx'] = buffer.getvalue() + finally: + workbook.close() + + +async def is_customer_existing(row: dict[str, object], context: dict[str, object] | None) -> bool: + if context is None: + return False + existing_emails = context.setdefault('existing_emails', set()) + assert isinstance(existing_emails, set) + email = str(row['work_email']) + return email in existing_emails + + +async def create_customer(row: dict[str, object], context: dict[str, object] | None) -> dict[str, object]: + if context is not None: + created_rows = context.setdefault('created_rows', []) + existing_emails = context.setdefault('existing_emails', set()) + assert isinstance(created_rows, list) + assert isinstance(existing_emails, set) + created_rows.append(row.copy()) + existing_emails.add(str(row['work_email'])) + return row + + +async def update_customer(row: dict[str, object], context: dict[str, object] | None) -> dict[str, object]: + if context is not None: + updated_rows = context.setdefault('updated_rows', []) + assert isinstance(updated_rows, list) + updated_rows.append(row.copy()) + return row + + +async def run_workflow() -> tuple[ImportResult, InMemoryUpsertStorage, dict[str, object]]: + storage = InMemoryUpsertStorage() + context: dict[str, object] = { + 'existing_emails': {'taylor.chen@example.com'}, + 'created_rows': [], + 'updated_rows': [], + } + + alchemy = ExcelAlchemy( + ImporterConfig.for_create_or_update( + create_importer_model=CustomerImporter, + update_importer_model=CustomerImporter, + is_data_exist=is_customer_existing, + creator=create_customer, + updater=update_customer, + storage=storage, + locale='en', + ) + ) + alchemy.add_context(context) + + template = alchemy.download_template_artifact(filename='customer-template.xlsx') + _build_import_fixture(storage, template.as_bytes()) + result = await alchemy.import_data('customer-import.xlsx', 'customer-import-result.xlsx') + return result, storage, context + + +def main() -> None: + result, storage, context = asyncio.run(run_workflow()) + created_rows = context['created_rows'] + updated_rows = context['updated_rows'] + assert isinstance(created_rows, list) + assert isinstance(updated_rows, list) + + print('Create-or-update import workflow completed') + print(f'Result: {result.result}') + print(f'Success rows: {result.success_count}') + print(f'Failed rows: {result.fail_count}') + print(f'Created rows: {len(created_rows)}') + print(f'Updated rows: {len(updated_rows)}') + print(f'Result workbook URL: {result.url}') + print(f'Uploaded artifacts: {sorted(storage.uploaded)}') + + +if __name__ == '__main__': + main() diff --git a/examples/custom_storage.py b/examples/custom_storage.py index 442982b..237b54f 100644 --- a/examples/custom_storage.py +++ b/examples/custom_storage.py @@ -1,7 +1,5 @@ """Custom storage example that keeps uploaded workbooks in memory.""" -from __future__ import annotations - from base64 import b64decode from pydantic import BaseModel diff --git a/examples/date_and_range_fields.py b/examples/date_and_range_fields.py new file mode 100644 index 0000000..7bc1baf --- /dev/null +++ b/examples/date_and_range_fields.py @@ -0,0 +1,54 @@ +"""Example schema that focuses on date, range, and money workbook fields.""" + +from pydantic import BaseModel + +from excelalchemy import ( + DataRangeOption, + Date, + DateFormat, + DateRange, + ExcelAlchemy, + FieldMeta, + ImporterConfig, + Money, + NumberRange, +) + + +class CompensationImporter(BaseModel): + start_date: Date = FieldMeta( + label='Start date', + order=1, + date_format=DateFormat.DAY, + hint='Expected format: yyyy/mm/dd', + ) + probation_window: DateRange = FieldMeta( + label='Probation window', + order=2, + date_format=DateFormat.DAY, + date_range_option=DataRangeOption.NONE, + hint='Enter the probation start and end dates', + ) + salary_band: NumberRange = FieldMeta( + label='Salary band', + order=3, + fraction_digits=2, + unit='USD', + ) + signing_bonus: Money = FieldMeta( + label='Signing bonus', + order=4, + unit='USD', + hint='Use plain numbers without separators', + ) + + +def main() -> None: + alchemy = ExcelAlchemy(ImporterConfig.for_create(CompensationImporter, locale='en')) + template = alchemy.download_template_artifact(filename='compensation-template.xlsx') + print(f'Generated template: {template.filename} ({len(template.as_bytes())} bytes)') + print('Fields: Start date, Probation window, Salary band, Signing bonus') + + +if __name__ == '__main__': + main() diff --git a/examples/employee_import_workflow.py b/examples/employee_import_workflow.py new file mode 100644 index 0000000..d704c80 --- /dev/null +++ b/examples/employee_import_workflow.py @@ -0,0 +1,120 @@ +"""End-to-end import example with template generation and in-memory workbook uploads.""" + +import asyncio +import io +from base64 import b64decode + +from openpyxl import load_workbook +from pydantic import BaseModel + +from excelalchemy import ( + Email, + ExcelAlchemy, + ExcelStorage, + FieldMeta, + ImporterConfig, + ImportResult, + Number, + String, + UrlStr, +) +from excelalchemy.core.table import WorksheetTable + + +class EmployeeImporter(BaseModel): + full_name: String = FieldMeta(label='Full name', order=1, hint='Use the legal name') + age: Number = FieldMeta(label='Age', order=2) + work_email: Email = FieldMeta(label='Work email', order=3, hint='Use the company mailbox') + + +class InMemoryImportStorage(ExcelStorage): + def __init__(self) -> None: + self.fixtures: dict[str, bytes] = {} + self.uploaded: dict[str, bytes] = {} + + def read_excel_table(self, input_excel_name: str, *, skiprows: int, sheet_name: str) -> WorksheetTable: + workbook = load_workbook(io.BytesIO(self.fixtures[input_excel_name]), data_only=True) + try: + worksheet = workbook[sheet_name] + rows = [ + [None if value is None else str(value) for value in row] + for row in worksheet.iter_rows( + min_row=skiprows + 1, + max_row=worksheet.max_row, + max_col=worksheet.max_column, + values_only=True, + ) + ] + finally: + workbook.close() + + while rows and all(value is None for value in rows[-1]): + rows.pop() + + return WorksheetTable(rows=rows) + + def upload_excel(self, output_name: str, content_with_prefix: str) -> UrlStr: + _, payload = content_with_prefix.split(',', 1) + self.uploaded[output_name] = b64decode(payload) + return UrlStr(f'memory://{output_name}') + + +def _build_import_fixture(storage: InMemoryImportStorage, template_bytes: bytes) -> None: + workbook = load_workbook(io.BytesIO(template_bytes)) + try: + worksheet = workbook['Sheet1'] + worksheet['A3'] = 'TaylorChen' + worksheet['B3'] = '32' + worksheet['C3'] = 'taylor.chen@example.com' + + buffer = io.BytesIO() + workbook.save(buffer) + storage.fixtures['employee-import.xlsx'] = buffer.getvalue() + finally: + workbook.close() + + +async def create_employee(row: dict[str, object], context: dict[str, object] | None) -> dict[str, object]: + if context is not None: + created_rows = context.setdefault('created_rows', []) + assert isinstance(created_rows, list) + created_rows.append(row.copy()) + return row + + +async def run_workflow() -> tuple[ImportResult, InMemoryImportStorage, dict[str, object]]: + storage = InMemoryImportStorage() + context: dict[str, object] = {'created_rows': []} + + alchemy = ExcelAlchemy( + ImporterConfig.for_create( + EmployeeImporter, + creator=create_employee, + storage=storage, + locale='en', + ) + ) + alchemy.add_context(context) + + template = alchemy.download_template_artifact(filename='employee-template.xlsx') + _build_import_fixture(storage, template.as_bytes()) + result = await alchemy.import_data('employee-import.xlsx', 'employee-import-result.xlsx') + return result, storage, context + + +def main() -> None: + result, storage, context = asyncio.run(run_workflow()) + created_rows = context['created_rows'] + assert isinstance(created_rows, list) + + print('Employee import workflow completed') + print(f'Result: {result.result}') + print(f'Success rows: {result.success_count}') + print(f'Failed rows: {result.fail_count}') + print(f'Result workbook URL: {result.url}') + print(f'Created rows: {len(created_rows)}') + print(f'Uploaded artifacts: {sorted(storage.uploaded)}') + + +if __name__ == '__main__': + main() diff --git a/examples/export_workflow.py b/examples/export_workflow.py new file mode 100644 index 0000000..6ade454 --- /dev/null +++ b/examples/export_workflow.py @@ -0,0 +1,50 @@ +"""End-to-end export example with artifact generation and upload.""" + +from base64 import b64decode + +from pydantic import BaseModel + +from excelalchemy import ExcelAlchemy, ExcelStorage, ExporterConfig, FieldMeta, Number, String, UrlStr +from excelalchemy.core.table import WorksheetTable + + +class InMemoryExportStorage(ExcelStorage): + def __init__(self) -> None: + self.uploaded: dict[str, bytes] = {} + + def read_excel_table(self, input_excel_name: str, *, skiprows: int, sheet_name: str) -> WorksheetTable: + raise NotImplementedError('This example focuses on export workflows only') + + def upload_excel(self, output_name: str, content_with_prefix: str) -> UrlStr: + _, payload = content_with_prefix.split(',', 1) + self.uploaded[output_name] = b64decode(payload) + return UrlStr(f'memory://{output_name}') + + +class EmployeeExporter(BaseModel): + full_name: String = FieldMeta(label='Full name', order=1) + team: String = FieldMeta(label='Team', order=2) + age: Number = FieldMeta(label='Age', order=3) + + +def main() -> None: + rows = [ + {'full_name': 'TaylorChen', 'team': 'Finance', 'age': 32}, + {'full_name': 'AveryStone', 'team': 'Operations', 'age': 29}, + ] + + storage = InMemoryExportStorage() + alchemy = ExcelAlchemy(ExporterConfig.for_storage(EmployeeExporter, storage=storage, locale='en')) + + artifact = alchemy.export_artifact(rows, filename='employees-export.xlsx') + uploaded_url = alchemy.export_upload('employees-export-upload.xlsx', rows) + + print('Export workflow completed') + print(f'Artifact filename: {artifact.filename}') + print(f'Artifact bytes: {len(artifact.as_bytes())}') + print(f'Upload URL: {uploaded_url}') + print(f'Uploaded objects: {sorted(storage.uploaded)}') + + +if __name__ == '__main__': + main() diff --git a/examples/fastapi_upload.py b/examples/fastapi_upload.py index 9cc3dbd..66f344a 100644 --- a/examples/fastapi_upload.py +++ b/examples/fastapi_upload.py @@ -1,7 +1,5 @@ """FastAPI integration sketch for template download and workbook import.""" -from __future__ import annotations - from io import BytesIO from fastapi import FastAPI, HTTPException, UploadFile diff --git a/examples/minio_storage.py b/examples/minio_storage.py new file mode 100644 index 0000000..0ea24b3 --- /dev/null +++ b/examples/minio_storage.py @@ -0,0 +1,42 @@ +"""Built-in Minio storage example for the current 2.x line.""" + +from minio import Minio +from pydantic import BaseModel + +from excelalchemy import ExcelAlchemy, FieldMeta, ImporterConfig, Number, String +from excelalchemy.core.storage import build_storage_gateway +from excelalchemy.core.storage_minio import MinioStorageGateway + + +class EmployeeImporter(BaseModel): + full_name: String = FieldMeta(label='Full name', order=1) + age: Number = FieldMeta(label='Age', order=2) + + +def main() -> None: + minio_client = Minio( + 'localhost:9000', + access_key='minioadmin', + secret_key='minioadmin', + secure=False, + ) + config = ImporterConfig.for_create( + EmployeeImporter, + creator=lambda row, context: row, + minio=minio_client, + bucket_name='excel-files', + locale='en', + ) + + gateway = build_storage_gateway(config) + alchemy = ExcelAlchemy(config) + template = alchemy.download_template_artifact(filename='employee-template.xlsx') + + print(f'Built gateway: {type(gateway).__name__}') + print(f'Uses built-in Minio path: {config.storage_options.uses_legacy_minio_path}') + print(f'Template bytes: {len(template.as_bytes())}') + print(f'Gateway type check: {isinstance(gateway, MinioStorageGateway)}') + + +if __name__ == '__main__': + main() diff --git a/examples/selection_fields.py b/examples/selection_fields.py new file mode 100644 index 0000000..0f38432 --- /dev/null +++ b/examples/selection_fields.py @@ -0,0 +1,53 @@ +"""Example schema that focuses on selection-heavy business forms.""" + +from pydantic import BaseModel + +from excelalchemy import ( + ExcelAlchemy, + FieldMeta, + ImporterConfig, + MultiCheckbox, + MultiOrganization, + MultiStaff, + Option, + OptionId, + Radio, + SingleOrganization, + SingleStaff, +) + +DEPARTMENT_OPTIONS = [ + Option(id=OptionId('finance'), name='Finance'), + Option(id=OptionId('operations'), name='Operations'), + Option(id=OptionId('engineering'), name='Engineering'), +] + +ORGANIZATION_OPTIONS = [ + Option(id=OptionId('org-finance'), name='Acme/Finance'), + Option(id=OptionId('org-ops'), name='Acme/Operations'), +] + +STAFF_OPTIONS = [ + Option(id=OptionId('staff-taylor'), name='TaylorChen'), + Option(id=OptionId('staff-avery'), name='AveryStone'), +] + + +class ApprovalFormImporter(BaseModel): + request_type: Radio = FieldMeta(label='Request type', order=1, options=DEPARTMENT_OPTIONS) + impacted_teams: MultiCheckbox = FieldMeta(label='Impacted teams', order=2, options=DEPARTMENT_OPTIONS) + owner_org: SingleOrganization = FieldMeta(label='Owner organization', order=3, options=ORGANIZATION_OPTIONS) + partner_orgs: MultiOrganization = FieldMeta(label='Partner organizations', order=4, options=ORGANIZATION_OPTIONS) + owner: SingleStaff = FieldMeta(label='Owner', order=5, options=STAFF_OPTIONS) + reviewers: MultiStaff = FieldMeta(label='Reviewers', order=6, options=STAFF_OPTIONS) + + +def main() -> None: + alchemy = ExcelAlchemy(ImporterConfig.for_create(ApprovalFormImporter, locale='en')) + template = alchemy.download_template_artifact(filename='selection-fields-template.xlsx') + print(f'Generated template: {template.filename} ({len(template.as_bytes())} bytes)') + print('Fields: Request type, Impacted teams, Owner organization, Partner organizations, Owner, Reviewers') + + +if __name__ == '__main__': + main() diff --git a/src/excelalchemy/__init__.py b/src/excelalchemy/__init__.py index 01061ae..65c9bc1 100644 --- a/src/excelalchemy/__init__.py +++ b/src/excelalchemy/__init__.py @@ -1,6 +1,6 @@ """A Python Library for Reading and Writing Excel Files""" -__version__ = '2.2.3' +__version__ = '2.2.4' from excelalchemy._primitives.constants import CharacterSet, DataRangeOption, DateFormat, Option from excelalchemy._primitives.deprecation import ExcelAlchemyDeprecationWarning from excelalchemy._primitives.identity import ( diff --git a/src/excelalchemy/_primitives/constants.py b/src/excelalchemy/_primitives/constants.py index ae8a478..f6b6f50 100644 --- a/src/excelalchemy/_primitives/constants.py +++ b/src/excelalchemy/_primitives/constants.py @@ -1,6 +1,5 @@ from dataclasses import dataclass from enum import StrEnum -from typing import Any from excelalchemy._primitives.identity import Key, Label, OptionId from excelalchemy.i18n.messages import MessageKey @@ -38,8 +37,6 @@ MAX_OPTIONS_COUNT = 100 DEFAULT_FIELD_META_ORDER = -1 -type DictStrAny = dict[str, Any] -type DictAny = dict[Any, Any] type SetStr = set[str] type ListStr = list[str] type IntStr = int | str diff --git a/src/excelalchemy/_primitives/identity.py b/src/excelalchemy/_primitives/identity.py index e768f59..73aa23c 100644 --- a/src/excelalchemy/_primitives/identity.py +++ b/src/excelalchemy/_primitives/identity.py @@ -1,7 +1,5 @@ """Internal typed primitives used across the ExcelAlchemy core layer.""" -from typing import Any - from pydantic import GetCoreSchemaHandler from pydantic_core import core_schema @@ -10,7 +8,7 @@ class _StringIdentity(str): @classmethod def __get_pydantic_core_schema__( cls, - source_type: Any, + source_type: object, handler: GetCoreSchemaHandler, ) -> core_schema.CoreSchema: return core_schema.no_info_after_validator_function(cls, core_schema.str_schema()) @@ -20,7 +18,7 @@ class _IntegerIdentity(int): @classmethod def __get_pydantic_core_schema__( cls, - source_type: Any, + source_type: object, handler: GetCoreSchemaHandler, ) -> core_schema.CoreSchema: return core_schema.no_info_after_validator_function(cls, core_schema.int_schema()) diff --git a/src/excelalchemy/codecs/base.py b/src/excelalchemy/codecs/base.py index 0f2faa5..9b79bb3 100644 --- a/src/excelalchemy/codecs/base.py +++ b/src/excelalchemy/codecs/base.py @@ -11,6 +11,13 @@ if TYPE_CHECKING: from excelalchemy.metadata import FieldMetaInfo +# These aliases remain `Any` intentionally because codec subclasses narrow their +# accepted workbook values heavily. Using `object` here makes every override +# incompatible under pyright's method override rules. +type WorkbookInputValue = Any +type WorkbookDisplayValue = Any +type NormalizedImportValue = Any + class ExcelFieldCodec(ABC): """Excel-facing field adapter responsible for comments, parsing, formatting, and normalization.""" @@ -22,17 +29,17 @@ def build_comment(cls, field_meta: FieldMetaInfo) -> str: @classmethod @abstractmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> Any: # value is always not None + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: """Parse workbook input into the intermediate Python value consumed by the import pipeline.""" @classmethod @abstractmethod - def format_display_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def format_display_value(cls, value: WorkbookDisplayValue, field_meta: FieldMetaInfo) -> WorkbookDisplayValue: """Format a raw worksheet value back into a user-recognizable display value.""" @classmethod @abstractmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def normalize_import_value(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> NormalizedImportValue: """Validate and normalize parsed input before handing it to the Pydantic layer.""" @classmethod @@ -41,24 +48,24 @@ def comment(cls, field_meta: FieldMetaInfo) -> str: return cls.build_comment(field_meta) @classmethod - def serialize(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def serialize(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: """Backward-compatible alias for parse_input().""" return cls.parse_input(value, field_meta) @classmethod - def deserialize(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def deserialize(cls, value: WorkbookDisplayValue, field_meta: FieldMetaInfo) -> WorkbookDisplayValue: """Backward-compatible alias for format_display_value().""" return cls.format_display_value(value, field_meta) @classmethod - def __validate__(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def __validate__(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> NormalizedImportValue: """Backward-compatible alias for normalize_import_value().""" return cls.normalize_import_value(value, field_meta) @classmethod def __get_pydantic_core_schema__( cls, - source_type: Any, + source_type: object, handler: GetCoreSchemaHandler, ) -> core_schema.CoreSchema: # ExcelAlchemy runs metadata-aware validation in its adapter layer. @@ -88,15 +95,23 @@ def build_comment(cls, field_meta: FieldMetaInfo) -> str: return '' @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: return value @classmethod - def format_display_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def format_display_value( + cls, + value: WorkbookDisplayValue, + field_meta: FieldMetaInfo, + ) -> WorkbookDisplayValue: return value @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def normalize_import_value( + cls, + value: WorkbookInputValue, + field_meta: FieldMetaInfo, + ) -> NormalizedImportValue: return value @@ -108,15 +123,23 @@ def build_comment(cls, field_meta: FieldMetaInfo) -> str: return '' @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: return value @classmethod - def format_display_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def format_display_value( + cls, + value: WorkbookDisplayValue, + field_meta: FieldMetaInfo, + ) -> WorkbookDisplayValue: return value @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def normalize_import_value( + cls, + value: WorkbookInputValue, + field_meta: FieldMetaInfo, + ) -> NormalizedImportValue: return value diff --git a/src/excelalchemy/codecs/boolean.py b/src/excelalchemy/codecs/boolean.py index cde48ea..31be18b 100644 --- a/src/excelalchemy/codecs/boolean.py +++ b/src/excelalchemy/codecs/boolean.py @@ -1,8 +1,7 @@ import logging -from typing import Any from excelalchemy.codecs import excel_choice_codec -from excelalchemy.codecs.base import ExcelFieldCodec +from excelalchemy.codecs.base import ExcelFieldCodec, WorkbookDisplayValue, WorkbookInputValue from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import display_message as dmsg from excelalchemy.i18n.messages import message as msg @@ -31,19 +30,26 @@ def _false_values(cls) -> set[str]: @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, - field_meta.comment_hint, + declared.comment_required, + presentation.comment_hint, ] ) @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> str: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> str: return str(value).strip() @classmethod - def format_display_value(cls, value: bool | str | None | Any, field_meta: FieldMetaInfo) -> str: + def format_display_value( + cls, + value: bool | str | WorkbookDisplayValue | None, + field_meta: FieldMetaInfo, + ) -> str: + declared = field_meta.declared if value is None or value == '': return cls._false_display() @@ -64,14 +70,14 @@ def format_display_value(cls, value: bool | str | None | Any, field_meta: FieldM 'Type %s could not deserialize %s for field %s; returning the default value %s', cls.__name__, value, - field_meta.label, + declared.label, cls._false_display(), ) return cls._true_display() if str(value) in cls._true_values() else cls._false_display() @classmethod - def normalize_import_value(cls, value: str | bool | Any, field_meta: FieldMetaInfo) -> bool: + def normalize_import_value(cls, value: str | bool | WorkbookInputValue, field_meta: FieldMetaInfo) -> bool: if isinstance(value, bool): return value diff --git a/src/excelalchemy/codecs/date.py b/src/excelalchemy/codecs/date.py index 7bd07ba..b5f8e0b 100644 --- a/src/excelalchemy/codecs/date.py +++ b/src/excelalchemy/codecs/date.py @@ -1,12 +1,17 @@ import logging from datetime import datetime -from typing import Any, cast +from typing import cast import pendulum from pendulum import DateTime from excelalchemy._primitives.constants import DATE_FORMAT_TO_HINT_MAPPING, MILLISECOND_TO_SECOND, DataRangeOption -from excelalchemy.codecs.base import ExcelFieldCodec +from excelalchemy.codecs.base import ( + ExcelFieldCodec, + NormalizedImportValue, + WorkbookDisplayValue, + WorkbookInputValue, +) from excelalchemy.exceptions import ConfigError from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import message as msg @@ -18,36 +23,44 @@ class Date(ExcelFieldCodec, datetime): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - if not field_meta.date_format: + declared = field_meta.declared + presentation = field_meta.presentation + if not presentation.date_format: raise ConfigError(msg(MessageKey.DATE_FORMAT_NOT_CONFIGURED)) return '\n'.join( [ - field_meta.comment_required, - field_meta.comment_date_format, - field_meta.comment_date_range_option, - field_meta.comment_hint, + declared.comment_required, + presentation.comment_date_format, + presentation.comment_date_range_option, + presentation.comment_hint, ] ) @classmethod - def parse_input(cls, value: str | DateTime | Any, field_meta: FieldMetaInfo) -> datetime | Any: + def parse_input( + cls, + value: str | DateTime | WorkbookInputValue, + field_meta: FieldMetaInfo, + ) -> datetime | WorkbookInputValue: + declared = field_meta.declared + presentation = field_meta.presentation if isinstance(value, DateTime): logging.info( 'Codec %s received a parsed datetime for %s; returning it unchanged: %s', cls.__name__, - field_meta.label, + declared.label, value, ) return value - if not field_meta.date_format: + if not presentation.date_format: raise ConfigError(msg(MessageKey.DATE_FORMAT_NOT_CONFIGURED)) value = str(value).strip() try: v = value.replace('/', '-') # pendulum does not accept "/" as a date separator here. dt: DateTime = cast(DateTime, pendulum.parse(v)) - return dt.replace(tzinfo=field_meta.timezone) + return dt.replace(tzinfo=presentation.timezone) except Exception as exc: logging.warning( 'ValueType <%s> could not parse Excel input %s; returning the original value. Reason: %s', @@ -58,27 +71,31 @@ def parse_input(cls, value: str | DateTime | Any, field_meta: FieldMetaInfo) -> return value @classmethod - def format_display_value(cls, value: str | datetime | None | Any, field_meta: FieldMetaInfo) -> str: + def format_display_value( + cls, + value: str | datetime | WorkbookDisplayValue | None, + field_meta: FieldMetaInfo, + ) -> str: + presentation = field_meta.presentation match value: case None | '': return '' case datetime(): - return value.strftime(field_meta.python_date_format) + return value.strftime(presentation.python_date_format) case int() | float(): - return datetime.fromtimestamp(int(value) / MILLISECOND_TO_SECOND).strftime( - field_meta.python_date_format - ) + return datetime.fromtimestamp(int(value) / MILLISECOND_TO_SECOND).strftime(presentation.python_date_format) case _: return str(value) if value is not None else '' @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> int: - if field_meta.date_format is None: + def normalize_import_value(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> NormalizedImportValue: + presentation = field_meta.presentation + if presentation.date_format is None: raise ConfigError(msg(MessageKey.DATE_FORMAT_NOT_CONFIGURED)) if not isinstance(value, datetime): raise ValueError( - msg(MessageKey.ENTER_DATE_FORMAT, date_format=DATE_FORMAT_TO_HINT_MAPPING[field_meta.date_format]) + msg(MessageKey.ENTER_DATE_FORMAT, date_format=DATE_FORMAT_TO_HINT_MAPPING[presentation.date_format]) ) parsed = cls._parse_date(value, field_meta) @@ -91,17 +108,19 @@ def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> int: @staticmethod def _parse_date(v: datetime, field_meta: FieldMetaInfo) -> datetime: - format_ = field_meta.python_date_format + presentation = field_meta.presentation + format_ = presentation.python_date_format parsed = datetime.strptime(v.strftime(format_), format_) - parsed = parsed.replace(tzinfo=field_meta.timezone) + parsed = parsed.replace(tzinfo=presentation.timezone) return parsed @staticmethod def _validate_date_range(parsed: datetime, field_meta: FieldMetaInfo) -> list[str]: - now = datetime.now(tz=field_meta.timezone) + presentation = field_meta.presentation + now = datetime.now(tz=presentation.timezone) errors: list[str] = [] - match field_meta.date_range_option: + match presentation.date_range_option: case DataRangeOption.PRE: if parsed > now: errors.append(msg(MessageKey.DATE_MUST_BE_EARLIER_THAN_NOW)) diff --git a/src/excelalchemy/codecs/date_range.py b/src/excelalchemy/codecs/date_range.py index 2ec264a..7c6aeb7 100644 --- a/src/excelalchemy/codecs/date_range.py +++ b/src/excelalchemy/codecs/date_range.py @@ -1,7 +1,7 @@ import logging from collections.abc import Mapping from datetime import datetime -from typing import Any, cast +from typing import cast import pendulum from pendulum import DateTime @@ -28,7 +28,7 @@ class DateRange(CompositeExcelFieldCodec): __name__ = 'DateRange' @classmethod - def model_validate(cls, obj: Any) -> 'DateRange': + def model_validate(cls, obj: object) -> 'DateRange': impl = _DateRangeImpl.model_validate(obj) self = cls(impl.start, impl.end) return self @@ -50,14 +50,16 @@ def column_items(cls) -> list[tuple[Key, FieldMetaInfo]]: @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - if field_meta.date_format is None: + declared = field_meta.declared + presentation = field_meta.presentation + if presentation.date_format is None: raise RuntimeError(msg(MessageKey.DATE_FORMAT_NOT_CONFIGURED)) return '\n'.join( [ - field_meta.comment_required, - field_meta.comment_date_format, - dmsg(MessageKey.COMMENT_DATE_RANGE_START_NOT_AFTER_END, extra_hint=field_meta.hint or ''), + declared.comment_required, + presentation.comment_date_format, + dmsg(MessageKey.COMMENT_DATE_RANGE_START_NOT_AFTER_END, extra_hint=presentation.hint or ''), ] ) @@ -92,20 +94,21 @@ def normalize_import_value( value: object, field_meta: FieldMetaInfo, ) -> 'DateRange': + presentation = field_meta.presentation try: parsed = DateRange.model_validate(value) - parsed.start = pendulum.instance(parsed.start, tz=field_meta.timezone) if parsed.start else None - parsed.end = pendulum.instance(parsed.end, tz=field_meta.timezone) if parsed.end else None + parsed.start = pendulum.instance(parsed.start, tz=presentation.timezone) if parsed.start else None + parsed.end = pendulum.instance(parsed.end, tz=presentation.timezone) if parsed.end else None except Exception as exc: raise ValueError(msg(MessageKey.INVALID_INPUT)) from exc errors: list[str] = [] - now = datetime.now(tz=field_meta.timezone) + now = datetime.now(tz=presentation.timezone) if parsed.start and parsed.end and parsed.start > parsed.end: errors.append(msg(MessageKey.DATE_RANGE_START_AFTER_END)) - match field_meta.date_range_option: + match presentation.date_range_option: case DataRangeOption.PRE: if (parsed.start and parsed.start > now) or (parsed.end and parsed.end > now): errors.append(msg(MessageKey.DATE_MUST_BE_EARLIER_THAN_NOW)) @@ -124,7 +127,8 @@ def normalize_import_value( def format_display_value(cls, value: object | None, field_meta: FieldMetaInfo) -> str: if value is None or value == '': return '' - date_format = field_meta.must_date_format + presentation = field_meta.presentation + date_format = presentation.must_date_format py_date_format = DATE_FORMAT_TO_PYTHON_MAPPING[date_format] if isinstance(value, str): @@ -178,11 +182,12 @@ def _parse_optional_datetime(value: object, field_meta: FieldMetaInfo) -> DateTi @staticmethod def _parse_datetime_text(value: str, field_meta: FieldMetaInfo) -> DateTime: + presentation = field_meta.presentation parsed = pendulum.parse(value) if isinstance(parsed, DateTime): - return parsed.replace(tzinfo=field_meta.timezone) + return parsed.replace(tzinfo=presentation.timezone) if isinstance(parsed, datetime): - return pendulum.instance(parsed).replace(tzinfo=field_meta.timezone) + return pendulum.instance(parsed).replace(tzinfo=presentation.timezone) raise ValueError(msg(MessageKey.INVALID_INPUT)) diff --git a/src/excelalchemy/codecs/money.py b/src/excelalchemy/codecs/money.py index ac43eee..52b88c9 100644 --- a/src/excelalchemy/codecs/money.py +++ b/src/excelalchemy/codecs/money.py @@ -1,5 +1,7 @@ -from typing import Any, ClassVar +from dataclasses import replace +from typing import ClassVar +from excelalchemy.codecs.base import NormalizedImportValue, WorkbookDisplayValue, WorkbookInputValue from excelalchemy.codecs.number import Number from excelalchemy.metadata import FieldMetaInfo @@ -10,7 +12,10 @@ class Money(Number): @classmethod def _money_field_meta(cls, field_meta: FieldMetaInfo) -> FieldMetaInfo: money_field_meta = field_meta.clone() - money_field_meta.fraction_digits = cls.MONEY_FRACTION_DIGITS + money_field_meta.presentation_meta = replace( + money_field_meta.presentation_meta, + fraction_digits=cls.MONEY_FRACTION_DIGITS, + ) return money_field_meta @classmethod @@ -18,11 +23,19 @@ def build_comment(cls, field_meta: FieldMetaInfo) -> str: return super().build_comment(cls._money_field_meta(field_meta)) @classmethod - def format_display_value(cls, value: str | None | Any, field_meta: FieldMetaInfo) -> str: + def format_display_value( + cls, + value: str | WorkbookDisplayValue | None, + field_meta: FieldMetaInfo, + ) -> str: return super().format_display_value(value, cls._money_field_meta(field_meta)) @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> float | int: + def normalize_import_value( + cls, + value: WorkbookInputValue, + field_meta: FieldMetaInfo, + ) -> NormalizedImportValue: return super().normalize_import_value(value, cls._money_field_meta(field_meta)) diff --git a/src/excelalchemy/codecs/multi_checkbox.py b/src/excelalchemy/codecs/multi_checkbox.py index d9054a4..67f6f42 100644 --- a/src/excelalchemy/codecs/multi_checkbox.py +++ b/src/excelalchemy/codecs/multi_checkbox.py @@ -16,12 +16,14 @@ class MultiCheckbox(ExcelFieldCodec, list[str]): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, - field_meta.comment_options, + declared.comment_required, + presentation.comment_options, dmsg(MessageKey.COMMENT_SELECTION_MODE, value=dmsg(MessageKey.COMMENT_SELECTION_VALUE_MULTI)), - field_meta.comment_hint, + presentation.comment_hint, ] ) @@ -41,25 +43,27 @@ def parse_input(cls, value: object, field_meta: FieldMetaInfo) -> list[str] | ob @classmethod def normalize_import_value(cls, value: object, field_meta: FieldMetaInfo) -> list[str]: # OptionId + declared = field_meta.declared + presentation = field_meta.presentation if not isinstance(value, list): raise ValueError(msg(MessageKey.OPTION_NOT_FOUND_HEADER_COMMENT)) items = cast(list[object], value) parsed = [str(item).strip() for item in items] - if field_meta.options is None: + if presentation.options is None: raise ProgrammaticError(msg(MessageKey.OPTIONS_CANNOT_BE_NONE_FOR_VALUE_TYPE, value_type=cls.__name__)) - if not field_meta.options: # empty + if not presentation.options: # empty logging.warning( - 'Field %s of type %s has no options; returning the original value', field_meta.label, cls.__name__ + 'Field %s of type %s has no options; returning the original value', declared.label, cls.__name__ ) return parsed if len(parsed) != len(set(parsed)): raise ValueError(msg(MessageKey.OPTIONS_CONTAIN_DUPLICATES)) - result, errors = field_meta.exchange_names_to_option_ids_with_errors(parsed) + result, errors = presentation.exchange_names_to_option_ids_with_errors(parsed, field_label=declared.label) if errors: raise ValueError(*errors) @@ -68,6 +72,8 @@ def normalize_import_value(cls, value: object, field_meta: FieldMetaInfo) -> lis @classmethod def format_display_value(cls, value: str | list[OptionId] | None, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation match value: case None | '': return '' @@ -75,7 +81,7 @@ def format_display_value(cls, value: str | list[OptionId] | None, field_meta: Fi return value case list(): option_ids = [OptionId(option_id) for option_id in value] - option_names = field_meta.exchange_option_ids_to_names(option_ids) + option_names = presentation.exchange_option_ids_to_names(option_ids, field_label=declared.label) return f'{MULTI_CHECKBOX_SEPARATOR}'.join(option_names) diff --git a/src/excelalchemy/codecs/number.py b/src/excelalchemy/codecs/number.py index 36f151e..dd3129a 100644 --- a/src/excelalchemy/codecs/number.py +++ b/src/excelalchemy/codecs/number.py @@ -1,8 +1,12 @@ import logging from decimal import ROUND_DOWN, Context, Decimal, InvalidOperation -from typing import Any -from excelalchemy.codecs.base import ExcelFieldCodec +from excelalchemy.codecs.base import ( + ExcelFieldCodec, + NormalizedImportValue, + WorkbookDisplayValue, + WorkbookInputValue, +) from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import display_message as dmsg from excelalchemy.i18n.messages import message as msg @@ -42,18 +46,24 @@ class Number(Decimal, ExcelFieldCodec): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, + declared.comment_required, dmsg(MessageKey.COMMENT_NUMBER_FORMAT), - field_meta.comment_fraction_digits, + presentation.comment_fraction_digits, dmsg(MessageKey.COMMENT_NUMBER_INPUT_RANGE, value=cls.__get_range_description__(field_meta)), - field_meta.comment_unit, + presentation.comment_unit, ] ) @classmethod - def parse_input(cls, value: str | int | float | None, field_meta: FieldMetaInfo) -> Decimal | Any: + def parse_input( + cls, + value: str | int | float | WorkbookInputValue | None, + field_meta: FieldMetaInfo, + ) -> Decimal | WorkbookInputValue: if isinstance(value, str): value = value.strip() if value is None: @@ -70,7 +80,11 @@ def parse_input(cls, value: str | int | float | None, field_meta: FieldMetaInfo) return str(value) @classmethod - def format_display_value(cls, value: str | None | Any, field_meta: FieldMetaInfo) -> str: + def format_display_value( + cls, + value: str | WorkbookDisplayValue | None, + field_meta: FieldMetaInfo, + ) -> str: if value is None or value == '': return '' @@ -86,19 +100,21 @@ def format_display_value(cls, value: str | None | Any, field_meta: FieldMetaInfo return str(value) @classmethod - def __get_range_description__(cls, field_meta: FieldMetaInfo) -> str: # type: ignore[return] - match (field_meta.importer_le, field_meta.importer_ge): - case (None, None): - return dmsg(MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY) - case (_, None): - return f'≤ {field_meta.importer_le}' - case (None, _): - return f'≥ {field_meta.importer_ge}' - case (le, ge): - return f'{ge}~{le}' + def __get_range_description__(cls, field_meta: FieldMetaInfo) -> str: + constraints = field_meta.constraints + upper_bound = constraints.le + lower_bound = constraints.ge + + if upper_bound is None and lower_bound is None: + return dmsg(MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY) + if lower_bound is None: + return f'≤ {upper_bound}' + if upper_bound is None: + return f'≥ {lower_bound}' + return f'{lower_bound}~{upper_bound}' @staticmethod - def __maybe_decimal__(value: Any) -> Decimal | None: + def __maybe_decimal__(value: WorkbookInputValue) -> Decimal | None: # Convert non-Decimal input through Decimal for validation. if isinstance(value, Decimal): return value @@ -113,38 +129,44 @@ def __maybe_decimal__(value: Any) -> Decimal | None: @staticmethod def __check_range__(value: Decimal | float | int, field_meta: FieldMetaInfo) -> list[str]: errors: list[str] = [] + constraints = field_meta.constraints # Read the configured importer bounds from field metadata. - importer_le = field_meta.importer_le or Decimal('Infinity') - importer_ge = field_meta.importer_ge or Decimal('-Infinity') + importer_le = constraints.le or Decimal('Infinity') + importer_ge = constraints.ge or Decimal('-Infinity') # Ensure the parsed decimal stays within the accepted range. if not importer_ge <= value <= importer_le: - if field_meta.importer_le and field_meta.importer_ge: + if constraints.le and constraints.ge: errors.append( msg( MessageKey.NUMBER_BETWEEN_MIN_AND_MAX, - minimum=field_meta.importer_ge, - maximum=field_meta.importer_le, + minimum=constraints.ge, + maximum=constraints.le, ) ) - elif field_meta.importer_le: - errors.append(msg(MessageKey.NUMBER_BETWEEN_NEG_INF_AND_MAX, maximum=field_meta.importer_le)) - elif field_meta.importer_ge: - errors.append(msg(MessageKey.NUMBER_BETWEEN_MIN_AND_POS_INF, minimum=field_meta.importer_ge)) + elif constraints.le: + errors.append(msg(MessageKey.NUMBER_BETWEEN_NEG_INF_AND_MAX, maximum=constraints.le)) + elif constraints.ge: + errors.append(msg(MessageKey.NUMBER_BETWEEN_MIN_AND_POS_INF, minimum=constraints.ge)) return errors @classmethod - def normalize_import_value(cls, value: Decimal | Any, field_meta: FieldMetaInfo) -> float | int: + def normalize_import_value( + cls, + value: Decimal | WorkbookInputValue, + field_meta: FieldMetaInfo, + ) -> NormalizedImportValue: # Convert non-Decimal input before range validation. + presentation = field_meta.presentation parsed = cls.__maybe_decimal__(value) if parsed is None: raise ValueError(msg(MessageKey.INVALID_NUMBER_ENTER_NUMBER)) errors: list[str] = cls.__check_range__(parsed, field_meta) if errors: raise ValueError(*errors) - parsed = canonicalize_decimal(parsed, field_meta.fraction_digits) + parsed = canonicalize_decimal(parsed, presentation.fraction_digits) value = transform_decimal(parsed) if value is None: raise ValueError(msg(MessageKey.INVALID_NUMBER_ENTER_NUMBER)) diff --git a/src/excelalchemy/codecs/number_range.py b/src/excelalchemy/codecs/number_range.py index 1f2fd64..d9701dc 100644 --- a/src/excelalchemy/codecs/number_range.py +++ b/src/excelalchemy/codecs/number_range.py @@ -63,10 +63,11 @@ def format_display_value(cls, value: object | None, field_meta: FieldMetaInfo) - if value is None or value == '': return '' try: + presentation = field_meta.presentation parsed = cls._parse_decimal_boundary(value) if parsed is None: return '' - return str(transform_decimal(canonicalize_decimal(parsed, field_meta.fraction_digits))) + return str(transform_decimal(canonicalize_decimal(parsed, presentation.fraction_digits))) except Exception as exc: logging.warning( 'ValueType <%s> could not parse Excel input %s; returning the original value. Reason: %s', @@ -132,10 +133,11 @@ def _parse_decimal_boundary(value: object) -> Decimal | None: @staticmethod def _canonicalize_boundary(value: object, field_meta: FieldMetaInfo) -> Decimal | None: + presentation = field_meta.presentation parsed = NumberRange._parse_decimal_boundary(value) if parsed is None: return None - return canonicalize_decimal(parsed, field_meta.fraction_digits) + return canonicalize_decimal(parsed, presentation.fraction_digits) NumberRangeCodec = NumberRange diff --git a/src/excelalchemy/codecs/organization.py b/src/excelalchemy/codecs/organization.py index 66db89f..e54f8dd 100644 --- a/src/excelalchemy/codecs/organization.py +++ b/src/excelalchemy/codecs/organization.py @@ -15,10 +15,12 @@ class SingleOrganization(Radio): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - extra_hint = field_meta.hint or dmsg(MessageKey.SINGLE_ORGANIZATION_HINT) + declared = field_meta.declared + presentation = field_meta.presentation + extra_hint = presentation.hint or dmsg(MessageKey.SINGLE_ORGANIZATION_HINT) value_key = ( MessageKey.COMMENT_REQUIRED_VALUE_REQUIRED - if field_meta.required + if declared.effective_required else MessageKey.COMMENT_REQUIRED_VALUE_OPTIONAL ) return '\n'.join( @@ -31,10 +33,12 @@ def parse_input(cls, value: object, field_meta: FieldMetaInfo) -> str: @classmethod def format_display_value(cls, value: object, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation if not isinstance(value, str): return '' if value is None else str(value) try: - return field_meta.options_id_map[OptionId(value.strip())].name + return presentation.options_id_map(field_label=declared.label)[OptionId(value.strip())].name except KeyError: logging.warning('Could not resolve organization option %s; returning the original value', value) @@ -46,10 +50,12 @@ class MultiOrganization(MultiCheckbox): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, - dmsg(MessageKey.COMMENT_HINT, value=field_meta.hint or dmsg(MessageKey.MULTI_ORGANIZATION_HINT)), + declared.comment_required, + dmsg(MessageKey.COMMENT_HINT, value=presentation.hint or dmsg(MessageKey.MULTI_ORGANIZATION_HINT)), ] ) @@ -59,6 +65,8 @@ def parse_input(cls, value: object, field_meta: FieldMetaInfo) -> object: @classmethod def format_display_value(cls, value: object | None, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation if value is None or value == '': return '' @@ -68,7 +76,7 @@ def format_display_value(cls, value: object | None, field_meta: FieldMetaInfo) - if isinstance(value, list): items = cast(list[object], value) option_ids = [OptionId(option_id) for option_id in items] - option_names = field_meta.exchange_option_ids_to_names(option_ids) + option_names = presentation.exchange_option_ids_to_names(option_ids, field_label=declared.label) return MULTI_CHECKBOX_SEPARATOR.join(map(str, option_names)) logging.warning('%s could not be deserialized; returning the original value', cls.__name__) diff --git a/src/excelalchemy/codecs/phone_number.py b/src/excelalchemy/codecs/phone_number.py index f8309af..b95f525 100644 --- a/src/excelalchemy/codecs/phone_number.py +++ b/src/excelalchemy/codecs/phone_number.py @@ -1,6 +1,6 @@ import re -from typing import Any +from excelalchemy.codecs.base import WorkbookInputValue from excelalchemy.codecs.string import String from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import message as msg @@ -11,7 +11,7 @@ class PhoneNumber(String): @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> str: + def normalize_import_value(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> str: parsed = str(value) if not PHONE_NUMBER_PATTERN.match(parsed): diff --git a/src/excelalchemy/codecs/radio.py b/src/excelalchemy/codecs/radio.py index c6ad935..295e630 100644 --- a/src/excelalchemy/codecs/radio.py +++ b/src/excelalchemy/codecs/radio.py @@ -1,9 +1,8 @@ import logging -from typing import Any from excelalchemy._primitives.constants import MULTI_CHECKBOX_SEPARATOR from excelalchemy._primitives.identity import OptionId -from excelalchemy.codecs.base import ExcelFieldCodec +from excelalchemy.codecs.base import ExcelFieldCodec, WorkbookDisplayValue, WorkbookInputValue from excelalchemy.exceptions import ProgrammaticError from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import display_message as dmsg @@ -16,62 +15,70 @@ class Radio(ExcelFieldCodec, str): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - if not field_meta.options: - logging.error('Field %s of type %s must define options', field_meta.label, cls.__name__) + declared = field_meta.declared + presentation = field_meta.presentation + if not presentation.options: + logging.error('Field %s of type %s must define options', declared.label, cls.__name__) return '\n'.join( [ - field_meta.comment_required, - field_meta.comment_options, + declared.comment_required, + presentation.comment_options, dmsg(MessageKey.COMMENT_SELECTION_MODE, value=dmsg(MessageKey.COMMENT_SELECTION_VALUE_SINGLE)), - field_meta.comment_hint, + presentation.comment_hint, ] ) @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> str: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> str: return str(value).strip() @classmethod - def format_display_value(cls, value: Any | None, field_meta: FieldMetaInfo) -> str: + def format_display_value(cls, value: WorkbookDisplayValue | None, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation if value is None or value == '': return '' try: - return field_meta.options_id_map[value.strip()].name + return presentation.options_id_map(field_label=declared.label)[value.strip()].name except Exception as exc: logging.warning( 'Type %s could not resolve option %s for field %s; returning the original value. Reason: %s', cls.__name__, value, - field_meta.label, + declared.label, exc, ) return value if value is not None else '' @classmethod def normalize_import_value(cls, value: str, field_meta: FieldMetaInfo) -> OptionId | str: # return Option.id + declared = field_meta.declared + presentation = field_meta.presentation if MULTI_CHECKBOX_SEPARATOR in value: raise ValueError(msg(MessageKey.MULTIPLE_SELECTIONS_NOT_SUPPORTED)) parsed = value.strip() - if field_meta.options is None: + if presentation.options is None: raise ProgrammaticError(msg(MessageKey.OPTIONS_CANNOT_BE_NONE_FOR_SELECTION_FIELDS)) - if not field_meta.options: # empty + if not presentation.options: # empty logging.warning( - 'Field %s of type %s has no options; returning the original value', field_meta.label, cls.__name__ + 'Field %s of type %s has no options; returning the original value', declared.label, cls.__name__ ) return parsed - if parsed in field_meta.options_id_map: + options_id_map = presentation.options_id_map(field_label=declared.label) + if parsed in options_id_map: return parsed - if parsed not in field_meta.options_name_map: + options_name_map = presentation.options_name_map(field_label=declared.label) + if parsed not in options_name_map: raise ValueError(msg(MessageKey.OPTION_NOT_FOUND_FIELD_COMMENT)) - return field_meta.options_name_map[parsed].id + return options_name_map[parsed].id SingleChoiceCodec = Radio diff --git a/src/excelalchemy/codecs/staff.py b/src/excelalchemy/codecs/staff.py index 71e2dc9..d7f5a7c 100644 --- a/src/excelalchemy/codecs/staff.py +++ b/src/excelalchemy/codecs/staff.py @@ -16,10 +16,12 @@ class SingleStaff(Radio): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - extra_hint = field_meta.hint or dmsg(MessageKey.SINGLE_STAFF_HINT) + declared = field_meta.declared + presentation = field_meta.presentation + extra_hint = presentation.hint or dmsg(MessageKey.SINGLE_STAFF_HINT) value_key = ( MessageKey.COMMENT_REQUIRED_VALUE_REQUIRED - if field_meta.required + if declared.effective_required else MessageKey.COMMENT_REQUIRED_VALUE_OPTIONAL ) return f'{dmsg(MessageKey.COMMENT_REQUIRED, value=dmsg(value_key))} \n{dmsg(MessageKey.COMMENT_HINT, value=extra_hint)}' @@ -30,18 +32,20 @@ def parse_input(cls, value: object, field_meta: FieldMetaInfo) -> str: @classmethod def format_display_value(cls, value: object | None, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation if value is None or value == '': return '' if not isinstance(value, str): return str(value) try: - return field_meta.options_id_map[OptionId(value.strip())].name + return presentation.options_id_map(field_label=declared.label)[OptionId(value.strip())].name except KeyError: logging.warning( 'Type %s could not resolve option %s for field %s; returning the original value', cls.__name__, value, - field_meta.label, + declared.label, ) return value @@ -51,10 +55,12 @@ class MultiStaff(MultiCheckbox): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, - dmsg(MessageKey.COMMENT_HINT, value=field_meta.hint or dmsg(MessageKey.MULTI_STAFF_HINT)), + declared.comment_required, + dmsg(MessageKey.COMMENT_HINT, value=presentation.hint or dmsg(MessageKey.MULTI_STAFF_HINT)), ] ) @@ -68,6 +74,8 @@ def normalize_import_value(cls, value: object, field_meta: FieldMetaInfo) -> lis @classmethod def format_display_value(cls, value: object, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation if isinstance(value, str): return value @@ -77,7 +85,7 @@ def format_display_value(cls, value: object, field_meta: FieldMetaInfo) -> str: if len(option_ids) != len(set(option_ids)): raise ValueError(msg(MessageKey.OPTIONS_CONTAIN_DUPLICATES)) - option_names = field_meta.exchange_option_ids_to_names(option_ids) + option_names = presentation.exchange_option_ids_to_names(option_ids, field_label=declared.label) return f'{MULTI_CHECKBOX_SEPARATOR}'.join(option_names) logging.warning('%s could not be deserialized', cls.__name__) diff --git a/src/excelalchemy/codecs/string.py b/src/excelalchemy/codecs/string.py index ced9e3d..516fe12 100644 --- a/src/excelalchemy/codecs/string.py +++ b/src/excelalchemy/codecs/string.py @@ -1,7 +1,5 @@ -from typing import Any - from excelalchemy._primitives.constants import CharacterSet -from excelalchemy.codecs.base import ExcelFieldCodec +from excelalchemy.codecs.base import ExcelFieldCodec, WorkbookDisplayValue, WorkbookInputValue from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import display_message as dmsg from excelalchemy.i18n.messages import message as msg @@ -85,22 +83,25 @@ def _format_character_set_names(cs: set[CharacterSet]) -> str: class String(str, ExcelFieldCodec): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + constraints = field_meta.constraints + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_unique, - field_meta.comment_required, - field_meta.comment_max_length, + declared.comment_unique, + declared.comment_required, + constraints.comment_max_length, dmsg(MessageKey.COMMENT_STRING_ALLOWED_CONTENT), - field_meta.comment_hint, + presentation.comment_hint, ] ) @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> str: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> str: return str(value).strip() @classmethod - def format_display_value(cls, value: str | None | Any, field_meta: FieldMetaInfo) -> str: + def format_display_value(cls, value: WorkbookDisplayValue | None, field_meta: FieldMetaInfo) -> str: return str(value).strip() if value is not None else '' # mccabe-complexity: 12 @@ -108,9 +109,10 @@ def format_display_value(cls, value: str | None | Any, field_meta: FieldMetaInfo def normalize_import_value(cls, value: str, field_meta: FieldMetaInfo) -> str: parsed = str(value) errors: list[str] = [] + constraints = field_meta.constraints - if field_meta.importer_max_length is not None and len(parsed) > field_meta.importer_max_length: - errors.append(msg(MessageKey.MAX_LENGTH_CHARACTERS, max_length=field_meta.importer_max_length)) + if constraints.max_length is not None and len(parsed) > constraints.max_length: + errors.append(msg(MessageKey.MAX_LENGTH_CHARACTERS, max_length=constraints.max_length)) errors.extend(cls.__check_character_set__(parsed, field_meta)) @@ -122,12 +124,14 @@ def normalize_import_value(cls, value: str, field_meta: FieldMetaInfo) -> str: @classmethod def __check_character_set__(cls, value: str, field_meta: FieldMetaInfo) -> list[str]: errors: list[str] = [] + presentation = field_meta.presentation + character_set = set(presentation.character_set) for single_character in value: - if not any(_CHARACTER_SET_TO_VALIDATOR[cs](single_character) for cs in field_meta.character_set): + if not any(_CHARACTER_SET_TO_VALIDATOR[cs](single_character) for cs in character_set): errors.append( msg( MessageKey.ONLY_CHARACTER_SET_ALLOWED, - character_set_names=_format_character_set_names(field_meta.character_set), + character_set_names=_format_character_set_names(character_set), ) ) break diff --git a/src/excelalchemy/codecs/tree.py b/src/excelalchemy/codecs/tree.py index f88a4ae..9da0446 100644 --- a/src/excelalchemy/codecs/tree.py +++ b/src/excelalchemy/codecs/tree.py @@ -1,6 +1,6 @@ import logging -from typing import Any +from excelalchemy.codecs.base import WorkbookDisplayValue, WorkbookInputValue from excelalchemy.codecs.multi_checkbox import MultiCheckbox from excelalchemy.codecs.radio import Radio from excelalchemy.i18n.messages import MessageKey @@ -13,23 +13,31 @@ class SingleTreeNode(Radio): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: + declared = field_meta.declared + presentation = field_meta.presentation return '\n'.join( [ - field_meta.comment_required, - dmsg(MessageKey.COMMENT_HINT, value=field_meta.hint or dmsg(MessageKey.SINGLE_TREE_HINT)), + declared.comment_required, + dmsg(MessageKey.COMMENT_HINT, value=presentation.hint or dmsg(MessageKey.SINGLE_TREE_HINT)), ] ) @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: if isinstance(value, str): return value.strip() return value @classmethod - def format_display_value(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def format_display_value( + cls, + value: WorkbookDisplayValue, + field_meta: FieldMetaInfo, + ) -> WorkbookDisplayValue: + declared = field_meta.declared + presentation = field_meta.presentation try: - return field_meta.options_id_map[value.strip()].name + return presentation.options_id_map(field_label=declared.label)[value.strip()].name except KeyError: logging.warning('Could not resolve tree option %s; returning the original value', value) @@ -41,10 +49,12 @@ class MultiTreeNode(MultiCheckbox): @classmethod def build_comment(cls, field_meta: FieldMetaInfo) -> str: - extra_hint = field_meta.hint or dmsg(MessageKey.MULTI_TREE_HINT) + declared = field_meta.declared + presentation = field_meta.presentation + extra_hint = presentation.hint or dmsg(MessageKey.MULTI_TREE_HINT) value_key = ( MessageKey.COMMENT_REQUIRED_VALUE_REQUIRED - if field_meta.required + if declared.effective_required else MessageKey.COMMENT_REQUIRED_VALUE_OPTIONAL ) return '\n'.join( @@ -52,11 +62,11 @@ def build_comment(cls, field_meta: FieldMetaInfo) -> str: ) @classmethod - def parse_input(cls, value: Any, field_meta: FieldMetaInfo) -> Any: + def parse_input(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> WorkbookInputValue: return super().parse_input(value, field_meta) @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> list[str]: + def normalize_import_value(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> list[str]: return super().normalize_import_value(value, field_meta) diff --git a/src/excelalchemy/codecs/url.py b/src/excelalchemy/codecs/url.py index d466201..73aec24 100644 --- a/src/excelalchemy/codecs/url.py +++ b/src/excelalchemy/codecs/url.py @@ -1,7 +1,6 @@ -from typing import Any - from pydantic import HttpUrl, TypeAdapter +from excelalchemy.codecs.base import WorkbookInputValue from excelalchemy.codecs.string import String from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import message as msg @@ -12,7 +11,7 @@ class Url(String): _validator = TypeAdapter(HttpUrl) @classmethod - def normalize_import_value(cls, value: Any, field_meta: FieldMetaInfo) -> str: + def normalize_import_value(cls, value: WorkbookInputValue, field_meta: FieldMetaInfo) -> str: parsed = str(value) errors: list[str] = [] diff --git a/src/excelalchemy/core/alchemy.py b/src/excelalchemy/core/alchemy.py index b65a28f..7f837da 100644 --- a/src/excelalchemy/core/alchemy.py +++ b/src/excelalchemy/core/alchemy.py @@ -97,18 +97,18 @@ def __sync_layout_state__(self) -> None: self.ordered_field_meta = self._layout.ordered_field_meta def __get_importer_model__(self) -> type[ImportCreateModelT] | type[ImportUpdateModelT] | type[ExportModelT]: - importer_model = None + importer_model: type[ImportCreateModelT] | type[ImportUpdateModelT] | type[ExportModelT] | None = None if self.excel_mode == ExcelMode.IMPORT: if not isinstance(self.config, ImporterConfig): raise ConfigError(msg(MessageKey.IMPORT_MODE_CONFIG_REQUIRED, config_name=ImporterConfig.__name__)) if self.config.behavior.import_mode in (ImportMode.CREATE, ImportMode.CREATE_OR_UPDATE): - importer_model = self.config.schema_options.create_importer_model # type: ignore[assignment] + importer_model = self.config.schema_options.create_importer_model elif self.config.behavior.import_mode == ImportMode.UPDATE: - importer_model = self.config.schema_options.update_importer_model # type: ignore[assignment] + importer_model = self.config.schema_options.update_importer_model elif self.excel_mode == ExcelMode.EXPORT: if not isinstance(self.config, ExporterConfig): raise ConfigError(msg(MessageKey.EXPORT_MODE_CONFIG_REQUIRED, config_name=ExporterConfig.__name__)) - importer_model = self.config.schema_options.exporter_model # type: ignore[assignment] + importer_model = self.config.schema_options.exporter_model if importer_model is None: raise ConfigError(msg(MessageKey.NO_IMPORTER_OR_EXPORTER_MODEL_CONFIGURED)) @@ -192,7 +192,9 @@ def df(self) -> WorksheetTable: @property def worksheet_table(self) -> WorksheetTable: - return self.df + if self._last_import_session is None: + return WorksheetTable() + return self._last_import_session.worksheet_table @property def header_df(self) -> WorksheetTable: @@ -203,19 +205,31 @@ def header_df(self) -> WorksheetTable: @property def header_table(self) -> WorksheetTable: - return self.header_df + if self._last_import_session is None: + return WorksheetTable() + return self._last_import_session.header_table @property - def cell_errors(self) -> dict[RowIndex, dict[ColumnIndex, list[ExcelCellError]]]: + def cell_error_map(self) -> dict[RowIndex, dict[ColumnIndex, list[ExcelCellError]]]: if self._last_import_session is None: return {} - return self._last_import_session.cell_errors + return self._last_import_session.cell_error_map @property - def row_errors(self) -> dict[RowIndex, list[ExcelRowError | ExcelCellError]]: + def row_error_map(self) -> dict[RowIndex, list[ExcelRowError | ExcelCellError]]: if self._last_import_session is None: return {} - return self._last_import_session.row_errors + return self._last_import_session.row_error_map + + @property + def cell_errors(self) -> dict[RowIndex, dict[ColumnIndex, list[ExcelCellError]]]: + """Backward-compatible alias for cell_error_map.""" + return self.cell_error_map + + @property + def row_errors(self) -> dict[RowIndex, list[ExcelRowError | ExcelCellError]]: + """Backward-compatible alias for row_error_map.""" + return self.row_error_map @property def last_import_snapshot(self) -> ImportSessionSnapshot | None: diff --git a/src/excelalchemy/core/import_session.py b/src/excelalchemy/core/import_session.py index 561fb71..f2928f2 100644 --- a/src/excelalchemy/core/import_session.py +++ b/src/excelalchemy/core/import_session.py @@ -11,7 +11,7 @@ from excelalchemy._primitives.constants import REASON_COLUMN_KEY, RESULT_COLUMN_KEY from excelalchemy._primitives.header_models import ExcelHeader -from excelalchemy._primitives.identity import DataUrlStr, RowIndex, UniqueLabel, UrlStr +from excelalchemy._primitives.identity import ColumnIndex, DataUrlStr, RowIndex, UniqueLabel, UrlStr from excelalchemy._primitives.payloads import FlatRowPayload, ModelRowPayload from excelalchemy.codecs.base import SystemReserved from excelalchemy.config import ImporterConfig @@ -22,7 +22,7 @@ from excelalchemy.core.schema import ExcelSchemaLayout from excelalchemy.core.storage_protocol import ExcelStorage from excelalchemy.core.table import WorksheetTable -from excelalchemy.exceptions import ConfigError +from excelalchemy.exceptions import ConfigError, ExcelCellError, ExcelRowError from excelalchemy.i18n.messages import MessageKey, use_display_locale from excelalchemy.i18n.messages import display_message as dmsg from excelalchemy.i18n.messages import message as msg @@ -96,7 +96,7 @@ def __init__( self.worksheet_table = WorksheetTable() self.header_table = WorksheetTable() - self._state_df_has_been_loaded = False + self._workbook_loaded = False self.issue_tracker = ImportIssueTracker(self.layout, self.import_result_field_meta) self.row_aggregator = RowAggregator(self.layout, self.behavior.import_mode) @@ -104,26 +104,36 @@ def __init__( self._snapshot = ImportSessionSnapshot() @property - def cell_errors(self): + def cell_error_map(self) -> dict[RowIndex, dict[ColumnIndex, list[ExcelCellError]]]: return self.issue_tracker.cell_errors @property - def row_errors(self): + def row_error_map(self) -> dict[RowIndex, list[ExcelRowError | ExcelCellError]]: return self.issue_tracker.row_errors + @property + def cell_errors(self) -> dict[RowIndex, dict[ColumnIndex, list[ExcelCellError]]]: + """Backward-compatible alias for cell_error_map.""" + return self.cell_error_map + + @property + def row_errors(self) -> dict[RowIndex, list[ExcelRowError | ExcelCellError]]: + """Backward-compatible alias for row_error_map.""" + return self.row_error_map + @property def snapshot(self) -> ImportSessionSnapshot: return self._snapshot @cached_property def input_excel_has_merged_header(self) -> bool: - if not self._state_df_has_been_loaded: + if not self._workbook_loaded: raise ConfigError(msg(MessageKey.WORKSHEET_TABLE_NOT_LOADED)) return self.header_parser.has_merged_header(self.header_table) @cached_property def input_excel_headers(self) -> list[ExcelHeader]: - if not self._state_df_has_been_loaded: + if not self._workbook_loaded: raise ConfigError(msg(MessageKey.WORKSHEET_TABLE_NOT_LOADED)) return self.header_parser.extract(self.header_table) @@ -203,7 +213,7 @@ def _validate_header(self, input_excel_name: str) -> ValidateHeaderResult: return validate_header def _load_workbook(self, input_excel_name: str) -> WorksheetTable: - if not self._state_df_has_been_loaded: + if not self._workbook_loaded: worksheet_table = self.storage_gateway.read_excel_table( input_excel_name, skiprows=HEADER_HINT_LINE_COUNT, @@ -211,7 +221,7 @@ def _load_workbook(self, input_excel_name: str) -> WorksheetTable: ) self.worksheet_table = worksheet_table self.header_table = worksheet_table.head(2) - self._state_df_has_been_loaded = True + self._workbook_loaded = True self._snapshot = replace(self._snapshot, phase=ImportSessionPhase.WORKBOOK_LOADED) return self.worksheet_table @@ -260,7 +270,7 @@ def _render_import_result_excel(self) -> DataUrlStr: self.worksheet_table, field_meta_mapping=self.import_result_label_to_field_meta | self.layout.unique_label_to_field_meta, has_merged_header=self.input_excel_has_merged_header, - errors=self.cell_errors, + errors=self.cell_error_map, ) def _upload_file(self, output_name: str, content_with_prefix: DataUrlStr) -> UrlStr: diff --git a/src/excelalchemy/core/rows.py b/src/excelalchemy/core/rows.py index e963529..1aa5a8f 100644 --- a/src/excelalchemy/core/rows.py +++ b/src/excelalchemy/core/rows.py @@ -34,8 +34,9 @@ def _aggregate(self, row_data: RowPayloadLike) -> AggregatedRowPayload: for unique_label_raw, value in row_data.items(): unique_label = UniqueLabel(unique_label_raw) field_meta = self.layout.unique_label_to_field_meta[unique_label] + runtime = field_meta.runtime - if field_meta.key is None or field_meta.parent_key is None: + if runtime.key is None or runtime.parent_key is None: raise ConfigError( msg(MessageKey.FIELD_META_RUNTIME_KEY_MISSING, field_meta_type=type(field_meta).__name__) ) @@ -46,11 +47,11 @@ def _aggregate(self, row_data: RowPayloadLike) -> AggregatedRowPayload: else: continue - if field_meta.parent_key == field_meta.key: - aggregated[str(field_meta.key)] = value + if runtime.parent_key == runtime.key: + aggregated[str(runtime.key)] = value else: - parent_key = str(field_meta.parent_key) - child_key = str(field_meta.key) + parent_key = str(runtime.parent_key) + child_key = str(runtime.key) nested = aggregated.setdefault(parent_key, {}) if not isinstance(nested, dict): raise TypeError(f'Expected nested payload mapping for {parent_key!r}, got {type(nested)}') diff --git a/src/excelalchemy/core/schema.py b/src/excelalchemy/core/schema.py index 97c5018..4135566 100644 --- a/src/excelalchemy/core/schema.py +++ b/src/excelalchemy/core/schema.py @@ -44,13 +44,15 @@ def from_model(cls, model: type[BaseModel]) -> 'ExcelSchemaLayout': def _build_indexes(self) -> None: for field_meta in self.ordered_field_meta: - if field_meta.parent_label is None: + runtime = field_meta.runtime + + if runtime.parent_label is None: raise ConfigError(msg(MessageKey.PARENT_LABEL_EMPTY_RUNTIME)) - if field_meta.parent_key is None: + if runtime.parent_key is None: raise ConfigError(msg(MessageKey.PARENT_KEY_EMPTY_RUNTIME)) - self.parent_label_to_field_metas.setdefault(field_meta.parent_label, []).append(field_meta) - self.parent_key_to_field_metas.setdefault(field_meta.parent_key, []).append(field_meta) + self.parent_label_to_field_metas.setdefault(runtime.parent_label, []).append(field_meta) + self.parent_key_to_field_metas.setdefault(runtime.parent_key, []).append(field_meta) self.unique_key_to_field_meta[field_meta.unique_key] = field_meta self.unique_label_to_field_meta[field_meta.unique_label] = field_meta @@ -58,8 +60,10 @@ def _build_indexes(self) -> None: def _check_field_meta_order(field_metas: list[FieldMetaInfo]) -> None: order_to_field_meta: dict[int, set[Label]] = defaultdict(set) for field_meta in field_metas: - assert field_meta.parent_label is not None - order_to_field_meta[field_meta.order].add(field_meta.parent_label) + runtime = field_meta.runtime + declared = field_meta.declared + assert runtime.parent_label is not None + order_to_field_meta[declared.order].add(runtime.parent_label) duplicate_order = [v for k, v in order_to_field_meta.items() if len(v) > 1 and k != DEFAULT_FIELD_META_ORDER] if duplicate_order: raise ConfigError( @@ -73,24 +77,26 @@ def _check_field_meta_order(field_metas: list[FieldMetaInfo]) -> None: def _sort_field_meta(cls, field_metas: list[FieldMetaInfo]) -> list[FieldMetaInfo]: orders: dict[Label, int] = {} for idx, field_meta in enumerate(field_metas): - assert field_meta.parent_label is not None - if field_meta.order == DEFAULT_FIELD_META_ORDER: - orders[field_meta.parent_label] = idx + runtime = field_meta.runtime + declared = field_meta.declared + assert runtime.parent_label is not None + if declared.order == DEFAULT_FIELD_META_ORDER: + orders[runtime.parent_label] = idx else: - orders[field_meta.parent_label] = field_meta.order + orders[runtime.parent_label] = declared.order return sorted( field_metas, key=lambda x: ( - orders.get(cast(Label, x.parent_label), Decimal('Infinity')), - x.offset, + orders.get(cast(Label, x.runtime.parent_label), Decimal('Infinity')), + x.runtime.offset, ), ) def has_merged_header(self, selected_keys: list[UniqueKey]) -> bool: """Return whether the selected keys need a two-row merged header.""" return any( - self.unique_key_to_field_meta[key].label != self.unique_key_to_field_meta[key].parent_label + self.unique_key_to_field_meta[key].declared.label != self.unique_key_to_field_meta[key].runtime.parent_label for key in selected_keys ) @@ -103,8 +109,8 @@ def get_output_parent_excel_headers(self, selected_keys: list[UniqueKey] | None def get_output_child_excel_headers(self, selected_keys: list[UniqueKey] | None = None) -> list[Label]: """Return the child labels used in the second header row for merged exports.""" if not selected_keys: - return [field_meta.label for field_meta in self.ordered_field_meta] - return [self.unique_key_to_field_meta[key].label for key in selected_keys] + return [field_meta.declared.label for field_meta in self.ordered_field_meta] + return [self.unique_key_to_field_meta[key].declared.label for key in selected_keys] def select_output_excel_keys(self, keys: Sequence[str] | None = None) -> list[UniqueKey]: """Expand parent keys into concrete flattened keys while preserving layout order.""" diff --git a/src/excelalchemy/core/storage.py b/src/excelalchemy/core/storage.py index 47843df..cff159a 100644 --- a/src/excelalchemy/core/storage.py +++ b/src/excelalchemy/core/storage.py @@ -1,6 +1,6 @@ """Storage factory for resolving ExcelAlchemy storage strategies.""" -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from pydantic import BaseModel @@ -43,7 +43,7 @@ def build_storage_gateway[ return MissingStorageGateway() -def __getattr__(name: str) -> Any: +def __getattr__(name: str) -> object: if name == 'MinioStorageGateway': from excelalchemy.core.storage_minio import MinioStorageGateway diff --git a/src/excelalchemy/core/writer.py b/src/excelalchemy/core/writer.py index eb5a22a..dee98c5 100644 --- a/src/excelalchemy/core/writer.py +++ b/src/excelalchemy/core/writer.py @@ -4,7 +4,7 @@ import io from collections import defaultdict from math import ceil -from typing import Any, BinaryIO, cast +from typing import BinaryIO, cast from openpyxl import Workbook from openpyxl.cell.cell import Cell @@ -21,7 +21,7 @@ FONT_READ_COLOR, ) from excelalchemy._primitives.identity import ColumnIndex, DataUrlStr, Label, RowIndex, UniqueLabel -from excelalchemy.core.table import WorksheetTable +from excelalchemy.core.table import WorksheetTable, WorksheetValue from excelalchemy.exceptions import ExcelCellError from excelalchemy.i18n.messages import MessageKey from excelalchemy.i18n.messages import display_message as dmsg @@ -83,7 +83,7 @@ def _style_header_cell(cell: Cell, field_meta: FieldMetaInfo) -> None: comment = _build_comment(field_meta) if comment is not None: cell.comment = comment - if field_meta.required: + if field_meta.declared.effective_required: cell.fill = PatternFill(start_color=BACKGROUND_REQUIRED_COLOR, fill_type='solid') cell.font = Font(bold=True) cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) @@ -143,7 +143,9 @@ def _write_vertically_merged_header( start=column_write_offset + OPENPYXL_EXCEL_INDEX_START_AT, ): field_meta = field_meta_mapping[cast(UniqueLabel, column)] - if field_meta.label == field_meta.parent_label: + declared = field_meta.declared + runtime = field_meta.runtime + if declared.label == runtime.parent_label: worksheet.merge_cells( start_row=start_row, start_column=openpyxl_col_index, @@ -162,26 +164,30 @@ def _write_horizontally_merged_header( ) -> None: counter: dict[Label, int] = defaultdict(int) for field_meta in field_meta_mapping.values(): - if field_meta.parent_label is None: + declared = field_meta.declared + runtime = field_meta.runtime + if runtime.parent_label is None: raise RuntimeError(msg(MessageKey.PARENT_LABEL_EMPTY_RUNTIME)) - counter[field_meta.parent_label] += 1 + counter[runtime.parent_label] += 1 for openpyxl_col_index, column in enumerate( worksheet_table.columns[column_write_offset:], start=column_write_offset + OPENPYXL_EXCEL_INDEX_START_AT, ): field_meta = field_meta_mapping[cast(UniqueLabel, column)] - if field_meta.parent_label is None: + declared = field_meta.declared + runtime = field_meta.runtime + if runtime.parent_label is None: raise RuntimeError(msg(MessageKey.PARENT_LABEL_EMPTY_RUNTIME)) - if field_meta.label != field_meta.parent_label and field_meta.offset == 0: + if declared.label != runtime.parent_label and runtime.offset == 0: cell = _worksheet_cell(worksheet, row=start_row, column=openpyxl_col_index) - cell.value = str(field_meta.parent_label) + cell.value = str(runtime.parent_label) cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) worksheet.merge_cells( start_row=start_row, start_column=openpyxl_col_index, end_row=start_row, - end_column=openpyxl_col_index + counter[field_meta.parent_label] - 1, + end_column=openpyxl_col_index + counter[runtime.parent_label] - 1, ) @@ -231,7 +237,7 @@ def _get_parsed_value( col_index: int, field_meta_mapping: dict[UniqueLabel, FieldMetaInfo], ) -> str: - cell_value: str | Any | None = worksheet_table.iloc[row_index, col_index] + cell_value: WorksheetValue = worksheet_table.iloc[row_index, col_index] if value_is_nan(cell_value): return '' diff --git a/src/excelalchemy/exceptions.py b/src/excelalchemy/exceptions.py index 741a2ff..b1e585e 100644 --- a/src/excelalchemy/exceptions.py +++ b/src/excelalchemy/exceptions.py @@ -1,7 +1,5 @@ """Public exception types raised by ExcelAlchemy.""" -from typing import Any - from excelalchemy._primitives.constants import UNIQUE_HEADER_CONNECTOR from excelalchemy._primitives.identity import Label, UniqueLabel from excelalchemy.i18n.messages import MessageKey @@ -14,14 +12,14 @@ class ExcelCellError(Exception): message = msg(MessageKey.EXCEL_IMPORT_ERROR) label: Label parent_label: Label | None - detail: dict[str, Any] + detail: dict[str, object] def __init__( self, message: str, label: Label, parent_label: Label | None = None, - **kwargs: Any, + **kwargs: object, ): super().__init__(message, label, parent_label) self.message = message or self.message @@ -63,7 +61,7 @@ class ExcelRowError(Exception): def __init__( self, message: str, - **kwargs: Any, + **kwargs: object, ): super().__init__(message) self.message = message or self.message diff --git a/src/excelalchemy/helper/pydantic.py b/src/excelalchemy/helper/pydantic.py index 729f000..ef7d0e6 100644 --- a/src/excelalchemy/helper/pydantic.py +++ b/src/excelalchemy/helper/pydantic.py @@ -14,6 +14,9 @@ from excelalchemy.i18n.messages import message as msg from excelalchemy.metadata import FieldMetaInfo, extract_declared_field_metadata +type ExcelValidationIssue = ExcelCellError | ExcelRowError +type ExcelValidationIssues = list[ExcelValidationIssue] + def _resolve_excel_codec_type(annotation: object) -> type[ExcelFieldCodec]: if isinstance(annotation, type): @@ -60,11 +63,12 @@ def allows_none(self) -> bool: @property def required(self) -> bool: declared = self.declared_metadata + declared_meta = declared.declared - if declared.is_primary_key or declared.unique: + if declared_meta.effective_required is not None: + return bool(declared_meta.effective_required) + if declared_meta.is_primary_key or declared_meta.unique: return True - if declared.required is not None: - return declared.required if self.raw_field.default is not PydanticUndefined or self.raw_field.default_factory is not None: return False return not self.allows_none @@ -75,10 +79,11 @@ def declared_metadata(self) -> FieldMetaInfo: def runtime_metadata(self) -> FieldMetaInfo: declared = self.declared_metadata + declared_meta = declared.declared return declared.bind_runtime( required=self.required, excel_codec=self.excel_codec, - parent_label=declared.label, + parent_label=declared_meta.label, parent_key=Key(self.name), key=Key(self.name), offset=0, @@ -128,11 +133,11 @@ def get_model_field_names(model: type[BaseModel]) -> list[str]: def instantiate_pydantic_model[ModelT: BaseModel]( data: Mapping[str, object], model: type[ModelT], -) -> ModelT | list[ExcelCellError | ExcelRowError]: +) -> ModelT | ExcelValidationIssues: """Instantiate a Pydantic model and return mapped Excel errors when validation fails.""" model_adapter = PydanticModelAdapter(model) normalized_data: dict[str, object] = {} - errors: list[ExcelCellError | ExcelRowError] = [] + errors: ExcelValidationIssues = [] failed_fields: set[str] = set() for field_adapter in model_adapter.fields(): @@ -161,6 +166,7 @@ def instantiate_pydantic_model[ModelT: BaseModel]( def _extract_pydantic_model(model: PydanticModelAdapter) -> Generator[FieldMetaInfo, None, None]: for field_adapter in model.fields(): declared_metadata = field_adapter.declared_metadata + declared_meta = declared_metadata.declared excel_codec = field_adapter.excel_codec if issubclass(excel_codec, CompositeExcelFieldCodec): @@ -169,7 +175,7 @@ def _extract_pydantic_model(model: PydanticModelAdapter) -> Generator[FieldMetaI yield inherited.bind_runtime( required=field_adapter.required, excel_codec=excel_codec, - parent_label=declared_metadata.label, + parent_label=declared_meta.label, parent_key=Key(field_adapter.name), key=key, offset=offset, @@ -179,7 +185,7 @@ def _extract_pydantic_model(model: PydanticModelAdapter) -> Generator[FieldMetaI def _handle_error( - error_container: list[ExcelCellError | ExcelRowError], + error_container: ExcelValidationIssues, exc: Exception, field_def: FieldMetaInfo, ) -> None: @@ -210,7 +216,7 @@ def _map_validation_error( model_adapter: PydanticModelAdapter, failed_fields: set[str], ) -> list[ExcelCellError | ExcelRowError]: - mapped: list[ExcelCellError | ExcelRowError] = [] + mapped: ExcelValidationIssues = [] for error in exc.errors(): loc = error.get('loc', ()) if not loc: @@ -230,7 +236,7 @@ def _map_validation_error( mapped.append(_nested_excel_error(field_adapter, loc[1], message)) continue - mapped.append(ExcelCellError(label=field_adapter.declared_metadata.label, message=message)) + mapped.append(ExcelCellError(label=field_adapter.declared_metadata.declared.label, message=message)) return mapped @@ -241,14 +247,15 @@ def _nested_excel_error( message: str, ) -> ExcelCellError: declared_metadata = field_adapter.declared_metadata + declared_meta = declared_metadata.declared excel_codec = field_adapter.excel_codec if issubclass(excel_codec, CompositeExcelFieldCodec): for key, sub_field_info in excel_codec.column_items(): if key == child_key: return ExcelCellError( label=sub_field_info.label, - parent_label=declared_metadata.label, + parent_label=declared_meta.label, message=message, ) - return ExcelCellError(label=declared_metadata.label, message=message) + return ExcelCellError(label=declared_meta.label, message=message) diff --git a/src/excelalchemy/metadata.py b/src/excelalchemy/metadata.py index 2e9455c..ae8faa9 100644 --- a/src/excelalchemy/metadata.py +++ b/src/excelalchemy/metadata.py @@ -35,6 +35,9 @@ EXCEL_FIELD_METADATA_KEY = 'excelalchemy_metadata' type FieldDefaultFactory = Callable[[], object] type FieldIncludeExclude = Set[IntStr] | bool | None +type FieldSchemaExtra = dict[str, Any] +type FieldKwargs = dict[str, Any] +type FieldFactoryReturn = Any def _normalize_character_set(character_set: set[CharacterSet] | None) -> frozenset[CharacterSet]: @@ -65,6 +68,28 @@ class DeclaredFieldMeta: required: bool | None order: int + @property + def effective_required(self) -> bool | None: + if self.is_primary_key or self.unique: + return True + return self.required + + @property + def comment_required(self) -> str: + value_key = ( + MessageKey.COMMENT_REQUIRED_VALUE_REQUIRED + if self.effective_required + else MessageKey.COMMENT_REQUIRED_VALUE_OPTIONAL + ) + return dmsg(MessageKey.COMMENT_REQUIRED, value=dmsg(value_key)) + + @property + def comment_unique(self) -> str: + value_key = ( + MessageKey.COMMENT_UNIQUE_VALUE_UNIQUE if self.unique else MessageKey.COMMENT_UNIQUE_VALUE_NON_UNIQUE + ) + return dmsg(MessageKey.COMMENT_UNIQUE, value=dmsg(value_key)) + @dataclass(slots=True, frozen=True) class RuntimeFieldBinding: @@ -76,6 +101,22 @@ class RuntimeFieldBinding: offset: int = DEFAULT_FIELD_META_ORDER excel_codec: type[ExcelFieldCodec] = UndefinedFieldCodec + def make_unique_label(self, *, label: Label) -> UniqueLabel: + if self.parent_label is None: + raise RuntimeError(msg(MessageKey.PARENT_LABEL_EMPTY_RUNTIME)) + unique_label = ( + f'{self.parent_label}{UNIQUE_HEADER_CONNECTOR}{label}' if self.parent_label != label else label + ) + return UniqueLabel(unique_label) + + def make_unique_key(self, *, key: Key | None) -> UniqueKey: + if self.parent_key is None: + raise RuntimeError(msg(MessageKey.PARENT_KEY_EMPTY_RUNTIME)) + if key is None: + raise RuntimeError(msg(MessageKey.KEY_EMPTY_RUNTIME)) + unique_key = f'{self.parent_key}{UNIQUE_HEADER_CONNECTOR}{key}' if self.parent_key != key else key + return UniqueKey(unique_key) + @dataclass(slots=True, frozen=True) class WorkbookPresentationMeta: @@ -90,6 +131,111 @@ class WorkbookPresentationMeta: unit: str | None = None hint: str | None = None + @property + def comment_date_format(self) -> str: + if self.date_format is None: + return '' + return dmsg(MessageKey.COMMENT_DATE_FORMAT, value=DATE_FORMAT_TO_HINT_MAPPING[self.date_format]) + + @property + def comment_date_range_option(self) -> str: + if self.date_range_option is None: + return dmsg(MessageKey.COMMENT_DATE_RANGE_OPTION, value=dmsg(MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY)) + option_mapping = { + DataRangeOption.PRE: MessageKey.DATE_RANGE_OPTION_PRE_DISPLAY, + DataRangeOption.NEXT: MessageKey.DATE_RANGE_OPTION_NEXT_DISPLAY, + DataRangeOption.NONE: MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY, + } + return dmsg(MessageKey.COMMENT_DATE_RANGE_OPTION, value=dmsg(option_mapping[self.date_range_option])) + + @property + def comment_hint(self) -> str: + if self.hint is None: + return '' + return dmsg(MessageKey.COMMENT_HINT, value=self.hint) + + @property + def comment_options(self) -> str: + if self.options is None: + return '' + return dmsg(MessageKey.COMMENT_OPTIONS, value=MULTI_CHECKBOX_SEPARATOR.join(option.name for option in self.options)) + + @property + def comment_fraction_digits(self) -> str: + return dmsg(MessageKey.COMMENT_FRACTION_DIGITS, value=self.fraction_digits or 0) + + @property + def comment_unit(self) -> str: + return dmsg(MessageKey.COMMENT_UNIT, value=self.unit or dmsg(MessageKey.COMMENT_UNIT_VALUE_NONE)) + + @property + def must_date_format(self) -> DateFormat: + if self.date_format is None: + raise ConfigError(msg(MessageKey.DATE_FORMAT_EMPTY_RUNTIME)) + return self.date_format + + @property + def python_date_format(self) -> str: + return DATE_FORMAT_TO_PYTHON_MAPPING[self.must_date_format] + + def options_id_map(self, *, field_label: Label) -> dict[OptionId, Option]: + if self.options is None: + return {} + if len(self.options) > MAX_OPTIONS_COUNT: + logging.warning( + 'Field "%s" defines %s options; please confirm that this is intentional because options are not meant for large datasets', + field_label, + len(self.options), + ) + return {option.id: option for option in self.options} + + def options_name_map(self, *, field_label: Label) -> dict[str, Option]: + if self.options is None: + return {} + if len(self.options) > MAX_OPTIONS_COUNT: + logging.warning( + 'Field "%s" defines %s options; please confirm that this is intentional because options are not meant for large datasets', + field_label, + len(self.options), + ) + return {option.name: option for option in self.options} + + def exchange_option_ids_to_names( + self, + option_ids: list[str] | list[OptionId], + *, + field_label: Label, + ) -> list[str]: + option_id_map = self.options_id_map(field_label=field_label) + option_names: list[str] = [] + + for option_id in option_ids: + normalized_id = OptionId(option_id) + try: + option_names.append(option_id_map[normalized_id].name) + except KeyError: + logging.warning('Could not find option id %s; returning the original value', normalized_id) + option_names.append(normalized_id) + + return option_names + + def exchange_names_to_option_ids_with_errors( + self, + names: list[str], + *, + field_label: Label, + ) -> tuple[list[str], list[str]]: + option_name_map = self.options_name_map(field_label=field_label) + errors: list[str] = [] + result: list[str] = [] + for name in names: + option = option_name_map.get(name) + if option is None: + errors.append(msg(MessageKey.OPTION_NOT_FOUND_HEADER_COMMENT)) + else: + result.append(option.id) + return result, errors + @dataclass(slots=True, frozen=True) class ImportConstraints: @@ -105,9 +251,29 @@ class ImportConstraints: min_length: int | None = None max_length: int | None = None + @property + def comment_max_length(self) -> str: + return dmsg( + MessageKey.COMMENT_MAX_LENGTH, + value=self.max_length or dmsg(MessageKey.COMMENT_MAX_LENGTH_VALUE_UNLIMITED), + ) + class FieldMetaInfo: - """Excel field metadata independent from any validation backend.""" + """Compatibility facade over layered Excel metadata objects. + + The public 2.x API still exposes a single ``FieldMetaInfo`` object because + ``FieldMeta(...)`` and ``ExcelMeta(...)`` are intentionally concise. The + actual metadata state, however, is split across: + + - ``DeclaredFieldMeta`` for declaration semantics + - ``RuntimeFieldBinding`` for flattened runtime identity + - ``WorkbookPresentationMeta`` for workbook-facing hints and formatting + - ``ImportConstraints`` for importer-side validation hints + + New internal code should prefer these layer objects over treating + ``FieldMetaInfo`` as a flat mutable record. + """ def __init__( self, @@ -200,6 +366,22 @@ def bind_runtime( runtime.offset = offset return runtime + @property + def declared(self) -> DeclaredFieldMeta: + return self.declared_meta + + @property + def runtime(self) -> RuntimeFieldBinding: + return self.runtime_binding + + @property + def presentation(self) -> WorkbookPresentationMeta: + return self.presentation_meta + + @property + def constraints(self) -> ImportConstraints: + return self.import_constraints + @property def excel_codec(self) -> type[ExcelFieldCodec]: return self.runtime_binding.excel_codec @@ -239,140 +421,70 @@ def validate_state(self) -> None: raise ValueError(msg(MessageKey.PRIMARY_KEY_AND_UNIQUE_MUST_BE_REQUIRED)) def exchange_option_ids_to_names(self, option_ids: list[str] | list[OptionId]) -> list[str]: - option_names: list[str] = [] - - for option_id in option_ids: - option_id = OptionId(option_id) - try: - option_names.append(self.options_id_map[option_id].name) - except KeyError: - logging.warning('Could not find option id %s; returning the original value', option_id) - option_names.append(option_id) - - return option_names + return self.presentation_meta.exchange_option_ids_to_names(option_ids, field_label=self.label) def exchange_names_to_option_ids_with_errors(self, names: list[str]) -> tuple[list[str], list[str]]: - errors: list[str] = [] - result: list[str] = [] - for name in names: - option = self.options_name_map.get(name) - if option is None: - errors.append(msg(MessageKey.OPTION_NOT_FOUND_HEADER_COMMENT)) - else: - result.append(option.id) - return result, errors + return self.presentation_meta.exchange_names_to_option_ids_with_errors(names, field_label=self.label) @property def unique_label(self) -> UniqueLabel: - if self.parent_label is None: - raise RuntimeError(msg(MessageKey.PARENT_LABEL_EMPTY_RUNTIME)) - label = ( - f'{self.parent_label}{UNIQUE_HEADER_CONNECTOR}{self.label}' - if self.parent_label != self.label - else self.label - ) - return UniqueLabel(label) + return self.runtime_binding.make_unique_label(label=self.label) @property def unique_key(self) -> UniqueKey: - if self.parent_key is None: - raise RuntimeError(msg(MessageKey.PARENT_KEY_EMPTY_RUNTIME)) - if self.key is None: - raise RuntimeError(msg(MessageKey.KEY_EMPTY_RUNTIME)) - key = f'{self.parent_key}{UNIQUE_HEADER_CONNECTOR}{self.key}' if self.parent_key != self.key else self.key - return UniqueKey(key) + return self.runtime_binding.make_unique_key(key=self.key) @cached_property def options_id_map(self) -> dict[OptionId, Option]: - if self.options is None: - return {} - if len(self.options) > MAX_OPTIONS_COUNT: - logging.warning( - 'Field "%s" defines %s options; please confirm that this is intentional because options are not meant for large datasets', - self.label, - len(self.options), - ) - return {option.id: option for option in self.options} + return self.presentation_meta.options_id_map(field_label=self.label) @cached_property def options_name_map(self) -> dict[str, Option]: - if self.options is None: - return {} - if len(self.options) > MAX_OPTIONS_COUNT: - logging.warning( - 'Field "%s" defines %s options; please confirm that this is intentional because options are not meant for large datasets', - self.label, - len(self.options), - ) - return {option.name: option for option in self.options} + return self.presentation_meta.options_name_map(field_label=self.label) @property def comment_required(self) -> str: - value_key = ( - MessageKey.COMMENT_REQUIRED_VALUE_REQUIRED if self.required else MessageKey.COMMENT_REQUIRED_VALUE_OPTIONAL - ) - return dmsg(MessageKey.COMMENT_REQUIRED, value=dmsg(value_key)) + return self.declared_meta.comment_required @property def comment_date_format(self) -> str: - if self.date_format is None: - return '' - return dmsg(MessageKey.COMMENT_DATE_FORMAT, value=DATE_FORMAT_TO_HINT_MAPPING[self.date_format]) + return self.presentation_meta.comment_date_format @property def comment_date_range_option(self) -> str: - if self.date_range_option is None: - return dmsg(MessageKey.COMMENT_DATE_RANGE_OPTION, value=dmsg(MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY)) - option_mapping = { - DataRangeOption.PRE: MessageKey.DATE_RANGE_OPTION_PRE_DISPLAY, - DataRangeOption.NEXT: MessageKey.DATE_RANGE_OPTION_NEXT_DISPLAY, - DataRangeOption.NONE: MessageKey.DATE_RANGE_OPTION_NONE_DISPLAY, - } - return dmsg(MessageKey.COMMENT_DATE_RANGE_OPTION, value=dmsg(option_mapping[self.date_range_option])) + return self.presentation_meta.comment_date_range_option @property def comment_hint(self) -> str: - if self.hint is None: - return '' - return dmsg(MessageKey.COMMENT_HINT, value=self.hint) + return self.presentation_meta.comment_hint @property def comment_options(self) -> str: - if self.options is None: - return '' - return dmsg(MessageKey.COMMENT_OPTIONS, value=MULTI_CHECKBOX_SEPARATOR.join(x.name for x in self.options)) + return self.presentation_meta.comment_options @property def comment_fraction_digits(self) -> str: - return dmsg(MessageKey.COMMENT_FRACTION_DIGITS, value=self.fraction_digits or 0) + return self.presentation_meta.comment_fraction_digits @property def comment_unit(self) -> str: - return dmsg(MessageKey.COMMENT_UNIT, value=self.unit or dmsg(MessageKey.COMMENT_UNIT_VALUE_NONE)) + return self.presentation_meta.comment_unit @property def comment_unique(self) -> str: - value_key = ( - MessageKey.COMMENT_UNIQUE_VALUE_UNIQUE if self.unique else MessageKey.COMMENT_UNIQUE_VALUE_NON_UNIQUE - ) - return dmsg(MessageKey.COMMENT_UNIQUE, value=dmsg(value_key)) + return self.declared_meta.comment_unique @property def comment_max_length(self) -> str: - return dmsg( - MessageKey.COMMENT_MAX_LENGTH, - value=self.importer_max_length or dmsg(MessageKey.COMMENT_MAX_LENGTH_VALUE_UNLIMITED), - ) + return self.import_constraints.comment_max_length @property def must_date_format(self) -> DateFormat: - if self.date_format is None: - raise ConfigError(msg(MessageKey.DATE_FORMAT_EMPTY_RUNTIME)) - return self.date_format + return self.presentation_meta.must_date_format @property def python_date_format(self) -> str: - return DATE_FORMAT_TO_PYTHON_MAPPING[self.must_date_format] + return self.presentation_meta.python_date_format def __repr__(self) -> str: return ( @@ -820,7 +932,8 @@ def FieldMeta( discriminator: str | None = None, repr: bool = True, **extra: object, -) -> Any: +) -> FieldFactoryReturn: + """Compatibility wrapper over pydantic.Field for workbook-aware declarations.""" metadata = _build_excel_metadata( label=label, is_primary_key=is_primary_key, @@ -847,7 +960,7 @@ def FieldMeta( max_length=max_length, ) - json_schema_extra: dict[str, Any] = {EXCEL_FIELD_METADATA_KEY: metadata} | extra + json_schema_extra: FieldSchemaExtra = {EXCEL_FIELD_METADATA_KEY: metadata} | dict(extra) if include is not None: json_schema_extra['include'] = include if const is not None: @@ -859,7 +972,7 @@ def FieldMeta( if unique_items is not None: json_schema_extra['unique_items'] = unique_items - field_kwargs: dict[str, Any] = { + field_kwargs: FieldKwargs = { 'repr': repr, 'json_schema_extra': json_schema_extra, } diff --git a/src/excelalchemy/util/file.py b/src/excelalchemy/util/file.py index 536c9cc..eaf11a9 100644 --- a/src/excelalchemy/util/file.py +++ b/src/excelalchemy/util/file.py @@ -1,6 +1,6 @@ import math from collections.abc import Mapping, Sequence -from typing import Any, cast +from typing import cast from excelalchemy._primitives.constants import UNIQUE_HEADER_CONNECTOR @@ -37,7 +37,7 @@ def flatten(data: Mapping[str, object], level: list[str] | None = None) -> dict[ return tmp_dict -def value_is_nan(value: Any) -> bool: +def value_is_nan(value: object) -> bool: """Return whether a worksheet value should be treated as empty or NaN.""" if value is None: return True diff --git a/tests/integration/test_examples_smoke.py b/tests/integration/test_examples_smoke.py new file mode 100644 index 0000000..0138050 --- /dev/null +++ b/tests/integration/test_examples_smoke.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import contextlib +import importlib.util +import io +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parents[2] +EXAMPLES_DIR = REPO_ROOT / 'examples' + + +def _load_example_module(module_name: str, filename: str): + module_path = EXAMPLES_DIR / filename + spec = importlib.util.spec_from_file_location(module_name, module_path) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_annotated_schema_example_main_generates_template_output() -> None: + module = _load_example_module('example_annotated_schema', 'annotated_schema.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Generated template:' in output + assert 'employee-template.xlsx' in output + + +def test_custom_storage_example_main_uploads_to_memory_storage() -> None: + module = _load_example_module('example_custom_storage', 'custom_storage.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'memory://employees.xlsx' in output + assert 'Uploaded bytes:' in output + + +def test_employee_import_workflow_example_main_runs_end_to_end() -> None: + module = _load_example_module('example_employee_import_workflow', 'employee_import_workflow.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Employee import workflow completed' in output + assert 'Success rows: 1' in output + assert 'Failed rows: 0' in output + + +def test_create_or_update_import_example_main_runs_end_to_end() -> None: + module = _load_example_module('example_create_or_update_import', 'create_or_update_import.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Create-or-update import workflow completed' in output + assert 'Success rows: 2' in output + assert 'Failed rows: 0' in output + assert 'Created rows: 1' in output + assert 'Updated rows: 1' in output + + +def test_date_and_range_fields_example_main_generates_template_output() -> None: + module = _load_example_module('example_date_and_range_fields', 'date_and_range_fields.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Generated template:' in output + assert 'compensation-template.xlsx' in output + + +def test_selection_fields_example_main_generates_template_output() -> None: + module = _load_example_module('example_selection_fields', 'selection_fields.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Generated template:' in output + assert 'selection-fields-template.xlsx' in output + + +def test_export_workflow_example_main_runs_end_to_end() -> None: + module = _load_example_module('example_export_workflow', 'export_workflow.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Export workflow completed' in output + assert 'Artifact filename: employees-export.xlsx' in output + assert 'Upload URL: memory://employees-export-upload.xlsx' in output + + +def test_fastapi_example_source_compiles() -> None: + source = (EXAMPLES_DIR / 'fastapi_upload.py').read_text(encoding='utf-8') + compile(source, str(EXAMPLES_DIR / 'fastapi_upload.py'), 'exec') + + +@pytest.mark.skipif(importlib.util.find_spec('minio') is None, reason='minio is not installed') +def test_minio_storage_example_main_builds_gateway() -> None: + module = _load_example_module('example_minio_storage', 'minio_storage.py') + + buffer = io.StringIO() + with contextlib.redirect_stdout(buffer): + module.main() + + output = buffer.getvalue() + assert 'Built gateway: MinioStorageGateway' in output + assert 'Uses built-in Minio path: True' in output + assert 'Gateway type check: True' in output + + +@pytest.mark.skipif(importlib.util.find_spec('fastapi') is None, reason='fastapi is not installed') +def test_fastapi_example_module_imports_when_optional_dependency_is_available() -> None: + module = _load_example_module('example_fastapi_upload', 'fastapi_upload.py') + assert module.app is not None + assert module.download_template is not None + assert module.import_employees is not None diff --git a/tests/integration/test_excelalchemy_workflows.py b/tests/integration/test_excelalchemy_workflows.py index da64713..916a92d 100644 --- a/tests/integration/test_excelalchemy_workflows.py +++ b/tests/integration/test_excelalchemy_workflows.py @@ -305,6 +305,11 @@ async def test_import_records_cell_errors_for_invalid_simple_workbook(self): assert result is not None assert result.result == ValidateResult.DATA_INVALID + assert alchemy.worksheet_table is alchemy.df + assert alchemy.header_table is alchemy.header_df + assert alchemy.cell_error_map == alchemy.cell_errors + assert alchemy.row_error_map == alchemy.row_errors + assert alchemy.cell_errors == { 0: { 6: [ExcelCellError(label=Label('出生日期'), message='Enter a date in yyyy format')], diff --git a/tests/unit/test_field_metadata.py b/tests/unit/test_field_metadata.py index 91f661c..8d27cfd 100644 --- a/tests/unit/test_field_metadata.py +++ b/tests/unit/test_field_metadata.py @@ -358,6 +358,32 @@ class Importer(BaseModel): assert field_meta.runtime_binding.parent_key == 'email' assert field_meta.presentation_meta.hint == '请输入邮箱' assert field_meta.import_constraints.max_length == 10 + assert field_meta.declared is field_meta.declared_meta + assert field_meta.runtime is field_meta.runtime_binding + assert field_meta.presentation is field_meta.presentation_meta + assert field_meta.constraints is field_meta.import_constraints + + async def test_split_layers_own_comment_and_option_mapping_logic(self): + class Importer(BaseModel): + email: Email = FieldMeta( + label='邮箱', + order=1, + unique=True, + hint='请输入邮箱', + max_length=10, + options=[Option(id=OptionId('work'), name='工作邮箱')], + ) + + alchemy = self.build_alchemy(Importer) + field_meta = alchemy.ordered_field_meta[0] + + assert field_meta.declared.comment_required == '必填性:必填' + assert field_meta.declared.comment_unique == '唯一性:唯一' + assert field_meta.presentation.comment_hint == '提示:请输入邮箱' + assert field_meta.presentation.options_name_map(field_label=field_meta.label) == { + '工作邮箱': Option(id=OptionId('work'), name='工作邮箱') + } + assert field_meta.constraints.comment_max_length == '最大长度:10' async def test_clone_keeps_split_internal_layers_independent(self): class Importer(BaseModel):