Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.7.0
3.7.1
12 changes: 12 additions & 0 deletions src/glassflow/etl/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
SourceType,
TopicConfig,
)
from .stateless_transformation import (
ExpressionConfig,
StatelessTransformationConfig,
StatelessTransformationConfigPatch,
StatelessTransformationType,
Transformation,
)

__all__ = [
"ClickhouseDataType",
Expand Down Expand Up @@ -57,4 +64,9 @@
"KafkaConnectionParamsPatch",
"DeduplicationConfigPatch",
"JoinConfigPatch",
"StatelessTransformationConfig",
"StatelessTransformationConfigPatch",
"StatelessTransformationType",
"ExpressionConfig",
"Transformation",
]
26 changes: 24 additions & 2 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
from .schema import Schema
from .sink import SinkConfig, SinkConfigPatch
from .source import SourceConfig, SourceConfigPatch
from .stateless_transformation import (
StatelessTransformationConfig,
StatelessTransformationConfigPatch,
)


class PipelineStatus(CaseInsensitiveStrEnum):
Expand All @@ -34,6 +38,9 @@ class PipelineConfig(BaseModel):
metadata: Optional[MetadataConfig] = Field(default=MetadataConfig())
sink: SinkConfig
pipeline_schema: Schema = Field(alias="schema")
stateless_transformation: Optional[StatelessTransformationConfig] = Field(
default=StatelessTransformationConfig()
)

@field_validator("pipeline_id")
@classmethod
Expand Down Expand Up @@ -63,10 +70,14 @@ def validate_config(self) -> "PipelineConfig":

# Validate schema
topic_names = [topic.name for topic in self.source.topics]
source_ids = topic_names.copy()
if self.stateless_transformation.enabled:
source_ids.append(self.stateless_transformation.id)
for field in self.pipeline_schema.fields:
if field.source_id not in topic_names:
if field.source_id not in source_ids:
raise ValueError(
f"Source '{field.source_id}' does not exist in any topic"
f"Source '{field.source_id}' does not exist in any topic "
"or is not a stateless transformation id"
)

# Validate deduplication ID fields
Expand Down Expand Up @@ -147,6 +158,13 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
if config_patch.metadata is not None:
updated_config.metadata = config_patch.metadata

# Update stateless transformation if provided
if config_patch.stateless_transformation is not None:
updated_config.stateless_transformation = (
updated_config.stateless_transformation
or StatelessTransformationConfig()
).update(config_patch.stateless_transformation)

return updated_config


Expand All @@ -158,3 +176,7 @@ class PipelineConfigPatch(BaseModel):
pipeline_schema: Optional[Schema] = Field(default=None, alias="schema")
sink: Optional[SinkConfigPatch] = Field(default=None)
source: Optional[SourceConfigPatch] = Field(default=None)
stateless_transformation: Optional[StatelessTransformationConfigPatch] = Field(
default=None
)
version: Optional[str] = Field(default=None)
80 changes: 80 additions & 0 deletions src/glassflow/etl/models/stateless_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Any, List, Optional

from pydantic import BaseModel, Field, model_validator

from .base import CaseInsensitiveStrEnum


class StatelessTransformationType(CaseInsensitiveStrEnum):
EXPRESSION = "expr_lang_transform"


class Transformation(BaseModel):
expression: str = Field(description="The transformation expression")
output_name: str = Field(description="The name of the output column")
output_type: str = Field(description="The type of the output column")


class ExpressionConfig(BaseModel):
transform: List[Transformation] = Field(description="The transformation expression")


class StatelessTransformationConfig(BaseModel):
enabled: bool = Field(
description="Whether the stateless transformation is enabled",
default=False,
)
id: Optional[str] = Field(
description="The ID of the stateless transformation", default=None
)
type: Optional[StatelessTransformationType] = Field(
description="The type of the stateless transformation",
default=StatelessTransformationType.EXPRESSION,
)
config: Optional[ExpressionConfig] = Field(
description="The configuration of the stateless transformation", default=None
)

@model_validator(mode="after")
def validate(self) -> "StatelessTransformationConfig":
if self.enabled:
if not self.id:
raise ValueError(
"id is required when stateless transformation is enabled"
)
if not self.type:
raise ValueError(
"type is required when stateless transformation is enabled"
)
if not self.config:
raise ValueError(
"config is required when stateless transformation is enabled"
)
return self

def update(
self, patch: "StatelessTransformationConfigPatch"
) -> "StatelessTransformationConfig":
"""Apply a patch to this stateless transformation config."""
update_dict: dict[str, Any] = {}

if patch.enabled is not None:
update_dict["enabled"] = patch.enabled
if patch.id is not None:
update_dict["id"] = patch.id
if patch.type is not None:
update_dict["type"] = patch.type
if patch.config is not None:
update_dict["config"] = patch.config

if update_dict:
return self.model_copy(update=update_dict)

return self


