# Patch summary (leading commentary; the diff payload below is unchanged).
#
# Purpose: add an optional "stateless transformation" stage to the pipeline
# models and bump VERSION from 3.7.0 to 3.7.1.
#
# src/glassflow/etl/models/stateless_transformation.py (new file):
#   - StatelessTransformationType: enum with one member,
#     EXPRESSION = "expr_lang_transform".
#   - Transformation: expression / output_name / output_type, all str.
#   - ExpressionConfig: `transform`, a list of Transformation.
#   - StatelessTransformationConfig: enabled (default False), id, type
#     (default EXPRESSION), config. An "after" model validator raises
#     ValueError when enabled is true and id, type or config is falsy.
#     update(patch) copies every non-None patch field onto a model_copy of
#     self and returns self unchanged when the patch has nothing set.
#   - StatelessTransformationConfigPatch: the same four fields, all Optional
#     with default None.
#
# src/glassflow/etl/models/__init__.py:
#   - Imports the five new names and appends them to __all__.
#
# src/glassflow/etl/models/pipeline.py:
#   - PipelineConfig gains `stateless_transformation`, defaulting to
#     StatelessTransformationConfig().
#   - validate_config now accepts a schema field whose source_id equals the
#     transformation id (only when the transformation is enabled) in addition
#     to the topic names; the error message is extended accordingly.
#   - PipelineConfig.update applies a stateless_transformation patch, falling
#     back to a fresh StatelessTransformationConfig() when the current value
#     is falsy.
#   - PipelineConfigPatch gains `stateless_transformation` and `version`.
#
# tests/data/* (pipeline_configs.py, valid_pipeline.json, valid_pipeline.yaml):
#   - Add an enabled "my_transformation" section producing "upper_user_id".
#   - Add a schema field sourced from "my_transformation" that maps to the
#     sink column upper_user_id.
#   - Drop column_name/column_type from the orders.user_id schema field.
#
# tests/test_models/*:
#   - New test_update_filter and test_update_stateless_transformation.
#   - test_update_name additionally asserts that stateless_transformation,
#     join, filter, metadata and pipeline_schema are unchanged.
#   - Existing PipelineConfig constructions pass stateless_transformation.
#
# NOTE(review): validate_config reads `self.stateless_transformation.enabled`
#   without a None check, although the field is declared Optional and
#   PipelineConfig.update already guards with `or StatelessTransformationConfig()`.
#   An explicit `stateless_transformation=None` would presumably fail on that
#   attribute access - confirm and consider guarding.
# NOTE(review): StatelessTransformationConfig.update builds its result with
#   model_copy(update=...). Presumably this skips the enabled/id/type/config
#   validator, so a patch with enabled=True on a config lacking id/config may
#   yield an invalid object - verify against the pydantic docs.
# NOTE(review): the model validator is named `validate`, which is also the
#   name of an existing pydantic BaseModel method - confirm the override is
#   intended, or rename it.
# NOTE(review): the description of ExpressionConfig.transform reads
#   "The transformation expression", but the field is a list of Transformation.
# NOTE(review): PipelineConfigPatch gains `version`, but no handling of it is
#   visible in the PipelineConfig.update hunk shown here - TODO confirm.
diff --git a/VERSION b/VERSION index 7c69a55..a76ccff 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.7.0 +3.7.1 diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py index 7955940..683f900 100644 --- a/src/glassflow/etl/models/__init__.py +++ b/src/glassflow/etl/models/__init__.py @@ -25,6 +25,13 @@ SourceType, TopicConfig, ) +from .stateless_transformation import ( + ExpressionConfig, + StatelessTransformationConfig, + StatelessTransformationConfigPatch, + StatelessTransformationType, + Transformation, +) __all__ = [ "ClickhouseDataType", @@ -57,4 +64,9 @@ "KafkaConnectionParamsPatch", "DeduplicationConfigPatch", "JoinConfigPatch", + "StatelessTransformationConfig", + "StatelessTransformationConfigPatch", + "StatelessTransformationType", + "ExpressionConfig", + "Transformation", ] diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index 629e29a..61404e6 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -10,6 +10,10 @@ from .schema import Schema from .sink import SinkConfig, SinkConfigPatch from .source import SourceConfig, SourceConfigPatch +from .stateless_transformation import ( + StatelessTransformationConfig, + StatelessTransformationConfigPatch, +) class PipelineStatus(CaseInsensitiveStrEnum): @@ -34,6 +38,9 @@ class PipelineConfig(BaseModel): metadata: Optional[MetadataConfig] = Field(default=MetadataConfig()) sink: SinkConfig pipeline_schema: Schema = Field(alias="schema") + stateless_transformation: Optional[StatelessTransformationConfig] = Field( + default=StatelessTransformationConfig() + ) @field_validator("pipeline_id") @classmethod @@ -63,10 +70,14 @@ def validate_config(self) -> "PipelineConfig": # Validate schema topic_names = [topic.name for topic in self.source.topics] + source_ids = topic_names.copy() + if self.stateless_transformation.enabled: + source_ids.append(self.stateless_transformation.id) for field in 
self.pipeline_schema.fields: - if field.source_id not in topic_names: + if field.source_id not in source_ids: raise ValueError( - f"Source '{field.source_id}' does not exist in any topic" + f"Source '{field.source_id}' does not exist in any topic " + "or is not a stateless transformation id" ) # Validate deduplication ID fields @@ -147,6 +158,13 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig": if config_patch.metadata is not None: updated_config.metadata = config_patch.metadata + # Update stateless transformation if provided + if config_patch.stateless_transformation is not None: + updated_config.stateless_transformation = ( + updated_config.stateless_transformation + or StatelessTransformationConfig() + ).update(config_patch.stateless_transformation) + return updated_config @@ -158,3 +176,7 @@ class PipelineConfigPatch(BaseModel): pipeline_schema: Optional[Schema] = Field(default=None, alias="schema") sink: Optional[SinkConfigPatch] = Field(default=None) source: Optional[SourceConfigPatch] = Field(default=None) + stateless_transformation: Optional[StatelessTransformationConfigPatch] = Field( + default=None + ) + version: Optional[str] = Field(default=None) diff --git a/src/glassflow/etl/models/stateless_transformation.py b/src/glassflow/etl/models/stateless_transformation.py new file mode 100644 index 0000000..07a7f28 --- /dev/null +++ b/src/glassflow/etl/models/stateless_transformation.py @@ -0,0 +1,80 @@ +from typing import Any, List, Optional + +from pydantic import BaseModel, Field, model_validator + +from .base import CaseInsensitiveStrEnum + + +class StatelessTransformationType(CaseInsensitiveStrEnum): + EXPRESSION = "expr_lang_transform" + + +class Transformation(BaseModel): + expression: str = Field(description="The transformation expression") + output_name: str = Field(description="The name of the output column") + output_type: str = Field(description="The type of the output column") + + +class ExpressionConfig(BaseModel): + 
transform: List[Transformation] = Field(description="The transformation expression") + + +class StatelessTransformationConfig(BaseModel): + enabled: bool = Field( + description="Whether the stateless transformation is enabled", + default=False, + ) + id: Optional[str] = Field( + description="The ID of the stateless transformation", default=None + ) + type: Optional[StatelessTransformationType] = Field( + description="The type of the stateless transformation", + default=StatelessTransformationType.EXPRESSION, + ) + config: Optional[ExpressionConfig] = Field( + description="The configuration of the stateless transformation", default=None + ) + + @model_validator(mode="after") + def validate(self) -> "StatelessTransformationConfig": + if self.enabled: + if not self.id: + raise ValueError( + "id is required when stateless transformation is enabled" + ) + if not self.type: + raise ValueError( + "type is required when stateless transformation is enabled" + ) + if not self.config: + raise ValueError( + "config is required when stateless transformation is enabled" + ) + return self + + def update( + self, patch: "StatelessTransformationConfigPatch" + ) -> "StatelessTransformationConfig": + """Apply a patch to this stateless transformation config.""" + update_dict: dict[str, Any] = {} + + if patch.enabled is not None: + update_dict["enabled"] = patch.enabled + if patch.id is not None: + update_dict["id"] = patch.id + if patch.type is not None: + update_dict["type"] = patch.type + if patch.config is not None: + update_dict["config"] = patch.config + + if update_dict: + return self.model_copy(update=update_dict) + + return self + + +class StatelessTransformationConfigPatch(BaseModel): + enabled: Optional[bool] = Field(default=None) + id: Optional[str] = Field(default=None) + type: Optional[StatelessTransformationType] = Field(default=None) + config: Optional[ExpressionConfig] = Field(default=None) diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py 
index d9c2f87..a0f9ca6 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -68,6 +68,20 @@ def get_valid_pipeline_config() -> dict: "enabled": True, "expression": "user_id = '123'", }, + "stateless_transformation": { + "enabled": True, + "id": "my_transformation", + "type": "expr_lang_transform", + "config": { + "transform": [ + { + "expression": "upper(user_id)", + "output_name": "upper_user_id", + "output_type": "string", + }, + ], + }, + }, "sink": { "type": "clickhouse", "provider": "aiven", @@ -104,13 +118,7 @@ def get_valid_pipeline_config() -> dict: "column_name": "order_id", "column_type": "String", }, - { - "source_id": "orders", - "name": "user_id", - "type": "string", - "column_name": "user_id", - "column_type": "String", - }, + {"source_id": "orders", "name": "user_id", "type": "string"}, { "source_id": "user_logins", "name": "timestamp", @@ -130,6 +138,13 @@ def get_valid_pipeline_config() -> dict: "name": "skip_sink_field", "type": "string", }, + { + "source_id": "my_transformation", + "name": "upper_user_id", + "type": "string", + "column_name": "upper_user_id", + "column_type": "String", + }, ], }, } diff --git a/tests/data/valid_pipeline.json b/tests/data/valid_pipeline.json index 57559a1..02cd676 100644 --- a/tests/data/valid_pipeline.json +++ b/tests/data/valid_pipeline.json @@ -61,6 +61,20 @@ "enabled": true, "expression": "user_id = '123'" }, + "stateless_transformation": { + "enabled": true, + "id": "my_transformation", + "type": "expr_lang_transform", + "config": { + "transform": [ + { + "expression": "upper(user_id)", + "output_name": "upper_user_id", + "output_type": "string" + } + ] + } + }, "sink": { "type": "clickhouse", "provider": "aiven", @@ -101,9 +115,7 @@ { "source_id": "orders", "name": "user_id", - "type": "string", - "column_name": "user_id", - "column_type": "String" + "type": "string" }, { "source_id": "user_logins", @@ -123,6 +135,13 @@ "source_id": "orders", "name": "skip_sink_field", "type": 
"string" + }, + { + "source_id": "my_transformation", + "name": "upper_user_id", + "type": "string", + "column_name": "upper_user_id", + "column_type": "String" } ] } diff --git a/tests/data/valid_pipeline.yaml b/tests/data/valid_pipeline.yaml index b19e662..75ad5c0 100644 --- a/tests/data/valid_pipeline.yaml +++ b/tests/data/valid_pipeline.yaml @@ -15,6 +15,15 @@ pipeline_id: test-pipeline filter: enabled: true expression: user_id = '123' +stateless_transformation: + enabled: true + id: my_transformation + type: expr_lang_transform + config: + transform: + - expression: upper(user_id) + output_name: upper_user_id + output_type: string sink: database: default host: @@ -73,9 +82,7 @@ schema: name: order_id source_id: orders type: string - - column_name: user_id - column_type: String - name: user_id + - name: user_id source_id: orders type: string - column_name: login_at @@ -90,4 +97,9 @@ schema: type: string - name: skip_sink_field source_id: orders - type: string \ No newline at end of file + type: string + - name: upper_user_id + source_id: my_transformation + type: string + column_name: upper_user_id + column_type: String \ No newline at end of file diff --git a/tests/test_models/test_config_update.py b/tests/test_models/test_config_update.py index 0052ead..451dfa4 100644 --- a/tests/test_models/test_config_update.py +++ b/tests/test_models/test_config_update.py @@ -17,6 +17,11 @@ def test_update_name(self, valid_config): assert updated.pipeline_id == config.pipeline_id assert updated.source == config.source assert updated.sink == config.sink + assert updated.stateless_transformation == config.stateless_transformation + assert updated.join == config.join + assert updated.filter == config.filter + assert updated.metadata == config.metadata + assert updated.pipeline_schema == config.pipeline_schema # Original config should be unchanged (immutable) assert config.name != "Updated Name" @@ -112,6 +117,49 @@ def test_update_multiple_fields(self, valid_config): assert 
updated.source.provider == "updated-provider" assert updated.sink.host == "updated-host" + def test_update_filter(self, valid_config): + """Test updating filter configuration.""" + config = models.PipelineConfig(**valid_config) + patch = models.PipelineConfigPatch( + filter=models.FilterConfigPatch(expression="user_id = '321'") + ) + + updated = config.update(patch) + + assert updated.filter.expression == "user_id = '321'" + assert updated.filter.enabled is True + + def test_update_stateless_transformation(self, valid_config): + """Test updating stateless transformation configuration.""" + config = models.PipelineConfig(**valid_config) + patch = models.PipelineConfigPatch( + stateless_transformation=models.StatelessTransformationConfigPatch( + config={ + "transform": [ + { + "expression": "lower(user_id)", + "output_name": "lower_user_id", + "output_type": "string", + } + ] + } + ) + ) + + updated = config.update(patch) + + assert ( + updated.stateless_transformation.config.transform[0].expression + == "lower(user_id)" + ) + assert ( + updated.stateless_transformation.config.transform[0].output_name + == "lower_user_id" + ) + assert ( + updated.stateless_transformation.config.transform[0].output_type == "string" + ) + def test_update_empty_patch(self, valid_config): """Test updating with an empty patch (all None).""" config = models.PipelineConfig(**valid_config) diff --git a/tests/test_models/test_join.py b/tests/test_models/test_join.py index c6c6bbd..230ec65 100644 --- a/tests/test_models/test_join.py +++ b/tests/test_models/test_join.py @@ -58,5 +58,6 @@ def test_join_validation_error_scenarios(self, valid_config, scenario): join=scenario["join"](valid_config), sink=valid_config["sink"], schema=valid_config["schema"], + stateless_transformation=valid_config["stateless_transformation"], ) assert scenario["error_message"] in str(exc_info.value) diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py index 
6a5d221..cfe85e7 100644 --- a/tests/test_models/test_pipeline_config.py +++ b/tests/test_models/test_pipeline_config.py @@ -27,6 +27,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): join=valid_config["join"], sink=valid_config["sink"], schema=valid_config["schema"], + stateless_transformation=valid_config["stateless_transformation"], ) assert config.pipeline_id == "test-pipeline-123a" @@ -97,6 +98,7 @@ def test_pipeline_config_pipeline_name_provided(self, valid_config): join=valid_config["join"], sink=valid_config["sink"], schema=valid_config["schema"], + stateless_transformation=valid_config["stateless_transformation"], ) assert config.pipeline_id == "test-pipeline" assert config.name == "My Custom Pipeline Name" @@ -109,6 +111,7 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config): join=valid_config["join"], sink=valid_config["sink"], schema=valid_config["schema"], + stateless_transformation=valid_config["stateless_transformation"], ) assert config.pipeline_id == "test-pipeline" assert config.name == "Test Pipeline"