Skip to content

Commit 17c21a9

Browse files
Rahul Naskarrahul810050
authored andcommitted
feat(scheduledsparkapplication): configurable timestampPrecision (nanos|micros|millis|seconds|minutes)
Add optional spec.timestampPrecision to configure the precision of the timestamp suffix appended to generated SparkApplication names for scheduled runs. Default remains 'nanos' for backward compatibility. Adds 'minutes' option to match CronJob granularity and keep generated names short. Includes helper function, unit tests and optional chart value. Fixes: #2602 Signed-off-by: rahul810050 <rahul810050@gmail.com>
1 parent f53373e commit 17c21a9

File tree

7 files changed

+209
-5
lines changed

7 files changed

+209
-5
lines changed

api/v1beta2/scheduledsparkapplication_types.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
2424
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
2525

26+
// ScheduledSparkApplicationSpec defines the desired state of ScheduledSparkApplication.
2627
// ScheduledSparkApplicationSpec defines the desired state of ScheduledSparkApplication.
2728
type ScheduledSparkApplicationSpec struct {
2829
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
@@ -38,6 +39,16 @@ type ScheduledSparkApplicationSpec struct {
3839
TimeZone string `json:"timeZone,omitempty"`
3940
// Template is a template from which SparkApplication instances can be created.
4041
Template SparkApplicationSpec `json:"template"`
42+
// TimestampPrecision controls the precision of the timestamp appended to generated
43+
// SparkApplication names for scheduled runs.
44+
//
45+
// Allowed values: "nanos", "micros", "millis", "seconds", "minutes"
46+
// +kubebuilder:validation:Enum=nanos;micros;millis;seconds;minutes
47+
// +optional
48+
// +kubebuilder:default:=nanos
49+
// Defaults to "nanos" to preserve current behavior.
50+
TimestampPrecision string `json:"timestampPrecision,omitempty"`
51+
4152
// Suspend is a flag telling the controller to suspend subsequent runs of the application if set to true.
4253
// +optional
4354
// Defaults to false.

charts/spark-operator-chart/templates/controller/deployment.yaml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,15 @@ spec:
135135
containerPort: {{ .Values.prometheus.metrics.port }}
136136
{{- end }}
137137
{{- end }}
138-
{{- with .Values.controller.env }}
138+
139+
# --- Global scheduled-spark-app timestamp precision env var (default from values.yaml)
139140
env:
140-
{{- toYaml . | nindent 8 }}
141+
- name: SCHEDULED_SA_TIMESTAMP_PRECISION
142+
value: {{ .Values.controller.scheduledSparkApplication.timestampPrecision | quote }}
143+
{{- with .Values.controller.env }}
144+
{{ toYaml . | nindent 8 }}
141145
{{- end }}
146+
142147
{{- with .Values.controller.envFrom }}
143148
envFrom:
144149
{{- toYaml . | nindent 8 }}

charts/spark-operator-chart/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ hook:
5757
tag: ""
5858

5959
controller:
60+
# -- default precision for ScheduledSparkApplication timestamp suffix
61+
scheduledSparkApplication:
62+
timestampPrecision: "nanos"
63+
6064
# -- Number of replicas of controller.
6165
replicas: 1
6266

config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ spec:
6060
metadata:
6161
type: object
6262
spec:
63-
description: ScheduledSparkApplicationSpec defines the desired state of
64-
ScheduledSparkApplication.
63+
description: |-
64+
ScheduledSparkApplicationSpec defines the desired state of ScheduledSparkApplication.
65+
ScheduledSparkApplicationSpec defines the desired state of ScheduledSparkApplication.
6566
properties:
6667
concurrencyPolicy:
6768
description: ConcurrencyPolicy is the policy governing concurrent
@@ -12418,6 +12419,21 @@ spec:
1241812419
or a valid IANA location name e.g. "America/New_York".
1241912420
Defaults to "Local".
1242012421
type: string
12422+
timestampPrecision:
12423+
default: nanos
12424+
description: |-
12425+
TimestampPrecision controls the precision of the timestamp appended to generated
12426+
SparkApplication names for scheduled runs.
12427+
12428+
Allowed values: "nanos", "micros", "millis", "seconds", "minutes"
12429+
Defaults to "nanos" to preserve current behavior.
12430+
enum:
12431+
- nanos
12432+
- micros
12433+
- millis
12434+
- seconds
12435+
- minutes
12436+
type: string
1242112437
required:
1242212438
- schedule
1242312439
- template

internal/controller/scheduledsparkapplication/controller.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ import (
4242
"sigs.k8s.io/controller-runtime/pkg/log"
4343
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4444

45+
"os"
46+
"strconv"
47+
4548
"github.com/kubeflow/spark-operator/v2/api/v1beta2"
4649
"github.com/kubeflow/spark-operator/v2/pkg/common"
4750
)
@@ -236,9 +239,24 @@ func (r *Reconciler) createSparkApplication(
236239
for key, value := range scheduledApp.Labels {
237240
labels[key] = value
238241
}
242+
243+
// Determine timestamp precision with precedence:
244+
// 1) controller-wide env var SCHEDULED_SA_TIMESTAMP_PRECISION (if set and non-empty)
245+
// 2) per-app scheduledApp.Spec.TimestampPrecision (if set)
246+
// 3) default "nanos" for backward compatibility
247+
precision := "nanos" // fallback default
248+
249+
if envPrecision, ok := os.LookupEnv("SCHEDULED_SA_TIMESTAMP_PRECISION"); ok && strings.TrimSpace(envPrecision) != "" {
250+
precision = strings.TrimSpace(envPrecision)
251+
} else if strings.TrimSpace(scheduledApp.Spec.TimestampPrecision) != "" {
252+
precision = strings.TrimSpace(scheduledApp.Spec.TimestampPrecision)
253+
}
254+
255+
suffix := formatTimestamp(precision, t)
256+
239257
app := &v1beta2.SparkApplication{
240258
ObjectMeta: metav1.ObjectMeta{
241-
Name: fmt.Sprintf("%s-%d", scheduledApp.Name, t.UnixNano()),
259+
Name: fmt.Sprintf("%s-%s", scheduledApp.Name, suffix),
242260
Namespace: scheduledApp.Namespace,
243261
Labels: labels,
244262
OwnerReferences: []metav1.OwnerReference{{
@@ -257,6 +275,28 @@ func (r *Reconciler) createSparkApplication(
257275
return app, nil
258276
}
259277

278+
// formatTimestamp returns a decimal timestamp string according to the requested precision.
279+
// Allowed precisions: "nanos", "micros", "millis", "seconds", "minutes".
280+
// If precision is empty or unrecognized, defaults to "nanos" (current behavior).
281+
func formatTimestamp(precision string, t time.Time) string {
282+
switch precision {
283+
case "minutes":
284+
// Use Unix epoch minutes. matches CronJob approach which is per-minute granularity.
285+
return strconv.FormatInt(t.Unix()/60, 10)
286+
case "seconds":
287+
return strconv.FormatInt(t.Unix(), 10)
288+
case "millis":
289+
// Use UnixNano()/1e6 for compatibility with older Go versions.
290+
return strconv.FormatInt(t.UnixNano()/1e6, 10)
291+
case "micros":
292+
return strconv.FormatInt(t.UnixNano()/1e3, 10)
293+
case "nanos":
294+
fallthrough
295+
default:
296+
return strconv.FormatInt(t.UnixNano(), 10)
297+
}
298+
}
299+
260300
// shouldStartNextRun checks if the next run should be started.
261301
func (r *Reconciler) shouldStartNextRun(scheduledApp *v1beta2.ScheduledSparkApplication) (bool, error) {
262302
apps, err := r.listSparkApplications(scheduledApp)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package scheduledsparkapplication
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestFormatTimestampLengths(t *testing.T) {
9+
// deterministic time so tests are stable
10+
now := time.Unix(1700000000, 123456789) // arbitrary fixed timestamp
11+
12+
cases := map[string]int{
13+
"minutes": 8, // 1700000000 / 60 ~= 28333333 (8 digits)
14+
"seconds": 10, // 1700000000 -> 10 digits
15+
"millis": 13,
16+
"micros": 16,
17+
"nanos": 19,
18+
}
19+
20+
for precision, wantLen := range cases {
21+
s := formatTimestamp(precision, now)
22+
if len(s) != wantLen {
23+
t.Fatalf("precision=%s: got len %d, want %d (value=%s)", precision, len(s), wantLen, s)
24+
}
25+
}
26+
}

scripts/setup-envtest-binaries.sh

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#!/usr/bin/env bash
2+
# scripts/setup-envtest-binaries.sh
3+
# Download kube-apiserver and etcd for envtest to the location expected by tests.
4+
# Usage: from repo root: bash scripts/setup-envtest-binaries.sh
5+
# (Make executable with chmod +x scripts/setup-envtest-binaries.sh)
6+
7+
set -euo pipefail
8+
9+
# --- Configuration: edit if your tests expect a different K8S version
10+
K8S_VERSION="v1.32.0" # kube-apiserver/kube-controller-manager version envtest expects
11+
ETCD_VERSION="v3.5.11" # etcd version (3.5.x recommended)
12+
TARGET_DIR="./bin/k8s/${K8S_VERSION}-linux-amd64" # target where envtest looked for binaries
13+
14+
echo
15+
echo "==> Setting up envtest binaries"
16+
echo "Kubernetes version: ${K8S_VERSION}"
17+
echo "etcd version: ${ETCD_VERSION}"
18+
echo "Target directory: ${TARGET_DIR}"
19+
echo
20+
21+
mkdir -p "${TARGET_DIR}"
22+
23+
# Helper to check for curl or wget
24+
download() {
25+
local url="$1" out="$2"
26+
if command -v curl >/dev/null 2>&1; then
27+
curl -fsSL -o "${out}" "${url}"
28+
elif command -v wget >/dev/null 2>&1; then
29+
wget -qO "${out}" "${url}"
30+
else
31+
echo "Error: need curl or wget to download files." >&2
32+
exit 2
33+
fi
34+
}
35+
36+
# Download Kubernetes server tarball and extract kube-apiserver
37+
K8S_TAR="/tmp/k8s-server-${K8S_VERSION}.tar.gz"
38+
echo "Downloading Kubernetes server ${K8S_VERSION} -> ${K8S_TAR} ..."
39+
download "https://dl.k8s.io/${K8S_VERSION}/kubernetes-server-linux-amd64.tar.gz" "${K8S_TAR}"
40+
41+
echo "Extracting kube-apiserver from ${K8S_TAR}..."
42+
rm -rf /tmp/kubernetes || true
43+
tar -C /tmp -xzf "${K8S_TAR}"
44+
45+
KUBE_APISERVER_SRC="/tmp/kubernetes/server/bin/kube-apiserver"
46+
if [ ! -f "${KUBE_APISERVER_SRC}" ]; then
47+
echo "Error: kube-apiserver not found at ${KUBE_APISERVER_SRC}" >&2
48+
echo "Listing /tmp/kubernetes contents:"
49+
ls -la /tmp/kubernetes || true
50+
exit 3
51+
fi
52+
53+
cp -v "${KUBE_APISERVER_SRC}" "${TARGET_DIR}/"
54+
55+
# Download etcd tarball and extract etcd binary
56+
ETCD_TAR="/tmp/etcd-${ETCD_VERSION}-linux-amd64.tar.gz"
57+
echo "Downloading etcd ${ETCD_VERSION} -> ${ETCD_TAR} ..."
58+
download "https://github.com/etcd-io/etcd/releases/download/${ETCD_VERSION}/etcd-${ETCD_VERSION}-linux-amd64.tar.gz" "${ETCD_TAR}"
59+
60+
echo "Extracting etcd from ${ETCD_TAR}..."
61+
rm -rf "/tmp/etcd-${ETCD_VERSION}-linux-amd64" || true
62+
tar -C /tmp -xzf "${ETCD_TAR}"
63+
64+
ETCD_SRC="/tmp/etcd-${ETCD_VERSION}-linux-amd64/etcd"
65+
if [ -f "${ETCD_SRC}" ]; then
66+
cp -v "${ETCD_SRC}" "${TARGET_DIR}/"
67+
else
68+
# fallback: search extracted tmp for etcd binary
69+
found_etcd=$(find /tmp -type f -name etcd -print -quit || true)
70+
if [ -n "${found_etcd}" ]; then
71+
cp -v "${found_etcd}" "${TARGET_DIR}/"
72+
else
73+
echo "ERROR: etcd binary not found inside extracted etcd tarball." >&2
74+
exit 4
75+
fi
76+
fi
77+
78+
# Make everything executable
79+
chmod +x "${TARGET_DIR}"/*
80+
81+
# Export KUBEBUILDER_ASSETS for current shell user (print instruction)
82+
ABS_TARGET_DIR="$(cd "$(dirname "${TARGET_DIR}")" && pwd)/$(basename "${TARGET_DIR}")"
83+
echo
84+
echo "Binaries installed in: ${ABS_TARGET_DIR}"
85+
echo
86+
echo "To use these binaries for running tests in this shell, run:"
87+
echo " export KUBEBUILDER_ASSETS=\"${ABS_TARGET_DIR}\""
88+
echo
89+
echo "You can also add that export line to your shell profile (e.g. ~/.bashrc) if you want it persistent."
90+
echo
91+
92+
# Add bin/ to .gitignore if not present (local convenience)
93+
if ! rg -q "^bin/" .gitignore 2>/dev/null || [ ! -f .gitignore ]; then
94+
echo "Adding 'bin/' to .gitignore (local convenience)"
95+
echo "bin/" >> .gitignore
96+
echo "Note: bin/ contains large local binaries; do not commit them."
97+
fi
98+
99+
echo "Done. Now run the tests that failed, e.g.:"
100+
echo " go test ./internal/controller/scheduledsparkapplication -v"
101+
echo "or to run all controller tests:"
102+
echo " go test ./internal/controller/... -v"

0 commit comments

Comments
 (0)