class StatelessTransformationConfigPatch(BaseModel):
enabled: Optional[bool] = Field(default=None)
id: Optional[str] = Field(default=None)
type: Optional[StatelessTransformationType] = Field(default=None)
config: Optional[ExpressionConfig] = Field(default=None)
29 changes: 22 additions & 7 deletions tests/data/pipeline_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ def get_valid_pipeline_config() -> dict:
"enabled": True,
"expression": "user_id = '123'",
},
"stateless_transformation": {
"enabled": True,
"id": "my_transformation",
"type": "expr_lang_transform",
"config": {
"transform": [
{
"expression": "upper(user_id)",
"output_name": "upper_user_id",
"output_type": "string",
},
],
},
},
"sink": {
"type": "clickhouse",
"provider": "aiven",
Expand Down Expand Up @@ -104,13 +118,7 @@ def get_valid_pipeline_config() -> dict:
"column_name": "order_id",
"column_type": "String",
},
{
"source_id": "orders",
"name": "user_id",
"type": "string",
"column_name": "user_id",
"column_type": "String",
},
{"source_id": "orders", "name": "user_id", "type": "string"},
{
"source_id": "user_logins",
"name": "timestamp",
Expand All @@ -130,6 +138,13 @@ def get_valid_pipeline_config() -> dict:
"name": "skip_sink_field",
"type": "string",
},
{
"source_id": "my_transformation",
"name": "upper_user_id",
"type": "string",
"column_name": "upper_user_id",
"column_type": "String",
},
],
},
}
Expand Down
25 changes: 22 additions & 3 deletions tests/data/valid_pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@
"enabled": true,
"expression": "user_id = '123'"
},
"stateless_transformation": {
"enabled": true,
"id": "my_transformation",
"type": "expr_lang_transform",
"config": {
"transform": [
{
"expression": "upper(user_id)",
"output_name": "upper_user_id",
"output_type": "string"
}
]
}
},
"sink": {
"type": "clickhouse",
"provider": "aiven",
Expand Down Expand Up @@ -101,9 +115,7 @@
{
"source_id": "orders",
"name": "user_id",
"type": "string",
"column_name": "user_id",
"column_type": "String"
"type": "string"
},
{
"source_id": "user_logins",
Expand All @@ -123,6 +135,13 @@
"source_id": "orders",
"name": "skip_sink_field",
"type": "string"
},
{
"source_id": "my_transformation",
"name": "upper_user_id",
"type": "string",
"column_name": "upper_user_id",
"column_type": "String"
}
]
}
Expand Down
20 changes: 16 additions & 4 deletions tests/data/valid_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ pipeline_id: test-pipeline
filter:
enabled: true
expression: user_id = '123'
stateless_transformation:
enabled: true
id: my_transformation
type: expr_lang_transform
config:
transform:
- expression: upper(user_id)
output_name: upper_user_id
output_type: string
sink:
database: default
host: <host>
Expand Down Expand Up @@ -73,9 +82,7 @@ schema:
name: order_id
source_id: orders
type: string
- column_name: user_id
column_type: String
name: user_id
- name: user_id
source_id: orders
type: string
- column_name: login_at
Expand All @@ -90,4 +97,9 @@ schema:
type: string
- name: skip_sink_field
source_id: orders
type: string
type: string
- name: upper_user_id
source_id: my_transformation
type: string
column_name: upper_user_id
column_type: String
48 changes: 48 additions & 0 deletions tests/test_models/test_config_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ def test_update_name(self, valid_config):
assert updated.pipeline_id == config.pipeline_id
assert updated.source == config.source
assert updated.sink == config.sink
assert updated.stateless_transformation == config.stateless_transformation
assert updated.join == config.join
assert updated.filter == config.filter
assert updated.metadata == config.metadata
assert updated.pipeline_schema == config.pipeline_schema
# Original config should be unchanged (immutable)
assert config.name != "Updated Name"

Expand Down Expand Up @@ -112,6 +117,49 @@ def test_update_multiple_fields(self, valid_config):
assert updated.source.provider == "updated-provider"
assert updated.sink.host == "updated-host"

def test_update_filter(self, valid_config):
"""Test updating filter configuration."""
config = models.PipelineConfig(**valid_config)
patch = models.PipelineConfigPatch(
filter=models.FilterConfigPatch(expression="user_id = '321'")
)

updated = config.update(patch)

assert updated.filter.expression == "user_id = '321'"
assert updated.filter.enabled is True

def test_update_stateless_transformation(self, valid_config):
"""Test updating stateless transformation configuration."""
config = models.PipelineConfig(**valid_config)
patch = models.PipelineConfigPatch(
stateless_transformation=models.StatelessTransformationConfigPatch(
config={
"transform": [
{
"expression": "lower(user_id)",
"output_name": "lower_user_id",
"output_type": "string",
}
]
}
)
)

updated = config.update(patch)

assert (
updated.stateless_transformation.config.transform[0].expression
== "lower(user_id)"
)
assert (
updated.stateless_transformation.config.transform[0].output_name
== "lower_user_id"
)
assert (
updated.stateless_transformation.config.transform[0].output_type == "string"
)

def test_update_empty_patch(self, valid_config):
"""Test updating with an empty patch (all None)."""
config = models.PipelineConfig(**valid_config)
Expand Down
1 change: 1 addition & 0 deletions tests/test_models/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ def test_join_validation_error_scenarios(self, valid_config, scenario):
join=scenario["join"](valid_config),
sink=valid_config["sink"],
schema=valid_config["schema"],
stateless_transformation=valid_config["stateless_transformation"],
)
assert scenario["error_message"] in str(exc_info.value)
3 changes: 3 additions & 0 deletions tests/test_models/test_pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
join=valid_config["join"],
sink=valid_config["sink"],
schema=valid_config["schema"],
stateless_transformation=valid_config["stateless_transformation"],
)
assert config.pipeline_id == "test-pipeline-123a"

Expand Down Expand Up @@ -97,6 +98,7 @@ def test_pipeline_config_pipeline_name_provided(self, valid_config):
join=valid_config["join"],
sink=valid_config["sink"],
schema=valid_config["schema"],
stateless_transformation=valid_config["stateless_transformation"],
)
assert config.pipeline_id == "test-pipeline"
assert config.name == "My Custom Pipeline Name"
Expand All @@ -109,6 +111,7 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config):
join=valid_config["join"],
sink=valid_config["sink"],
schema=valid_config["schema"],
stateless_transformation=valid_config["stateless_transformation"],
)
assert config.pipeline_id == "test-pipeline"
assert config.name == "Test Pipeline"