Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions api/v1alpha1/memgraphcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,18 @@ type SnapshotSpec struct {
// +optional
RetentionCount int32 `json:"retentionCount,omitempty"`

// ServiceAccountName is the Kubernetes service account to use for the snapshot CronJob pod.
// Required for both S3 (IRSA on EKS) and GCS (Workload Identity on GKE).
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`

// S3 defines optional S3 backup configuration
// +optional
S3 *S3BackupSpec `json:"s3,omitempty"`

// GCS defines optional GCS backup configuration
// +optional
GCS *GCSBackupSpec `json:"gcs,omitempty"`
}

// S3BackupSpec defines S3 backup configuration
Expand Down Expand Up @@ -219,6 +228,22 @@ type S3BackupSpec struct {
RetentionDays int32 `json:"retentionDays,omitempty"`
}

// GCSBackupSpec defines GCS backup configuration for snapshot uploads.
// Authentication is taken from the pod environment (rclone --gcs-env-auth),
// e.g. GKE Workload Identity bound via Snapshot.ServiceAccountName.
type GCSBackupSpec struct {
	// Enabled enables GCS backups
	// +optional
	Enabled bool `json:"enabled,omitempty"`

	// Bucket is the GCS bucket name
	// No default is applied; effectively required when Enabled is true.
	// +optional
	Bucket string `json:"bucket,omitempty"`

	// Prefix is the path prefix within the bucket
	// +kubebuilder:default="memgraph/snapshots"
	// +optional
	Prefix string `json:"prefix,omitempty"`
}

// MemgraphClusterStatus defines the observed state of MemgraphCluster
type MemgraphClusterStatus struct {
// Phase is the current phase of the cluster
Expand Down Expand Up @@ -249,6 +274,10 @@ type MemgraphClusterStatus struct {
// +optional
LastS3BackupTime *metav1.Time `json:"lastS3BackupTime,omitempty"`

// LastGCSBackupTime is the time of the last successful GCS backup
// +optional
LastGCSBackupTime *metav1.Time `json:"lastGCSBackupTime,omitempty"`

// Validation contains real-time validation test results
// +optional
Validation *ValidationStatus `json:"validation,omitempty"`
Expand Down
24 changes: 24 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions config/crd/bases/memgraph.base14.io_memgraphclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,20 @@ spec:
default: true
description: Enabled enables periodic snapshots
type: boolean
gcs:
description: GCS defines optional GCS backup configuration
properties:
bucket:
description: Bucket is the GCS bucket name
type: string
enabled:
description: Enabled enables GCS backups
type: boolean
prefix:
default: memgraph/snapshots
description: Prefix is the path prefix within the bucket
type: string
type: object
retentionCount:
default: 5
description: RetentionCount is the number of snapshots to retain
Expand Down Expand Up @@ -1183,6 +1197,11 @@ spec:
default: '*/15 * * * *'
description: Schedule is a cron expression for snapshot frequency
type: string
serviceAccountName:
description: |-
ServiceAccountName is the Kubernetes service account to use for the snapshot CronJob pod.
Required for both S3 (IRSA on EKS) and GCS (Workload Identity on GKE).
type: string
type: object
storage:
description: Storage defines the persistent storage configuration
Expand Down Expand Up @@ -1302,6 +1321,11 @@ spec:
- type
type: object
type: array
lastGCSBackupTime:
description: LastGCSBackupTime is the time of the last successful
GCS backup
format: date-time
type: string
lastS3BackupTime:
description: LastS3BackupTime is the time of the last successful S3
backup
Expand Down
150 changes: 97 additions & 53 deletions internal/controller/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
)

const (
// Default AWS CLI image for S3 uploads
defaultAWSCLIImage = "amazon/aws-cli:latest"
// Default rclone image for S3/GCS uploads
defaultRcloneImage = "rclone/rclone:1.73.0"

// Shared volume name for snapshot data between containers
snapshotDataVolume = "snapshot-data"
Expand Down Expand Up @@ -104,7 +104,8 @@ func buildSnapshotCronJob(cluster *memgraphv1alpha1.MemgraphCluster) *batchv1.Cr
Labels: labelsForCluster(cluster),
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
RestartPolicy: corev1.RestartPolicyOnFailure,
ServiceAccountName: cluster.Spec.Snapshot.ServiceAccountName,
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: &runAsUser,
RunAsGroup: &runAsGroup,
Expand Down Expand Up @@ -151,8 +152,8 @@ echo "Snapshot created successfully at $(date)"
},
}

// Init container 2: Copy snapshot files to shared volume (if S3 enabled)
if cluster.Spec.Snapshot.S3 != nil && cluster.Spec.Snapshot.S3.Enabled {
// Init container 2: Copy snapshot files to shared volume (if S3 or GCS enabled)
if isRemoteBackupEnabled(cluster) {
// Use bitnami/kubectl for copying files from the main pod
copyCmd := fmt.Sprintf(`
set -e
Expand Down Expand Up @@ -202,12 +203,14 @@ ls -la /snapshot-data/snapshots/

// buildSnapshotMainContainers builds the main containers for the snapshot job
func buildSnapshotMainContainers(cluster *memgraphv1alpha1.MemgraphCluster) []corev1.Container {
// If S3 is enabled, main container uploads to S3
if cluster.Spec.Snapshot.S3 != nil && cluster.Spec.Snapshot.S3.Enabled {
return []corev1.Container{buildS3UploadContainer(cluster)}
return []corev1.Container{buildRcloneUploadContainer(cluster, "s3")}
}

if cluster.Spec.Snapshot.GCS != nil && cluster.Spec.Snapshot.GCS.Enabled {
return []corev1.Container{buildRcloneUploadContainer(cluster, "gcs")}
}

// Otherwise, just a completion container
return []corev1.Container{
{
Name: "complete",
Expand All @@ -224,50 +227,42 @@ func buildSnapshotMainContainers(cluster *memgraphv1alpha1.MemgraphCluster) []co
}
}

// buildS3UploadContainer builds the S3 upload container
func buildS3UploadContainer(cluster *memgraphv1alpha1.MemgraphCluster) corev1.Container {
s3 := cluster.Spec.Snapshot.S3
prefix := s3.Prefix
if prefix == "" {
prefix = "memgraph/snapshots"
// isRemoteBackupEnabled reports whether the snapshot spec has at least one
// remote backup target (S3 or GCS) turned on.
func isRemoteBackupEnabled(cluster *memgraphv1alpha1.MemgraphCluster) bool {
	snap := cluster.Spec.Snapshot
	s3On := snap.S3 != nil && snap.S3.Enabled
	gcsOn := snap.GCS != nil && snap.GCS.Enabled
	return s3On || gcsOn
}

// Build S3 upload command
s3Cmd := fmt.Sprintf(`
set -e

TIMESTAMP=$(cat /snapshot-data/timestamp)
BACKUP_PATH="s3://%s/%s/%s/${TIMESTAMP}"

echo "Uploading snapshot to ${BACKUP_PATH}..."

# Configure endpoint if specified
%s

# Upload to S3
if [ -d "/snapshot-data/snapshots" ] && [ "$(ls -A /snapshot-data/snapshots 2>/dev/null)" ]; then
aws s3 cp /snapshot-data/snapshots/ ${BACKUP_PATH}/snapshots/ --recursive
echo "Snapshot uploaded successfully to ${BACKUP_PATH}"
else
echo "No snapshot files found to upload"
exit 1
fi
// buildRcloneUploadContainer builds the rclone upload container for S3 or GCS
func buildRcloneUploadContainer(cluster *memgraphv1alpha1.MemgraphCluster, backend string) corev1.Container {
var rcloneCmd string
var envVars []corev1.EnvVar

echo "S3 backup completed at $(date)"
`, s3.Bucket, prefix, cluster.Name, buildS3EndpointConfig(s3))
switch backend {
case "s3":
rcloneCmd = buildRcloneS3Command(cluster)
envVars = buildS3Env(cluster)
case "gcs":
rcloneCmd = buildRcloneGCSCommand(cluster)
}

return corev1.Container{
Name: "s3-upload",
Image: defaultAWSCLIImage,
Name: "rclone-upload",
Image: defaultRcloneImage,
Command: []string{"/bin/sh", "-c"},
Args: []string{s3Cmd},
Args: []string{rcloneCmd},
SecurityContext: &corev1.SecurityContext{
AllowPrivilegeEscalation: ptr(false),
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
},
Env: buildS3Env(cluster),
Env: envVars,
VolumeMounts: []corev1.VolumeMount{
{
Name: snapshotDataVolume,
Expand All @@ -277,12 +272,69 @@ echo "S3 backup completed at $(date)"
}
}

// buildRcloneS3Command renders the shell script executed by the upload
// container to copy staged snapshot files into the configured S3 bucket.
// Credentials come from the pod environment (--s3-env-auth, e.g. IRSA).
func buildRcloneS3Command(cluster *memgraphv1alpha1.MemgraphCluster) string {
	cfg := cluster.Spec.Snapshot.S3

	// Fall back to the documented default when no prefix is configured.
	keyPrefix := cfg.Prefix
	if keyPrefix == "" {
		keyPrefix = "memgraph/snapshots"
	}

	// Optional rclone flags; empty strings collapse harmlessly in the shell.
	var endpointOpt, regionOpt string
	if cfg.Endpoint != "" {
		endpointOpt = fmt.Sprintf("--s3-endpoint %s", cfg.Endpoint)
	}
	if cfg.Region != "" {
		regionOpt = fmt.Sprintf("--s3-region %s", cfg.Region)
	}

	return fmt.Sprintf(`
set -e
TIMESTAMP=$(cat /snapshot-data/timestamp)
DEST=":s3:%s/%s/%s/${TIMESTAMP}/snapshots"
echo "Uploading snapshot to ${DEST}..."
if [ -d "/snapshot-data/snapshots" ] && [ "$(ls -A /snapshot-data/snapshots 2>/dev/null)" ]; then
rclone copy /snapshot-data/snapshots/ "${DEST}" --s3-provider AWS --s3-env-auth %s %s -v
echo "Snapshot uploaded successfully"
else
echo "No snapshot files found to upload"
exit 1
fi
echo "S3 backup completed at $(date)"
`, cfg.Bucket, keyPrefix, cluster.Name, regionOpt, endpointOpt)
}

// buildRcloneGCSCommand renders the shell script executed by the upload
// container to copy staged snapshot files into the configured GCS bucket.
// Credentials come from the pod environment (--gcs-env-auth, e.g. Workload
// Identity).
func buildRcloneGCSCommand(cluster *memgraphv1alpha1.MemgraphCluster) string {
	cfg := cluster.Spec.Snapshot.GCS

	// Mirror the CRD default so manually-built specs behave identically.
	keyPrefix := cfg.Prefix
	if keyPrefix == "" {
		keyPrefix = "memgraph/snapshots"
	}

	return fmt.Sprintf(`
set -e
TIMESTAMP=$(cat /snapshot-data/timestamp)
DEST=":gcs:%s/%s/%s/${TIMESTAMP}/snapshots"
echo "Uploading snapshot to ${DEST}..."
if [ -d "/snapshot-data/snapshots" ] && [ "$(ls -A /snapshot-data/snapshots 2>/dev/null)" ]; then
rclone copy /snapshot-data/snapshots/ "${DEST}" --gcs-env-auth -v
echo "Snapshot uploaded successfully"
else
echo "No snapshot files found to upload"
exit 1
fi
echo "GCS backup completed at $(date)"
`, cfg.Bucket, keyPrefix, cluster.Name)
}

// buildSnapshotVolumes builds the volumes for the snapshot job
func buildSnapshotVolumes(cluster *memgraphv1alpha1.MemgraphCluster) []corev1.Volume {
var volumes []corev1.Volume

// Add shared volume if S3 is enabled
if cluster.Spec.Snapshot.S3 != nil && cluster.Spec.Snapshot.S3.Enabled {
if isRemoteBackupEnabled(cluster) {
volumes = append(volumes, corev1.Volume{
Name: snapshotDataVolume,
VolumeSource: corev1.VolumeSource{
Expand All @@ -294,14 +346,6 @@ func buildSnapshotVolumes(cluster *memgraphv1alpha1.MemgraphCluster) []corev1.Vo
return volumes
}

// buildS3EndpointConfig builds AWS CLI endpoint configuration
func buildS3EndpointConfig(s3 *memgraphv1alpha1.S3BackupSpec) string {
if s3.Endpoint == "" {
return ""
}
return fmt.Sprintf(`export AWS_ENDPOINT_URL="%s"`, s3.Endpoint)
}

// buildS3Env builds environment variables for the S3 upload container
func buildS3Env(cluster *memgraphv1alpha1.MemgraphCluster) []corev1.EnvVar {
var envVars []corev1.EnvVar
Expand All @@ -312,7 +356,6 @@ func buildS3Env(cluster *memgraphv1alpha1.MemgraphCluster) []corev1.EnvVar {

s3 := cluster.Spec.Snapshot.S3

// Add S3 credentials if configured
if s3.SecretRef != nil {
envVars = append(envVars,
corev1.EnvVar{
Expand Down Expand Up @@ -375,9 +418,10 @@ func (r *MemgraphClusterReconciler) reconcileSnapshotCronJob(ctx context.Context
return err
}

// Update if schedule or S3 config changed
// Update if schedule, backup config, or service account changed
needsUpdate := existing.Spec.Schedule != desired.Spec.Schedule ||
len(existing.Spec.JobTemplate.Spec.Template.Spec.Containers) != len(desired.Spec.JobTemplate.Spec.Template.Spec.Containers)
len(existing.Spec.JobTemplate.Spec.Template.Spec.Containers) != len(desired.Spec.JobTemplate.Spec.Template.Spec.Containers) ||
existing.Spec.JobTemplate.Spec.Template.Spec.ServiceAccountName != desired.Spec.JobTemplate.Spec.Template.Spec.ServiceAccountName

if needsUpdate {
log.Info("updating snapshot CronJob",
Expand Down
Loading