
camweston-stripe commented Apr 24, 2025

CHIP-10: Chronon PySpark Notebooks Interface

Summary

This PR contributes Stripe's PySpark notebooks integration for Chronon to the open-source repository. It enables ML engineers to define, execute, and analyze Chronon features (GroupBy, Join, and StagingQuery) entirely within notebook environments like Databricks and Jupyter.

The implementation uses a Python-JVM bridge (Py4J) to communicate with Chronon's Scala computation engine, allowing feature development without leaving the notebook environment.

Key Features

1. Platform-Agnostic Executable Framework

  • PySparkExecutable - Abstract base class providing common Python-JVM bridge functionality
  • GroupByExecutable / JoinExecutable / StagingQueryExecutable - Type-specific interfaces with run() and analyze() methods
  • PlatformInterface - Abstract interface for platform-specific behavior (logging, UDF registration, etc.)
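For example, supporting a new notebook environment amounts to subclassing PlatformInterface and pairing it with the type-specific executables. A minimal, hypothetical sketch (the method and constructor shapes below are illustrative, not the exact OSS signatures):

from ai.chronon.pyspark.executables import GroupByExecutable, PlatformInterface


class MyNotebookPlatform(PlatformInterface):
    # Hypothetical platform: plain-print logging and no extra UDFs.

    def log(self, message: str) -> None:
        # Illustrative override of a platform-specific logging hook
        print(f"[chronon] {message}")

    def register_udfs(self) -> None:
        # Nothing to register for this hypothetical platform
        pass


class MyNotebookGroupBy(GroupByExecutable):
    # Hypothetical executable wiring a GroupBy to the platform above;
    # DatabricksGroupBy and JupyterGroupBy follow the same shape.

    def __init__(self, group_by, spark):
        super().__init__(group_by, spark)
        self.platform = MyNotebookPlatform()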

2. Databricks Integration (databricks.py)

  • DatabricksPlatform - Platform implementation with DBUtils integration and JVM log capture
  • DatabricksGroupBy / DatabricksJoin / DatabricksStagingQuery - Ready-to-use executables
  • Automatic username prefixing for table isolation

3. Jupyter Integration (jupyter.py) - NEW

  • JupyterPlatform - Platform implementation supporting both file-based and in-memory log capture
  • JupyterGroupBy / JupyterJoin / JupyterStagingQuery - Executables for JupyterLab, JupyterHub, and classic notebooks
  • Configurable name_prefix, output_namespace, and use_username_prefix options
  • JupyterLogCapture - Helper class for capturing py4j/pyspark logs in notebooks

4. StagingQuery Support - NEW

  • Full implementation of StagingQueryExecutable for materializing intermediate SQL transformations
  • Scala JVM bridge methods: parseStagingQuery(), runStagingQuery(), getBooleanOptional()
  • Support for date templates: {{ start_date }}, {{ end_date }}, {{ latest_date }}, {{ max_date(table=...) }}
  • Parameters: step_days, enable_auto_expand, override_start_partition, skip_first_hole
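To illustrate the date templates, a StagingQuery can wrap a templated SQL string like the one below (a hedged sketch: the field names follow Chronon's thrift API, while the table and column names are made up):

from ai.chronon.api.ttypes import MetaData, StagingQuery

# Hypothetical intermediate transformation. The {{ start_date }} / {{ end_date }}
# placeholders are substituted by Chronon with the requested partition range at run time.
my_staging_query = StagingQuery(
    metaData=MetaData(name="my_team.payments_enriched", outputNamespace="my_database"),
    query="""
        SELECT payment_id, merchant_id, amount, ds
        FROM raw.payments
        WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
    """,
    startPartition="20250101",
)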

5. PySpark Integration Tests - NEW

  • test_pyspark.py - End-to-end integration tests verifying Python-JVM communication
  • test_helpers.py - Utility functions for creating mock GroupBys and running aggregations through JVM
  • Tests verify the complete flow: Python thrift → JSON → Scala parsing → Spark execution → DataFrame return
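In test form that flow looks roughly like the following (a sketch: the helper names are paraphrased from test_helpers.py and may not match the exact OSS signatures, and `spark` is assumed to be the module-scoped session fixture):

from ai.chronon.repo import test_helpers  # added in this PR


def test_group_by_round_trip(spark):
    # 1. Build a mock Python thrift GroupBy (helper name is illustrative)
    group_by = test_helpers.create_mock_group_by()

    # 2-4. Serialize to JSON, parse it on the JVM, and run it through Spark
    result_df = test_helpers.run_group_by_through_jvm(
        spark, group_by, start_date="20250101", end_date="20250107"
    )

    # 5. A PySpark DataFrame comes back across the Py4J bridge
    assert result_df.count() > 0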

6. Scala Unit Tests (PySparkUtilsTest.scala)

  • 18 unit tests covering all PySparkUtils methods
  • Tests for parsing (GroupBy, Join, StagingQuery), Optional helpers, and resolution utilities

Architecture

Python (Notebook)  <--Py4J-->  JVM (Spark/Chronon)
     |                              |
  GroupBy/Join                 PySparkUtils.scala
  (Thrift Obj)    --> JSON -->  parseGroupBy/Join
     |                              |
  .run()          <-- DF  <--   runGroupBy/Join
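Concretely, the Python side serializes the thrift object to JSON and calls into PySparkUtils over Py4J. A simplified sketch of that hop (the exact parseGroupBy/runGroupBy argument shapes are assumptions; plain thrift JSON serialization stands in for Chronon's own helper):

from pyspark.sql import DataFrame
from thrift.TSerialization import serialize
from thrift.protocol.TJSONProtocol import TSimpleJSONProtocolFactory


def run_group_by_via_jvm(spark, group_by, start_date, end_date):
    # Python-side half of the Py4J round trip (illustrative only)
    jvm = spark.sparkContext._jvm

    # Python thrift object -> JSON string
    group_by_json = serialize(group_by, TSimpleJSONProtocolFactory()).decode("utf-8")

    # JSON -> Scala GroupBy via PySparkUtils.parseGroupBy (argument shape assumed)
    java_group_by = jvm.ai.chronon.spark.PySparkUtils.parseGroupBy(group_by_json)

    # Scala execution returns a Java DataFrame handle (argument shape assumed)
    java_df = jvm.ai.chronon.spark.PySparkUtils.runGroupBy(
        java_group_by, start_date, end_date, spark._jsparkSession
    )

    # Wrap it back into a PySpark DataFrame (Spark 3.3+ accepts a SparkSession here)
    return DataFrame(java_df, spark)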

CI Changes

To enable PySpark integration tests in CI, we made the following changes to .circleci/config.yml:

  1. JAR Building - Added sbt publishLocal step to Python test job to compile Chronon JARs
  2. SBT Caching - Added cache restore/save for ~/.sbt, ~/.ivy2/cache, and ~/.cache/coursier to speed up builds
  3. JAR Discovery - Dynamic JAR path discovery using glob patterns to handle versioned artifacts
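The JAR discovery piece boils down to a glob over the locally published artifacts, roughly like this (a sketch with illustrative paths; the actual logic lives in the CI config and test fixtures):

import glob
import os

# Branch builds embed the branch name in the version
# (e.g. api_2.12-camweston-CHIP-10-0.0.110-SNAPSHOT.jar), so match on the
# artifact name rather than an exact version. The ivy path is illustrative.
ivy_local = os.path.expanduser("~/.ivy2/local/ai.chronon")
jars = [
    jar
    for jar in glob.glob(f"{ivy_local}/**/*.jar", recursive=True)
    if not jar.endswith(("-sources.jar", "-javadoc.jar"))
]
classpath = ":".join(jars)  # later handed to the SparkSession configuration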

The Python tests now:

  • Build the Scala JARs before running
  • Configure SparkSession with correct classpath
  • Verify JVM classes are loadable before running tests
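A condensed sketch of that setup (the Spark config keys are standard; the classpath value and fixture details are illustrative):

from pyspark.sql import SparkSession

# Colon-separated list of the locally published Chronon JARs (illustrative paths)
classpath = "/home/ci/.ivy2/local/ai.chronon/api.jar:/home/ci/.ivy2/local/ai.chronon/spark.jar"

spark = (
    SparkSession.builder.appName("chronon-pyspark-tests")
    .config("spark.jars", classpath.replace(":", ","))  # spark.jars is comma-separated
    .config("spark.driver.extraClassPath", classpath)
    .master("local[2]")
    .getOrCreate()
)

# Fail fast if the Chronon classes are not on the classpath (class name from this PR)
spark.sparkContext._jvm.java.lang.Class.forName("ai.chronon.spark.PySparkUtils")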

Files Changed

Python (api/py/ai/chronon/pyspark/)

| File | Description |
| --- | --- |
| executables.py | Base classes: PySparkExecutable, GroupByExecutable, JoinExecutable, StagingQueryExecutable, PlatformInterface |
| databricks.py | Databricks-specific: DatabricksPlatform, DatabricksGroupBy, DatabricksJoin, DatabricksStagingQuery |
| jupyter.py | NEW - Jupyter-specific: JupyterPlatform, JupyterGroupBy, JupyterJoin, JupyterStagingQuery |
| constants.py | Platform-specific constants (log paths, namespaces) |
| README.md | Comprehensive documentation with architecture, examples, and setup guide |

Python Tests (api/py/)

| File | Description |
| --- | --- |
| test/test_pyspark.py | NEW - PySpark integration tests |
| ai/chronon/repo/test_helpers.py | NEW - Test utilities for JVM bridge testing |

Scala (spark/src/main/scala/ai/chronon/spark/)

| File | Description |
| --- | --- |
| PySparkUtils.scala | JVM entry points: parseGroupBy, parseJoin, parseStagingQuery, runGroupBy, runJoin, runStagingQuery, various getOptional helpers |
| GroupBy.scala | Added usingArrayList() method for Py4J-friendly GroupBy construction |

Scala Tests

| File | Description |
| --- | --- |
| PySparkUtilsTest.scala | NEW - Unit tests for all PySparkUtils methods |

Other

| File | Description |
| --- | --- |
| .circleci/config.yml | Updated to build JARs and cache SBT dependencies for Python tests |
| api/py/requirements/dev.in | Added pyspark>=3.0.0 dependency |

Usage Examples

Databricks

from ai.chronon.pyspark.databricks import DatabricksGroupBy, DatabricksJoin

executable = DatabricksGroupBy(my_group_by, spark)
result_df = executable.run(start_date='20250101', end_date='20250107')

Jupyter

from ai.chronon.pyspark.jupyter import JupyterGroupBy, JupyterJoin

executable = JupyterGroupBy(
    my_group_by, spark,
    name_prefix="my_project",
    output_namespace="my_database"
)
result_df = executable.run(start_date='20250101', end_date='20250107')

StagingQuery

from ai.chronon.pyspark.jupyter import JupyterStagingQuery

executable = JupyterStagingQuery(my_staging_query, spark)
result_df = executable.run(
    end_date='20250107',
    step_days=7,
    enable_auto_expand=True
)

Limitations

  • S3 source resolution - Stripe-specific S3Utils removed; raw table paths must be used

Testing

  • Scala compilation passes
  • Scalafmt check passes
  • Python lint (flake8) passes
  • Scala unit tests pass (18 tests in PySparkUtilsTest)
  • Python unit tests pass (including PySpark integration tests)

Related

camweston-stripe and others added 11 commits December 31, 2025 13:11
Adds test infrastructure for validating the Py4J bridge between Python
GroupBy/Source definitions and their Scala counterparts:

- test_helpers.py: Utility functions to create mock GroupBys and Sources
  via PySparkUtils, and run them through the JVM
- test_pyspark.py: Integration tests that verify end-to-end execution
  of GroupBy aggregations and Source query rendering

These tests exercise the PySparkUtils methods (parseGroupBy, parseSource,
getTimeRangeOptional, getFiveMinuteResolution, etc.) through actual
Spark execution.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
- Fix flake8 E501 line too long errors in test_helpers.py
  - Break long comments into multiple lines
  - Break long function signature into multiple lines
  - Break long docstrings into multiple lines
  - Break long print statement into multiple lines
- Fix pyspark import error in test_pyspark.py
  - Use pytest.importorskip to skip tests if pyspark not installed
  - Reorder imports so skip happens before pyspark imports

Committed-By-Agent: claude
- Remove unused TYPE_CHECKING import from test_helpers.py
- Add noqa: E402 comments for imports after importorskip
- Fix blank line issues (E302, E303)
- Fix line length issues (E501)
- Remove trailing blank line (W391)
- Break long function signatures into multiple lines

All 139 Python tests pass with 1 skip (pyspark tests skipped as expected)

Committed-By-Agent: claude
- Add pyspark>=3.0.0 to dev requirements (regenerated dev.txt)
- Remove importorskip hack - pyspark is now a proper dev dependency
- Use pytest.skip() instead of raising Exception when JARs not found
- Clean up imports (alphabetized, removed noqa comments)

Committed-By-Agent: claude
- Add sbt publishLocal step to "Chronon Python Tests" job
- Use sbt cache to speed up builds
- Remove pytest.skip() - tests should run and fail properly if JARs missing
- Improve error messages when JARs not found

Committed-By-Agent: claude
JARs built from branches have the branch name in the version:
  api_2.12-camweston-CHIP-10-0.0.110-SNAPSHOT.jar

Instead of looking for an exact version match, use a glob pattern to
find any JAR matching the artifact name (excluding sources/javadoc).

Committed-By-Agent: claude
- Use explicit colon-separated JAR paths instead of glob pattern
- Make fixture module-scoped to ensure JARs are loaded before tests
- Stop any existing SparkSession before creating new one with correct classpath
- Add verification that PySparkUtils class is loadable
- Remove unused tempfile/os imports

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
- Add usingArrayList factory method to GroupBy object for Py4J compatibility
- Skip test_source test (requires renderUnpartitionedDataSourceQuery not in OSS)
- test_group_by validates the core PySpark integration

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude

12/31/25 update:

Wanted to provide better testing for this PR before merging. Added PySpark unit tests that run in CI and exercise the JVM GroupBy code. This will help us ensure the Python <> JVM connection is unaffected by any future changes.

executor: docker_baseimg_executor
steps:
- checkout
- restore_cache:

This will result in CircleCI caching the JARs for the JVM code across the Python test stages. As a result, we can run our PySpark tests to verify that the PySpark <> JVM integration isn't broken.

camweston-stripe and others added 7 commits December 31, 2025 15:54
- Add jupyter.py with JupyterPlatform, JupyterGroupBy, and JupyterJoin
- Implement proper log capture with both file-based and in-memory options
- Add get_jupyter_user() using getpass for username detection
- Add configurable name_prefix, output_namespace, and use_username_prefix options
- Add Jupyter constants to constants.py
- Update README with Jupyter documentation and examples

Committed-By-Agent: claude
This commit adds full StagingQuery support to the PySpark notebooks
interface, completing feature parity across all three Chronon types
(GroupBy, Join, StagingQuery).

Changes:
- PySparkUtils.scala: Add parseStagingQuery(), runStagingQuery(), and
  getBooleanOptional() helper methods
- executables.py: Add StagingQueryExecutable base class with run()
  method and staging_query_to_java() helper
- databricks.py: Add DatabricksStagingQuery class
- jupyter.py: Add JupyterStagingQuery class
- README.md: Update documentation with StagingQuery examples and
  updated class diagrams
- PySparkUtilsTest.scala: Add tests for parseStagingQuery and
  getBooleanOptional

StagingQuery allows users to materialize intermediate data
transformations with SQL-based queries, supporting date templates
like {{ start_date }}, {{ end_date }}, {{ latest_date }}, and
{{ max_date(table=...) }}.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
When executing GroupBy or Join with windowed aggregations, the source
data query needs to fetch data going back further than the feature
computation start_date. For example, computing a 3-day window feature
for 20251201-20251202 requires source data starting from 20251128.

This fix applies the same pattern already used in
_execute_underlying_join_sources to:
- _update_source_dates_for_group_by (GroupByExecutable)
- _update_source_dates_for_join_parts (JoinExecutable)

Without this fix, windowed aggregations would produce incorrect results
because the source query would not fetch enough historical data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
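To make the lookback adjustment above concrete, the date shift amounts to something like this (a standalone sketch; the real logic lives in the _update_source_dates_* helpers in executables.py):

from datetime import datetime, timedelta


def shifted_source_start(start_date: str, max_window_days: int) -> str:
    # Move the source query start back by the largest window (date format: yyyyMMdd)
    start = datetime.strptime(start_date, "%Y%m%d")
    return (start - timedelta(days=max_window_days)).strftime("%Y%m%d")


# A 3-day window computed for 20251201 needs source data from 20251128
assert shifted_source_start("20251201", 3) == "20251128"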
Thrift's JSON codec represents booleans as integers (1/0) rather than
JSON's true/false. The test was failing because the round-trip check
expected the serialized JSON to match the input exactly.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
Remove create_mock_source function from test_helpers.py and the skipped
test_source test from test_pyspark.py. This code relied on the
renderUnpartitionedDataSourceQueryWithArrayList method which is
Stripe-specific and not available in OSS.

The create_mock_source function was only used by test_source, which was
already skipped. Removing this dead code simplifies the test suite.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
The formula `window.length / 60 * 24` was incorrect due to operator
precedence - it evaluated as `(window.length / 60) * 24` which converts
minutes to hours then multiplies by 24 (nonsensical).

Fixed to `window.length / (60 * 24)` which correctly divides by the
total minutes in a day (1440).

Also added comprehensive unit tests for the function covering:
- Default return value when no aggregations
- Windows in days, hours, and minutes
- Rounding behavior (ceiling)
- Multiple windows and aggregations
- Mixed time units

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
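A quick illustration of the precedence bug described above (shown in Python; the original is Scala, where `/` and `*` likewise share precedence and associate left to right):

# A 3-day window expressed in minutes
window_minutes = 3 * 24 * 60  # 4320

buggy = window_minutes / 60 * 24    # evaluated as (4320 / 60) * 24 == 1728.0
fixed = window_minutes / (60 * 24)  # 4320 / 1440 == 3.0 days

assert buggy == 1728.0
assert fixed == 3.0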
High Priority fixes:
- Add StagingQuery to type annotation in set_metadata() obj_type variable

Medium Priority fixes:
- Remove unused print_with_timestamp() method from PySparkExecutable
- Fix variable shadowing by renaming reassigned start_date/end_date
  parameters to effective_start_date/effective_end_date in GroupBy.run()
  and Join.analyze() methods
- Remove unused sampleNumOfRows parameter from PySparkUtils.runJoin()
  and sample_num_of_rows from JoinExecutable.run()

Note: The "resource leak" issues flagged by the reviewer were false
positives - both databricks.py and jupyter.py already use context
managers (with open(...)) for file operations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
camweston-stripe and others added 5 commits December 31, 2025 18:28
- Remove print_with_timestamp() from PySparkExecutable diagram (method was removed)
- Add set_metadata() and get_table_utils() to PlatformInterface diagram

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
The validator functionality exists in OSS as part of the analyzer,
so it's not actually a limitation. Removed references from:
- README.md "Current Limitations" section (removed entirely)
- PR_DESCRIPTION.md limitations section

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Committed-By-Agent: claude
- Remove MINUTES case from get_max_window_for_gb_in_days()
- Remove test_window_in_minutes and test_window_in_minutes_rounds_up tests
- Update test_mixed_time_units_returns_max to use HOURS instead of MINUTES
- Rename test_small_minute_window_returns_minimum_of_one to test_small_hour_window_returns_minimum_of_one

Committed-By-Agent: claude
camweston-stripe changed the title from "[WIP] CHIP-10: PySpark + Notebook Integration for Chronon Feature Development" to "CHIP-10: PySpark + Notebook Integration for Chronon Feature Development" on Jan 1, 2026
camweston-stripe marked this pull request as ready for review on January 1, 2026 03:37