
AIP-99: Add LLMSchemaCompareOperator#62793

Merged
gopidesupavan merged 7 commits into apache:main from gopidesupavan:llm-schema-compare-operator
Mar 4, 2026

@gopidesupavan
Member

closes: #62734

Add a new operator for cross-system schema drift detection powered by LLM reasoning to the common.ai provider.

LLMSchemaCompareOperator introspects schemas from multiple data sources (databases via DbApiHook, object storage via DataFusionEngine) and uses an LLM to identify mismatches that would break data loading. The LLM handles complex cross-system type mapping that simple equality checks miss (e.g., varchar(255) vs string, timestamp vs timestamptz).
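
To make the equality-check limitation concrete, here is a minimal sketch in plain Python. The `TYPE_FAMILIES` table and the `base_type`, `naive_equal`, and `same_family` helpers are hypothetical illustrations and not part of the operator, which delegates this judgment to the LLM rather than to a fixed lookup.

```python
import re

# Hypothetical, hand-written mapping from system-specific type names to a
# coarse "family". The operator itself relies on LLM reasoning instead.
TYPE_FAMILIES = {
    "varchar": "string",
    "text": "string",
    "string": "string",
    "timestamp": "timestamp",
    "timestamptz": "timestamp",
}


def base_type(type_name: str) -> str:
    """Lower-case a type name and drop any length/precision suffix."""
    return re.sub(r"\(.*\)", "", type_name).strip().lower()


def naive_equal(source_type: str, target_type: str) -> bool:
    """Plain string equality, the check that misses cross-system matches."""
    return source_type == target_type


def same_family(source_type: str, target_type: str) -> bool:
    """Compare coarse type families instead of raw type strings."""
    src = TYPE_FAMILIES.get(base_type(source_type))
    tgt = TYPE_FAMILIES.get(base_type(target_type))
    return src is not None and src == tgt


print(naive_equal("varchar(255)", "string"))    # False
print(same_family("varchar(255)", "string"))    # True
print(same_family("timestamp", "timestamptz"))  # True
```

A static table like this cannot express nuances such as length limits or time zone semantics, which is the gap the LLM-based comparison is meant to cover.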

# `task` must be imported for the @task.llm_schema_compare decorator below.
from airflow.sdk import dag, task

@dag
def example_llm_schema_compare():

    @task.llm_schema_compare(
        db_conn_ids=["postgres_default"],
        table_names=["customers", "secondory_customers"],
        llm_conn_id="llm_conn_id",
        model_id="google-gla:gemini-2.5-pro",
    )
    def compare_postgres_customer_tables(ds=None):
        return "Identify schema mismatches that would break data loading between systems"

    compare_postgres_customer_tables()

example_llm_schema_compare()

An example output view:

{
  "summary": "The schemas are not compatible. The target table 'secondory_customers' is missing the 'postal_code' column, and the 'address' column has a restrictive VARCHAR(255) type which can cause data truncation from the source TEXT type. Several other VARCHAR columns have different lengths but are compatible.",
  "compatible": false,
  "mismatches": [
    {
      "column": "postal_code",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "critical",
      "description": "The column 'postal_code' is missing in the target table 'secondory_customers'.",
      "source_type": "VARCHAR(20)",
      "target_type": "Column not found",
      "migration_query": "ALTER TABLE secondory_customers ADD COLUMN postal_code VARCHAR(20);",
      "suggested_action": "Add the 'postal_code' column to the target table."
    },
    {
      "column": "address",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "critical",
      "description": "The 'address' column type in the target is VARCHAR(255), which is more restrictive than the source type TEXT and can lead to data truncation.",
      "source_type": "TEXT",
      "target_type": "VARCHAR(255)",
      "migration_query": "ALTER TABLE secondory_customers ALTER COLUMN address TYPE TEXT;",
      "suggested_action": "Change the data type of the 'address' column in the target table to TEXT to match the source."
    },
    {
      "column": "customer_name",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "info",
      "description": "The VARCHAR length for 'customer_name' differs, but the target type is larger, so it is compatible.",
      "source_type": "VARCHAR(100)",
      "target_type": "VARCHAR(255)",
      "migration_query": "Provide migration query",
      "suggested_action": "No action required, but for consistency you may want to align the varchar lengths."
    },
    {
      "column": "contact_name",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "info",
      "description": "The VARCHAR length for 'contact_name' differs, but the target type is larger, so it is compatible.",
      "source_type": "VARCHAR(100)",
      "target_type": "VARCHAR(255)",
      "migration_query": "Provide migration query",
      "suggested_action": "No action required, but for consistency you may want to align the varchar lengths."
    },
    {
      "column": "city",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "info",
      "description": "The VARCHAR length for 'city' differs, but the target type is larger, so it is compatible.",
      "source_type": "VARCHAR(50)",
      "target_type": "VARCHAR(100)",
      "migration_query": "Provide migration query",
      "suggested_action": "No action required, but for consistency you may want to align the varchar lengths."
    },
    {
      "column": "country",
      "source": "customers",
      "target": "secondory_customers",
      "severity": "info",
      "description": "The VARCHAR length for 'country' differs, but the target type is larger, so it is compatible.",
      "source_type": "VARCHAR(50)",
      "target_type": "VARCHAR(100)",
      "migration_query": "Provide migration query",
      "suggested_action": "No action required, but for consistency you may want to align the varchar lengths."
    }
  ]
}
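
A downstream task could consume a result of this shape roughly as follows. This is a minimal sketch that assumes the result arrives as a JSON string with the keys shown above; `blocking_mismatches` is a hypothetical helper, and the sample data is abridged from the example output.

```python
import json

# Abridged copy of the example output above.
RESULT_JSON = """
{
  "compatible": false,
  "mismatches": [
    {"column": "postal_code", "severity": "critical",
     "migration_query": "ALTER TABLE secondory_customers ADD COLUMN postal_code VARCHAR(20);"},
    {"column": "address", "severity": "critical",
     "migration_query": "ALTER TABLE secondory_customers ALTER COLUMN address TYPE TEXT;"},
    {"column": "city", "severity": "info",
     "migration_query": "Provide migration query"}
  ]
}
"""


def blocking_mismatches(result: dict) -> list:
    """Return only the mismatches flagged as critical."""
    return [m for m in result.get("mismatches", []) if m.get("severity") == "critical"]


result = json.loads(RESULT_JSON)
critical = blocking_mismatches(result)

if not result["compatible"]:
    for mismatch in critical:
        # Queries are LLM-generated suggestions; review them before running.
        print(f"{mismatch['column']}: {mismatch['migration_query']}")
```

Filtering on `severity` matters because `info` entries in the sample carry a placeholder `migration_query` rather than runnable SQL.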


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)


@gopidesupavan gopidesupavan requested a review from kaxil as a code owner March 3, 2026 15:43
@gopidesupavan gopidesupavan changed the title Add LLMSchemaCompareOperator AIP-99: Add LLMSchemaCompareOperator Mar 3, 2026
@gopidesupavan gopidesupavan force-pushed the llm-schema-compare-operator branch from e341889 to 4c62fbf Compare March 3, 2026 15:49
@gopidesupavan gopidesupavan force-pushed the llm-schema-compare-operator branch from 4c62fbf to 00e3995 Compare March 3, 2026 16:18
@gopidesupavan gopidesupavan force-pushed the llm-schema-compare-operator branch from 8329899 to b5cbe02 Compare March 3, 2026 22:15
Member

@kaxil kaxil left a comment

Thanks for addressing the first round of feedback — DataFusionEngine lazy import, Literal types, exception handling in _is_dbapi_connection, and the prompt duplication fix all look good. A few new issues from the changes.
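
Two of the patterns mentioned in this review, lazy imports and `Literal` types, can be sketched generically as below. This is not the code from the PR: `Severity`, `validate_severity`, and `load_engine` are hypothetical names, the `"warning"` value is a guess (only `"critical"` and `"info"` appear in the example output), and `sqlite3` stands in for the optional engine dependency.

```python
from typing import Literal, get_args

# Assumed set of values: "critical" and "info" appear in the example output;
# "warning" is a guess added for illustration.
Severity = Literal["critical", "warning", "info"]


def validate_severity(value: str) -> str:
    """Reject values outside the Literal's allowed set at runtime."""
    if value not in get_args(Severity):
        raise ValueError(f"Unknown severity: {value!r}")
    return value


def load_engine():
    """Import an optional dependency only when it is first needed.

    `sqlite3` is a stand-in here. Deferring the import means that merely
    importing this module does not require the optional package.
    """
    import sqlite3  # deferred until this function is called

    return sqlite3.connect(":memory:")
```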

@gopidesupavan gopidesupavan force-pushed the llm-schema-compare-operator branch 2 times, most recently from 4533efc to 28506d7 Compare March 3, 2026 23:03
Member

@kaxil kaxil left a comment

Much better. Round 2 issues all addressed: log format strings fixed, type-equivalence hints folded into DEFAULT_SYSTEM_PROMPT, conn_id restored in Source labels, AirflowException through compat, example DAGs cleaned up.

Two remaining items — one typo bug and one design question.

Member

@kaxil kaxil left a comment

LGTM with minor nits (already commented inline):

  1. Missing ) in the f-string at line 267 of the operator: "({dialect_name}" should be "({dialect_name})"
  2. Docstring for system_prompt says "appended" but the behavior is "replaces" — update the wording
  3. Warning logs for PK/FK/index failures should include table_name for context
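
The three nits above could look roughly like this once addressed. This is a sketch under assumptions, not the merged code: `source_label`, `resolve_system_prompt`, and `warn_introspection_failure` are hypothetical helper names, and the prompt text is a placeholder.

```python
import logging
from typing import Optional

log = logging.getLogger("schema_compare_sketch")

# Placeholder text; the real default prompt is not shown in this thread.
DEFAULT_SYSTEM_PROMPT = "You are a schema comparison assistant."


def source_label(conn_id: str, table_name: str, dialect_name: str) -> str:
    """Nit 1: build a label with balanced parentheses around the dialect."""
    return f"{conn_id}.{table_name} ({dialect_name})"


def resolve_system_prompt(system_prompt: Optional[str]) -> str:
    """Nit 2: a custom prompt replaces the default; it is not appended."""
    return DEFAULT_SYSTEM_PROMPT if system_prompt is None else system_prompt


def warn_introspection_failure(kind: str, table_name: str, exc: Exception) -> None:
    """Nit 3: log a PK/FK/index lookup failure with the table name."""
    # %-style arguments defer string formatting until the record is emitted.
    log.warning("Could not fetch %s for table %s: %s", kind, table_name, exc)
```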

@gopidesupavan gopidesupavan force-pushed the llm-schema-compare-operator branch from 28506d7 to 06fb433 Compare March 3, 2026 23:49
@gopidesupavan
Member Author

Thank you. Would you like to take one more look?

@gopidesupavan gopidesupavan merged commit d8c9fef into apache:main Mar 4, 2026
129 checks passed
@gopidesupavan gopidesupavan deleted the llm-schema-compare-operator branch March 4, 2026 06:28
Development

Successfully merging this pull request may close these issues.

LLMSchemaCompareOperator / @task.llm_schema_compare
