Contributor

@ericm-db ericm-db commented Jan 5, 2026

What changes were proposed in this pull request?

This PR introduces the NameStreamingSources analyzer rule and supporting infrastructure to enable streaming source evolution. This allows streaming queries to add, remove, or reorder sources without losing state by assigning stable names to sources.

Key changes:

  • Added HasStreamingSourceIdentifyingName trait for uniform name propagation
  • Updated StreamingRelationV2 to support source identifying names
  • Created NameStreamingSources analyzer rule to propagate names from NamedStreamingRelation wrappers
  • Added spark.sql.streaming.queryEvolution.enableStreamingSourceEvolution config flag
  • Added error handling for unnamed sources when enforcement is enabled
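
As a rough illustration of the propagation and enforcement described in the list above, here is a toy model in plain Python. It is not Spark's implementation (which is Scala and operates on logical plans). `StreamingRelation`, `NamedWrapper`, `UnnamedSourceError`, and the `enforce_names` parameter are hypothetical stand-ins for `StreamingRelationV2`, `NamedStreamingRelation`, the error raised for unnamed sources, and the config flag. What happens to unnamed sources when enforcement is off is an assumption here.

```python
from dataclasses import dataclass, replace
from typing import List, Union


# Toy stand-ins for the three name kinds mentioned in the test notes.
@dataclass(frozen=True)
class UserProvided:
    name: str


@dataclass(frozen=True)
class FlowAssigned:
    name: str


@dataclass(frozen=True)
class Unassigned:
    pass


SourceName = Union[UserProvided, FlowAssigned, Unassigned]


@dataclass(frozen=True)
class StreamingRelation:
    """Hypothetical simplified leaf, loosely modelled on StreamingRelationV2."""
    source: str
    identifying_name: SourceName = Unassigned()


@dataclass(frozen=True)
class NamedWrapper:
    """Hypothetical simplified wrapper, loosely modelled on NamedStreamingRelation."""
    child: StreamingRelation
    name: SourceName


class UnnamedSourceError(Exception):
    """Hypothetical error for sources left unnamed while enforcement is on."""


def name_streaming_sources(
    plan: List[Union[StreamingRelation, NamedWrapper]],
    enforce_names: bool,
) -> List[StreamingRelation]:
    """Unwrap each wrapper, copying its name onto the wrapped relation."""
    resolved = []
    for node in plan:
        if isinstance(node, NamedWrapper):
            # Propagate the wrapper's name down and drop the wrapper itself.
            node = replace(node.child, identifying_name=node.name)
        if enforce_names and isinstance(node.identifying_name, Unassigned):
            raise UnnamedSourceError(
                f"source '{node.source}' has no name, but "
                "enableStreamingSourceEvolution is on"
            )
        resolved.append(node)
    return resolved


plan = [
    NamedWrapper(StreamingRelation("kafka"), UserProvided("orders")),
    StreamingRelation("rate"),  # never wrapped, so it stays Unassigned
]
resolved = name_streaming_sources(plan, enforce_names=False)
```

With `enforce_names=False` the second source simply keeps its `Unassigned` name in this sketch. With `enforce_names=True` the same plan raises `UnnamedSourceError`.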

Why are the changes needed?

Currently, streaming sources are identified by their position in the query plan (sources/0, sources/1, etc.). This makes it impossible to add, remove, or reorder sources without breaking checkpoint compatibility. By assigning stable names to sources, we enable:

  1. Source evolution: Add/remove/reorder sources without losing state
  2. Stable checkpoint locations: sources/<name> instead of sources/0, sources/1
  3. Better debugging: Named sources are easier to identify and debug
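
To make the difference between positional and named identification concrete, the sketch below compares the two layouts in plain Python. The helper names `positional_paths` and `named_paths` are hypothetical, and the sketch only models the `sources/0` versus `sources/<name>` path scheme described above, not Spark's actual checkpoint code.

```python
from typing import Dict, List


def positional_paths(sources: List[str]) -> Dict[str, str]:
    """Map each source to a checkpoint directory keyed by its position."""
    return {src: f"sources/{i}" for i, src in enumerate(sources)}


def named_paths(sources: List[str]) -> Dict[str, str]:
    """Map each source to a checkpoint directory keyed by its stable name."""
    return {src: f"sources/{src}" for src in sources}


original = ["orders", "clicks"]
reordered = ["clicks", "orders"]

# Positional layout: "orders" moves from sources/0 to sources/1 after the
# reorder, so it would pick up the state previously written by "clicks".
assert positional_paths(original)["orders"] == "sources/0"
assert positional_paths(reordered)["orders"] == "sources/1"

# Named layout: "orders" resolves to the same directory in both plans.
assert named_paths(original)["orders"] == "sources/orders"
assert named_paths(reordered)["orders"] == "sources/orders"
```

The same reasoning applies when a source is added or removed: with positional keys every later source shifts, while named keys leave the remaining sources untouched.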

Does this PR introduce any user-facing change?

No. The infrastructure is in place, but the user-facing .name() DataFrame API is not yet exposed. The analyzer rule handles existing NamedStreamingRelation nodes that may be created internally.

How was this patch tested?

  • Added comprehensive unit tests in NameStreamingSourcesSuite (15 test cases)
  • Tests cover name propagation, enforcement checks, error messages, and edge cases
  • Tests verify behavior with UserProvided, FlowAssigned, and Unassigned names

Was this patch authored or co-authored using generative AI tooling?

No.


github-actions bot commented Jan 5, 2026

JIRA Issue Information

=== Task SPARK-54907 ===
Summary: Introduce NameStreamingSources analyzer rule for streaming source evolution
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@ericm-db ericm-db changed the title from "[SPARK-54684][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" to "[SPARK-54907][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" Jan 5, 2026
@ericm-db ericm-db changed the title from "[SPARK-54907][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" to "[SPARK-54909][SPARK-54907][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" Jan 5, 2026
@ericm-db ericm-db changed the title from "[SPARK-54909][SPARK-54907][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" to "[SPARK-54907][SS] Introduce NameStreamingSources analyzer rule for streaming source evolution" Jan 5, 2026
@ericm-db ericm-db force-pushed the SPARK-54684-streaming-source-naming branch 2 times, most recently from 2515f71 to 7dd09f9 Compare January 6, 2026 23:51
@ericm-db ericm-db force-pushed the SPARK-54684-streaming-source-naming branch from 7dd09f9 to 22baf62 Compare January 9, 2026 18:15
Contributor Author

ericm-db commented Jan 9, 2026

@dtenedor PTAL

Contributor

@dtenedor dtenedor left a comment

We talked offline about a couple small mismatches between config names in error messages, etc. LGTM now.

Contributor

dtenedor commented Jan 9, 2026

LGTM, merging to master.

@dtenedor dtenedor closed this in cc9e411 Jan 9, 2026
Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Jan 9, 2026
Closes apache#53684 from ericm-db/SPARK-54684-streaming-source-naming.

Lead-authored-by: Eric Marnadi <132308037+ericm-db@users.noreply.github.com>
Co-authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>