feat(docs): KEP- Spark Client for Kubeflow SDK #163
Conversation
Added reference link to issue kubeflow#107 for context. Signed-off-by: Shekhar Prasad Rajak <5774448+Shekharrajak@users.noreply.github.com>
```python
# Custom backend implementation
from kubeflow.spark.backends.base import SparkBackend


class CustomBackend(SparkBackend):
```
Users can extend the backend if they need specific changes or a different way to connect to the cluster or submit Spark jobs.
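A rough sketch of what such an extension could look like, assuming the SparkBackend base class ends up with abstract methods along the lines of submit_application() and get_job(); those names are placeholders taken from this discussion, not a finalized interface:

```python
# Hypothetical sketch: a custom backend that forwards submissions to a
# company-internal REST service. Method names are illustrative only.
from kubeflow.spark.backends.base import SparkBackend


class InternalGatewayBackend(SparkBackend):
    """Submits Spark applications through an internal gateway."""

    def __init__(self, gateway_url: str):
        self.gateway_url = gateway_url

    def submit_application(self, app_spec: dict) -> str:
        # Translate the SDK application spec into the gateway payload and
        # return the application ID the gateway assigns.
        raise NotImplementedError("call the internal gateway here")

    def get_job(self, name: str) -> dict:
        # Ask the gateway for the current status of the application.
        raise NotImplementedError("query the internal gateway here")
```

The user would then pass this backend (or its config) to the client the same way the built-in backends are selected.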
andreyvelich left a comment:
Thanks for this great effort @Shekharrajak!
I left my initial thoughts.
```markdown
### Goals

- Design a unified, Pythonic SDK for managing Spark applications on Kubernetes
- Support multiple backends (Kubernetes Operator, REST Gateway, Spark Connect) following the Trainer pattern
```
For Trainer, backends represent various job submission modes (local subprocess, container, and Kubernetes). I am not sure we can replicate that for Spark.
Here we have job submission using the K8s Operator backend, the Spark Connect backend, and the Gateway backend (not implemented yet; we only have the abstract class).
I am wondering what is the main motivation to separate SessionClient() and BatchClient()?
Alternatively, we can just have a unified SparkClient() which has both session and batch APIs:
submit_job() <-- to create a SparkApplication and submit a batch job
connect() <-- to create a session and connect to an existing Spark cluster
- BatchSparkClient users never see the connect() or create_session() methods.
- SparkSessionClient users never see the submit_application() or wait_for_job_status() methods.
- This prevents runtime errors: you can't call wait_for_job_status() on a session object.
This helps users see clearly which methods are available:

```python
client = SparkClient()

# User sees BOTH batch AND session methods
client.submit_job(...)   # For batch
client.connect(...)      # For session
client.get_job(...)      # Works with connect() or batch()?

# Even if we take an argument in the config:
client = SparkClient(mode="batch")
client.create_session(...)  # The IDE will not show an error, but it fails at runtime
```
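For contrast, a minimal sketch of the split-client alternative argued for in this thread; the class names and signatures below are assumptions drawn from the comments above, not the final API:

```python
# Hypothetical sketch of the split-client design: each client exposes only
# the methods that make sense for its mode, so misuse is surfaced by the
# IDE / type checker instead of at runtime.
from kubeflow.spark import BatchSparkClient, SparkSessionClient  # assumed import path

# Batch client: only batch methods exist on the class.
batch_client = BatchSparkClient(namespace="default")
batch_client.submit_application(
    app_name="spark-pi",
    main_application_file="local:///opt/spark/examples/src/main/python/pi.py",
)
batch_client.wait_for_job_status(app_name="spark-pi")
# batch_client.create_session("dev")  # AttributeError, and flagged by the IDE

# Session client: only session methods exist on the class.
session_client = SparkSessionClient(connect_url="sc://dev-cluster:15002")
with session_client.create_session("dev") as session:
    session.sql("SELECT 1").show()
# session_client.wait_for_job_status("spark-pi")  # AttributeError, and flagged by the IDE
```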
```
┌───────────┴─────────────┬──────────────────┬────────────────┐
         ▼                    ▼                    ▼                    ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ OperatorBackend  │ │ GatewayBackend   │ │ ConnectBackend   │ │ LocalBackend     │
│ (Spark Operator  │ │ (REST Gateway)   │ │ (Spark Connect/  │ │ (Future)         │
│  CRDs on K8s)    │ │                  │ │  Interactive)    │ │                  │
└──────────────────┘ └──────────────────┘ └──────────────────┘ └──────────────────┘
    Batch Jobs           Batch Jobs            Sessions             Local Dev
```
Can you explain the reason for creating various backends? Can we just have an API, SparkClient().connect(), which creates a SparkConnect CR and connects to the existing cluster as we discussed?
The batch backend will have APIs like submit_application, wait_for_completion, get_logs, ..., where the user just submits the job and can check the logs/results.
Example: https://github.com/kubeflow/sdk/pull/158/files#diff-e692a5819ee6b1dc00cba3b58e91f058c0022d3ca9aa6f3ee468126f245eef89R89
With the interactive-session Spark client, the user will be able to run interactive SQL queries and DataFrame operations.
Example: https://github.com/kubeflow/sdk/pull/158/files#diff-a5011f48c9d6d16ff6ddd65588f65a7c78abf5fbeb121cccb693c0892ce3a5aeR275
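For the interactive path, the assumption in this sketch is that the SDK session behaves like (or wraps) a Spark Connect SparkSession, so standard PySpark SQL and DataFrame calls apply; the cluster URL and column name are illustrative:

```python
# Plain PySpark Spark Connect usage (PySpark 3.4+); the SDK session is
# assumed to expose this same sql()/DataFrame surface.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://dev-cluster:15002").getOrCreate()

df = spark.sql("SELECT * FROM data LIMIT 1000")  # interactive SQL
df.groupBy("category").count().show()            # DataFrame operations on the same data
spark.stop()
```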
```python
# Submit a Spark application
response = client.submit_application(
    app_name="spark-pi",
    main_application_file="local:///opt/spark/examples/src/main/python/pi.py",
)
```
I am wondering if there is a way to allow users to pass a function to SparkApplication, similar to the Trainer API: https://github.com/kubeflow/sdk?tab=readme-ov-file#run-your-first-pytorch-distributed-job
It might be interesting to explore how we can allow submitting a SparkApplication without building an image.
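Purely as a thought experiment modeled on the Trainer's CustomTrainer pattern, submission of a plain Python function could look like the sketch below; submit_job(func=...), num_executors, and the import path are hypothetical and not part of this KEP yet:

```python
# Hypothetical sketch only: a function-based submission path analogous to
# TrainerClient().train(trainer=CustomTrainer(func=...)). Nothing here is a
# committed API; it illustrates what "no image build" could mean.
from kubeflow.spark import SparkClient  # assumed import path


def pi_estimate(spark):
    # Runs on the driver with an already-created SparkSession injected.
    from random import random

    n = 1_000_000
    inside = (
        spark.sparkContext.parallelize(range(n))
        .filter(lambda _: random() ** 2 + random() ** 2 <= 1)
        .count()
    )
    print(f"Pi is roughly {4.0 * inside / n}")


client = SparkClient()
client.submit_job(
    name="pi-estimate",
    func=pi_estimate,   # serialized and injected into a base Spark image
    num_executors=2,
)
```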
```python
# Step 1: Interactive development with ConnectBackend
connect_config = ConnectBackendConfig(connect_url="sc://dev-cluster:15002")
dev_client = SparkClient(backend_config=connect_config)

with dev_client.create_session("dev") as session:
    # Test and validate query
    test_df = session.sql("SELECT * FROM data LIMIT 1000")
    test_df.show()
    # Iterate and refine...

# Step 2: Production batch job with OperatorBackend
prod_config = OperatorBackendConfig(namespace="production")
prod_client = SparkClient(backend_config=prod_config)
```
This is a pretty interesting experience for the dev -> prod Spark lifecycle.
cc @shravan-achar @akshaychitneni @bigsur0 to explore.
```python
trainer_client.train(
    name="train-model",
    func=train_func,
    num_nodes=4,
)
```
Suggested change:

```python
trainer_client.train(
    trainer=CustomTrainer(
        func=train_func,
        num_nodes=4
    )
)
```
#### Integration with Pipelines
cc @kubeflow/wg-pipeline-leads to explore
Ref https://docs.google.com/document/d/1l57bBlpxrW4gLgAGnoq9Bg7Shre7Cglv4OLCox7ER_s/edit?tab=t.0
PR: #158