diff --git a/api/v4/objectstorage_types.go b/api/v4/objectstorage_types.go index 9e95392ce..08205743f 100644 --- a/api/v4/objectstorage_types.go +++ b/api/v4/objectstorage_types.go @@ -55,7 +55,7 @@ type S3Spec struct { // ObjectStorageStatus defines the observed state of ObjectStorage. type ObjectStorageStatus struct { - // Phase of the large message store + // Phase of the object storage Phase Phase `json:"phase"` // Resource revision tracker diff --git a/api/v4/queue_types.go b/api/v4/queue_types.go index 9828f7301..4c3ff9861 100644 --- a/api/v4/queue_types.go +++ b/api/v4/queue_types.go @@ -61,6 +61,10 @@ type SQSSpec struct { // +kubebuilder:validation:Pattern=`^https?://[^\s/$.?#].[^\s]*$` // Amazon SQS Service endpoint Endpoint string `json:"endpoint"` + + // +optional + // List of remote storage volumes + VolList []VolumeSpec `json:"volumes,omitempty"` } // QueueStatus defines the observed state of Queue diff --git a/api/v4/zz_generated.deepcopy.go b/api/v4/zz_generated.deepcopy.go index dd9b2f347..1f2215a9a 100644 --- a/api/v4/zz_generated.deepcopy.go +++ b/api/v4/zz_generated.deepcopy.go @@ -548,7 +548,7 @@ func (in *IndexerClusterStatus) DeepCopyInto(out *IndexerClusterStatus) { if in.Queue != nil { in, out := &in.Queue, &out.Queue *out = new(QueueSpec) - **out = **in + (*in).DeepCopyInto(*out) } if in.ObjectStorage != nil { in, out := &in.ObjectStorage, &out.ObjectStorage @@ -651,7 +651,7 @@ func (in *IngestorClusterStatus) DeepCopyInto(out *IngestorClusterStatus) { if in.Queue != nil { in, out := &in.Queue, &out.Queue *out = new(QueueSpec) - **out = **in + (*in).DeepCopyInto(*out) } if in.ObjectStorage != nil { in, out := &in.ObjectStorage, &out.ObjectStorage @@ -1002,7 +1002,7 @@ func (in *Queue) DeepCopyInto(out *Queue) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -1051,7 +1051,7 @@ func (in *QueueList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *QueueSpec) DeepCopyInto(out *QueueSpec) { *out = *in - out.SQS = in.SQS + in.SQS.DeepCopyInto(&out.SQS) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueSpec. @@ -1104,6 +1104,11 @@ func (in *S3Spec) DeepCopy() *S3Spec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SQSSpec) DeepCopyInto(out *SQSSpec) { *out = *in + if in.VolList != nil { + in, out := &in.VolList, &out.VolList + *out = make([]VolumeSpec, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQSSpec. diff --git a/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml b/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml index 59faab055..af672ce67 100644 --- a/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml +++ b/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml @@ -8480,6 +8480,40 @@ spec: description: Name of the queue minLength: 1 type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where + apps reside. Used for aws, if provided. Not used for + minio and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: + s3, blob, gcs. s3 works with aws or minio providers, + whereas blob works with azure provider, gcs works + for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml b/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml index 7432e96b4..6ce4c8488 100644 --- a/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml +++ b/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml @@ -4661,6 +4661,40 @@ spec: description: Name of the queue minLength: 1 type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where + apps reside. Used for aws, if provided. Not used for + minio and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: + s3, blob, gcs. s3 works with aws or minio providers, + whereas blob works with azure provider, gcs works + for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/config/crd/bases/enterprise.splunk.com_objectstorages.yaml b/config/crd/bases/enterprise.splunk.com_objectstorages.yaml index 2fac45707..c84474921 100644 --- a/config/crd/bases/enterprise.splunk.com_objectstorages.yaml +++ b/config/crd/bases/enterprise.splunk.com_objectstorages.yaml @@ -87,7 +87,7 @@ spec: description: Auxillary message describing CR status type: string phase: - description: Phase of the large message store + description: Phase of the object storage enum: - Pending - Ready diff --git a/config/crd/bases/enterprise.splunk.com_queues.yaml b/config/crd/bases/enterprise.splunk.com_queues.yaml index 2ba8d03f5..f4ed36a45 100644 --- a/config/crd/bases/enterprise.splunk.com_queues.yaml +++ b/config/crd/bases/enterprise.splunk.com_queues.yaml @@ -78,6 +78,39 @@ spec: description: Name of the queue minLength: 1 type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where apps + reside. Used for aws, if provided. Not used for minio + and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: s3, + blob, gcs. s3 works with aws or minio providers, whereas + blob works with azure provider, gcs works for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/docs/CustomResources.md b/docs/CustomResources.md index 157a9b123..bd85c05ca 100644 --- a/docs/CustomResources.md +++ b/docs/CustomResources.md @@ -404,21 +404,21 @@ spec: endpoint: https://s3.us-west-2.amazonaws.com ``` -ObjectStorage inputs can be found in the table below. As of now, only S3 provider of large message store is supported. +ObjectStorage inputs can be found in the table below. As of now, only S3 provider of object storage is supported. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | -| provider | string | [Required] Provider of large message store (Allowed values: s3) | -| s3 | S3 | [Required if provider=s3] S3 large message store inputs | +| provider | string | [Required] Provider of object storage (Allowed values: s3) | +| s3 | S3 | [Required if provider=s3] S3 object storage inputs | -S3 large message store inputs can be found in the table below. +S3 object storage inputs can be found in the table below. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | | path | string | [Required] Remote storage location for messages that are larger than the underlying maximum message size | | endpoint | string | [Optional, if not provided formed based on region] S3-compatible service endpoint -Change of any of the large message queue inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +Change of any of the object storage inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. ## MonitoringConsole Resource Spec Parameters diff --git a/docs/IndexIngestionSeparation.md b/docs/IndexIngestionSeparation.md index bd5d97579..c7b05dcae 100644 --- a/docs/IndexIngestionSeparation.md +++ b/docs/IndexIngestionSeparation.md @@ -1,3 +1,9 @@ +--- +title: Index and Ingestion Separation +parent: Deploy & Configure +nav_order: 6 +--- + # Background Separation between ingestion and indexing services within Splunk Operator for Kubernetes enables the operator to independently manage the ingestion service while maintaining seamless integration with the indexing service. @@ -10,7 +16,7 @@ This separation enables: # Important Note > [!WARNING] -> **As of now, only brand new deployments are supported for Index and Ingestion Separation. No migration path is implemented, described or tested for existing deployments to move from a standard model to Index & Ingestion separation model.** +> **For customers deploying SmartBus on CMP, the Splunk Operator for Kubernetes (SOK) manages the configuration and lifecycle of the ingestor tier. The following SOK guide provides implementation details for setting up ingestion separation and integrating with existing indexers. This reference is primarily intended for CMP users leveraging SOK-managed ingestors.** # Document Variables @@ -38,7 +44,7 @@ SQS message queue inputs can be found in the table below. | endpoint | string | [Optional, if not provided formed based on region] AWS SQS Service endpoint | dlq | string | [Required] Name of the dead letter queue | -Change of any of the queue inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +**First provisioning or update of any of the queue inputs requires Ingestor Cluster and Indexer Cluster Splunkd restart, but this restart is implemented automatically and done by SOK.** ## Example ``` @@ -61,21 +67,21 @@ ObjectStorage is introduced to store large message (messages that exceed the siz ## Spec -ObjectStorage inputs can be found in the table below. As of now, only S3 provider of large message store is supported. +ObjectStorage inputs can be found in the table below. As of now, only S3 provider of object storage is supported. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | -| provider | string | [Required] Provider of large message store (Allowed values: s3) | -| s3 | S3 | [Required if provider=s3] S3 large message store inputs | +| provider | string | [Required] Provider of object storage (Allowed values: s3) | +| s3 | S3 | [Required if provider=s3] S3 object storage inputs | -S3 large message store inputs can be found in the table below. +S3 object storage inputs can be found in the table below. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | | path | string | [Required] Remote storage location for messages that are larger than the underlying maximum message size | | endpoint | string | [Optional, if not provided formed based on region] S3-compatible service endpoint -Change of any of the large message queue inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +Change of any of the object storage inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. ## Example ``` @@ -102,13 +108,13 @@ In addition to common spec inputs, the IngestorCluster resource provides the fol | ---------- | ------- | ------------------------------------------------- | | replicas | integer | The number of replicas (defaults to 3) | | queueRef | corev1.ObjectReference | Message queue reference | -| objectStorageRef | corev1.ObjectReference | Large message store reference | +| objectStorageRef | corev1.ObjectReference | Object storage reference | ## Example The example presented below configures IngestorCluster named ingestor with Splunk ${SPLUNK_IMAGE_VERSION} image that resides in a default namespace and is scaled to 3 replicas that serve the ingestion traffic. This IngestorCluster custom resource is set up with the service account named ingestor-sa allowing it to perform SQS and S3 operations. Queue and ObjectStorage references allow the user to specify queue and bucket settings for the ingestion process. -In this case, the setup uses the SQS and S3 based configuration where the messages are stored in sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The large message store is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf and outputs.conf files are configured accordingly. +In this case, the setup uses the SQS and S3 based configuration where the messages are stored in sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The object storage is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf and outputs.conf files are configured accordingly. ``` apiVersion: enterprise.splunk.com/v4 @@ -139,13 +145,13 @@ In addition to common spec inputs, the IndexerCluster resource provides the foll | ---------- | ------- | ------------------------------------------------- | | replicas | integer | The number of replicas (defaults to 3) | | queueRef | corev1.ObjectReference | Message queue reference | -| objectStorageRef | corev1.ObjectReference | Large message store reference | +| objectStorageRef | corev1.ObjectReference | Object storage reference | ## Example The example presented below configures IndexerCluster named indexer with Splunk ${SPLUNK_IMAGE_VERSION} image that resides in a default namespace and is scaled to 3 replicas that serve the indexing traffic. This IndexerCluster custom resource is set up with the service account named ingestor-sa allowing it to perform SQS and S3 operations. Queue and ObjectStorage references allow the user to specify queue and bucket settings for the indexing process. -In this case, the setup uses the SQS and S3 based configuration where the messages are stored in and retrieved from sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The large message store is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf, inputs.conf and outputs.conf files are configured accordingly. +In this case, the setup uses the SQS and S3 based configuration where the messages are stored in and retrieved from sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The object storage is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf, inputs.conf and outputs.conf files are configured accordingly. ``` apiVersion: enterprise.splunk.com/v4 @@ -425,6 +431,14 @@ In the following example, the dashboard presents ingestion and indexing data in - [kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack) +# App Installation for Ingestor Cluster Instances + +Application installation is supported for Ingestor Cluster instances. However, as of now, applications are installed using local scope and if any application requires Splunk restart, there is no automated way to detect it and trigger automatically via Splunk Operator. + +Therefore, to be able to enforce Splunk restart for each of the Ingestor Cluster pods, it is recommended to add/update IngestorCluster CR annotations/labels and apply the new configuration which will trigger the rolling restart of Splunk pods for Ingestor Cluster. + +We are under the investigation on how to make it fully automated. What is more, ideally, update of annotations and labels should not trigger pod restart at all and we are investigating on how to fix this behaviour eventually. + # Example 1. Install CRDs and Splunk Operator for Kubernetes. @@ -703,7 +717,7 @@ Spec: Name: queue Namespace: default Image: splunk/splunk:${SPLUNK_IMAGE_VERSION} - Large Message Store Ref: + Object Storage Ref: Name: os Namespace: default Replicas: 3 @@ -727,7 +741,7 @@ Status: Endpoint: https://sqs.us-west-2.amazonaws.com Name: sqs-test Provider: sqs - Large Message Store: + Object Storage: S3: Endpoint: https://s3.us-west-2.amazonaws.com Path: s3://ingestion/smartbus-test diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml index 833f162aa..e5541e017 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml @@ -169,6 +169,7 @@ items: {{- if .namespace }} namespace: {{ .namespace }} {{- end }} + {{- end }} {{- with $.Values.indexerCluster.objectStorageRef }} objectStorageRef: name: {{ .name }} diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml index 7cd5bdca0..033aed904 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml @@ -1,4 +1,4 @@ -{{- if .Values.objectStorage.enabled }} +{{- if .Values.objectStorage }} {{- if .Values.objectStorage.enabled }} apiVersion: enterprise.splunk.com/v4 kind: ObjectStorage diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml index b586e45da..06a3c5dbd 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml @@ -26,8 +26,12 @@ spec: {{- if .name }} name: {{ .name | quote }} {{- end }} - {{- if .region }} - region: {{ .region | quote }} + {{- if .authRegion }} + authRegion: {{ .authRegion | quote }} + {{- end }} + {{- if .volumes }} + volumes: + {{ toYaml . | indent 4 }} {{- end }} {{- end }} {{- end }} diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml index 41f4ea2aa..e3dd6765c 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml @@ -1,136 +1,5 @@ --- -# assert for queue custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: Queue -metadata: - name: queue -spec: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test -status: - phase: Ready - ---- -# assert for large message store custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: ObjectStorage -metadata: - name: os -spec: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test -status: - phase: Ready - ---- -# assert for cluster manager custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: ClusterManager -metadata: - name: cm -status: - phase: Ready - ---- -# check if stateful sets are created -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-cm-cluster-manager -status: - replicas: 1 - ---- -# check if secret object are created -apiVersion: v1 -kind: Secret -metadata: - name: splunk-cm-cluster-manager-secret-v1 - ---- -# assert for indexer cluster custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: IndexerCluster -metadata: - name: indexer -spec: - replicas: 3 - queueRef: - name: queue -status: - phase: Ready - queue: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - objectStorage: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test - ---- -# check for stateful set and replicas as configured -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-indexer-indexer -status: - replicas: 3 - ---- -# check if secret object are created -apiVersion: v1 -kind: Secret -metadata: - name: splunk-indexer-indexer-secret-v1 - ---- -# assert for indexer cluster custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: IngestorCluster -metadata: - name: ingestor -spec: - replicas: 3 - queueRef: - name: queue -status: - phase: Ready - queue: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - objectStorage: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test - ---- -# check for stateful set and replicas as configured -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-ingestor-ingestor -status: - replicas: 3 - ---- -# check if secret object are created apiVersion: v1 kind: Secret metadata: - name: splunk-ingestor-ingestor-secret-v1 \ No newline at end of file + name: s3-secret diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-create-s3-secret.yaml b/kuttl/tests/helm/index-and-ingest-separation/01-create-s3-secret.yaml new file mode 100644 index 000000000..8f1b1b95f --- /dev/null +++ b/kuttl/tests/helm/index-and-ingest-separation/01-create-s3-secret.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl create secret generic s3-secret --from-literal=s3_access_key=$AWS_ACCESS_KEY_ID --from-literal=s3_secret_key=$AWS_SECRET_ACCESS_KEY --namespace $NAMESPACE + background: false + skipLogOutput: true \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml index 00ff26a56..ca56ca5ef 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml @@ -1,33 +1,140 @@ --- -# assert for ingestor cluster custom resource to be ready +# assert for queue custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: Queue +metadata: + name: queue +spec: + provider: sqs + sqs: + name: index-ingest-separation-test-q + authRegion: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: index-ingest-separation-test-dlq +status: + phase: Ready + +--- +# assert for object storage custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: ObjectStorage +metadata: + name: os +spec: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://index-ingest-separation-test-bucket/smartbus-test +status: + phase: Ready + +--- +# assert for cluster manager custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: ClusterManager +metadata: + name: cm +status: + phase: Ready + +--- +# check if stateful sets are created +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-cm-cluster-manager +status: + replicas: 1 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-cm-cluster-manager-secret-v1 + +--- +# assert for indexer cluster custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: IndexerCluster +metadata: + name: indexer +spec: + replicas: 3 + queueRef: + name: queue + objectStorageRef: + name: os +status: + phase: Ready + queue: + provider: sqs + sqs: + name: index-ingest-separation-test-q + authRegion: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: index-ingest-separation-test-dlq + objectStorage: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://index-ingest-separation-test-bucket/smartbus-test + +--- +# check for stateful set and replicas as configured +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-indexer-indexer +status: + replicas: 3 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-indexer-indexer-secret-v1 + +--- +# assert for indexer cluster custom resource to be ready apiVersion: enterprise.splunk.com/v4 kind: IngestorCluster metadata: name: ingestor spec: - replicas: 4 + replicas: 3 queueRef: name: queue + objectStorageRef: + name: os status: phase: Ready queue: provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test + dlq: index-ingest-separation-test-dlq objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test --- -# check for stateful sets and replicas updated +# check for stateful set and replicas as configured apiVersion: apps/v1 kind: StatefulSet metadata: name: splunk-ingestor-ingestor status: - replicas: 4 + replicas: 3 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-ingestor-ingestor-secret-v1 \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-install-setup.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-install-setup.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/01-install-setup.yaml rename to kuttl/tests/helm/index-and-ingest-separation/02-install-setup.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml new file mode 100644 index 000000000..765a22192 --- /dev/null +++ b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml @@ -0,0 +1,35 @@ +--- +# assert for ingestor cluster custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: IngestorCluster +metadata: + name: ingestor +spec: + replicas: 4 + queueRef: + name: queue + objectStorageRef: + name: os +status: + phase: Ready + queue: + provider: sqs + sqs: + name: index-ingest-separation-test-q + authRegion: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: index-ingest-separation-test-dlq + objectStorage: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://index-ingest-separation-test-bucket/smartbus-test + +--- +# check for stateful sets and replicas updated +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-ingestor-ingestor +status: + replicas: 4 diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-scaleup-ingestor.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-scaleup-ingestor.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/02-scaleup-ingestor.yaml rename to kuttl/tests/helm/index-and-ingest-separation/03-scaleup-ingestor.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-uninstall-setup.yaml b/kuttl/tests/helm/index-and-ingest-separation/04-uninstall-setup.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/03-uninstall-setup.yaml rename to kuttl/tests/helm/index-and-ingest-separation/04-uninstall-setup.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml index d05cb5bcf..46ef7fce3 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml @@ -10,10 +10,13 @@ queue: name: queue provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test + dlq: index-ingest-separation-test-dlq + volumes: + - name: helm-bus-secret-ref-test + secretRef: s3-secret objectStorage: enabled: true @@ -21,7 +24,7 @@ objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test ingestorCluster: enabled: true diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 60b4d5a9a..3808539cc 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -77,7 +77,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // updates status after function completes cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Queue = &enterpriseApi.QueueSpec{} + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) @@ -118,7 +119,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterManagerPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -251,7 +252,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller if cr.Spec.QueueRef.Namespace != "" { ns = cr.Spec.QueueRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.QueueRef.Name, Namespace: ns, }, &queue) @@ -268,14 +269,14 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() if cr.Spec.ObjectStorageRef.Namespace != "" { ns = cr.Spec.ObjectStorageRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.ObjectStorageRef.Name, Namespace: ns, }, &os) @@ -284,7 +285,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } - // Can not override original large message store spec due to comparison in the later code + // Can not override original object storage spec due to comparison in the later code osCopy := os if osCopy.Spec.Provider == "s3" { if osCopy.Spec.S3.Endpoint == "" && queueCopy.Spec.SQS.AuthRegion != "" { @@ -294,9 +295,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // If queue is updated if cr.Spec.QueueRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Queue, queue.Spec) { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - + if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePullQueueChange(ctx, cr, queueCopy, osCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) @@ -304,7 +304,17 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller return result, err } + for i := int32(0); i < cr.Spec.Replicas; i++ { + idxcClient := mgr.getClient(ctx, i) + err = idxcClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "indexer", i) + } + cr.Status.Queue = &queue.Spec + cr.Status.ObjectStorage = &os.Spec } } @@ -397,7 +407,8 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.Phase = enterpriseApi.PhaseError cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Queue = &enterpriseApi.QueueSpec{} + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) @@ -440,7 +451,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterMasterPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -591,7 +602,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() @@ -617,9 +628,8 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // If queue is updated if cr.Spec.QueueRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Queue, queue.Spec) { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - + if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePullQueueChange(ctx, cr, queueCopy, osCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) @@ -627,7 +637,17 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, return result, err } + for i := int32(0); i < cr.Spec.Replicas; i++ { + idxcClient := mgr.getClient(ctx, i) + err = idxcClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "indexer", i) + } + cr.Status.Queue = &queue.Spec + cr.Status.ObjectStorage = &os.Spec } } @@ -710,12 +730,13 @@ type indexerClusterPodManager struct { } // newIndexerClusterPodManager function to create pod manager this is added to write unit test case -var newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) indexerClusterPodManager { +var newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, secrets: secret, newSplunkClient: newSplunkClient, + c: c, } } @@ -1315,19 +1336,42 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, } splunkClient := newSplunkClientForQueuePipeline(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) + newCrStatusQueue := newCR.Status.Queue + if newCrStatusQueue == nil { + newCrStatusQueue = &enterpriseApi.QueueSpec{} + } + newCrStatusObjectStorage := newCR.Status.ObjectStorage + if newCrStatusObjectStorage == nil { + newCrStatusObjectStorage = &enterpriseApi.ObjectStorageSpec{} + } + afterDelete := false - if (queue.Spec.SQS.Name != "" && newCR.Status.Queue.SQS.Name != "" && queue.Spec.SQS.Name != newCR.Status.Queue.SQS.Name) || - (queue.Spec.Provider != "" && newCR.Status.Queue.Provider != "" && queue.Spec.Provider != newCR.Status.Queue.Provider) { - if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if (queue.Spec.SQS.Name != "" && newCrStatusQueue.SQS.Name != "" && queue.Spec.SQS.Name != newCrStatusQueue.SQS.Name) || + (queue.Spec.Provider != "" && newCrStatusQueue.Provider != "" && queue.Spec.Provider != newCrStatusQueue.Provider) { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } - if err := splunkClient.DeleteConfFileProperty(scopedLog, "inputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "inputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } afterDelete = true } - queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR, afterDelete) + // Secret reference + s3AccessKey, s3SecretKey := "", "" + if queue.Spec.Provider == "sqs" && newCR.Spec.ServiceAccount == "" { + for _, vol := range queue.Spec.SQS.VolList { + if vol.SecretRef != "" { + s3AccessKey, s3SecretKey, err = GetQueueRemoteVolumeSecrets(ctx, vol, k8s, newCR) + if err != nil { + scopedLog.Error(err, "Failed to get queue remote volume secrets") + return err + } + } + } + } + + queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range queueChangedFieldsOutputs { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -1353,22 +1397,10 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, } // getChangedQueueFieldsForIndexer returns a list of changed queue and pipeline fields for indexer pods -func getChangedQueueFieldsForIndexer(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueIndexerStatus *enterpriseApi.IndexerCluster, afterDelete bool) (queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields [][]string) { - // Compare queue fields - oldQueue := queueIndexerStatus.Status.Queue - if oldQueue == nil { - oldQueue = &enterpriseApi.QueueSpec{} - } - newQueue := queue.Spec - - oldOS := queueIndexerStatus.Status.ObjectStorage - if oldOS == nil { - oldOS = &enterpriseApi.ObjectStorageSpec{} - } - newOS := os.Spec - +func getChangedQueueFieldsForIndexer(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields [][]string) { // Push all queue fields - queueChangedFieldsInputs, queueChangedFieldsOutputs = pullQueueChanged(oldQueue, &newQueue, oldOS, &newOS, afterDelete) + queueChangedFieldsInputs, queueChangedFieldsOutputs = pullQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) + // Always set all pipeline fields, not just changed ones pipelineChangedFields = pipelineConfig(true) @@ -1386,7 +1418,7 @@ func imageUpdatedTo9(previousImage string, currentImage string) bool { return strings.HasPrefix(previousVersion, "8") && strings.HasPrefix(currentVersion, "9") } -func pullQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS *enterpriseApi.ObjectStorageSpec, afterDelete bool) (inputs, outputs [][]string) { +func pullQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (inputs, outputs [][]string) { queueProvider := "" if newQueue.Provider == "sqs" { queueProvider = "sqs_smartbus" @@ -1399,7 +1431,13 @@ func pullQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS if oldQueue.Provider != newQueue.Provider || afterDelete { inputs = append(inputs, []string{"remote_queue.type", queueProvider}) } - if newQueue.SQS.AuthRegion != "" &&(oldQueue.SQS.AuthRegion != newQueue.SQS.AuthRegion || afterDelete) { + if !reflect.DeepEqual(oldQueue.SQS.VolList, newQueue.SQS.VolList) || afterDelete { + if s3AccessKey != "" && s3SecretKey != "" { + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.access_key", queueProvider), s3AccessKey}) + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.secret_key", queueProvider), s3SecretKey}) + } + } + if oldQueue.SQS.AuthRegion != newQueue.SQS.AuthRegion || afterDelete { inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.auth_region", queueProvider), newQueue.SQS.AuthRegion}) } if newQueue.SQS.Endpoint != "" && (oldQueue.SQS.Endpoint != newQueue.SQS.Endpoint || afterDelete) { diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index a74ab4acd..2b4026ac5 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -1569,7 +1569,7 @@ func TestIndexerClusterWithReadyState(t *testing.T) { return nil } - newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) indexerClusterPodManager { + newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, @@ -1579,6 +1579,7 @@ func TestIndexerClusterWithReadyState(t *testing.T) { c.Client = mclient return c }, + c: c, } } @@ -2063,6 +2064,9 @@ func TestGetChangedQueueFieldsForIndexer(t *testing.T) { AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", DLQ: "sqs-dlq-test", + VolList: []enterpriseApi.VolumeSpec{ + {SecretRef: "secret"}, + }, }, }, } @@ -2093,12 +2097,20 @@ func TestGetChangedQueueFieldsForIndexer(t *testing.T) { Name: os.Name, }, }, + Status: enterpriseApi.IndexerClusterStatus{ + Queue: &enterpriseApi.QueueSpec{}, + ObjectStorage: &enterpriseApi.ObjectStorageSpec{}, + }, } - queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR, false) - assert.Equal(t, 8, len(queueChangedFieldsInputs)) + key := "key" + secret := "secret" + queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR.Status.Queue, newCR.Status.ObjectStorage, false, key, secret) + assert.Equal(t, 10, len(queueChangedFieldsInputs)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), queue.Spec.SQS.AuthRegion}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), queue.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), os.Spec.S3.Endpoint}, @@ -2108,9 +2120,11 @@ func TestGetChangedQueueFieldsForIndexer(t *testing.T) { {fmt.Sprintf("remote_queue.%s.retry_policy", provider), "max_count"}, }, queueChangedFieldsInputs) - assert.Equal(t, 10, len(queueChangedFieldsOutputs)) + assert.Equal(t, 12, len(queueChangedFieldsOutputs)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), queue.Spec.SQS.AuthRegion}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), queue.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), os.Spec.S3.Endpoint}, @@ -2395,7 +2409,7 @@ func TestApplyIndexerClusterManager_Queue_Success(t *testing.T) { c := fake.NewClientBuilder().WithScheme(scheme).Build() // Object definitions - queue := enterpriseApi.Queue{ + queue := &enterpriseApi.Queue{ TypeMeta: metav1.TypeMeta{ Kind: "Queue", APIVersion: "enterprise.splunk.com/v4", @@ -2414,7 +2428,26 @@ func TestApplyIndexerClusterManager_Queue_Success(t *testing.T) { }, }, } - c.Create(ctx, &queue) + c.Create(ctx, queue) + + os := &enterpriseApi.ObjectStorage{ + TypeMeta: metav1.TypeMeta{ + Kind: "ObjectStorage", + APIVersion: "enterprise.splunk.com/v4", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "os", + Namespace: "test", + }, + Spec: enterpriseApi.ObjectStorageSpec{ + Provider: "s3", + S3: enterpriseApi.S3Spec{ + Endpoint: "https://s3.us-west-2.amazonaws.com", + Path: "s3://bucket/key", + }, + }, + } + c.Create(ctx, os) cm := &enterpriseApi.ClusterManager{ TypeMeta: metav1.TypeMeta{Kind: "ClusterManager"}, @@ -2440,6 +2473,10 @@ func TestApplyIndexerClusterManager_Queue_Success(t *testing.T) { Name: queue.Name, Namespace: queue.Namespace, }, + ObjectStorageRef: corev1.ObjectReference{ + Name: os.Name, + Namespace: os.Namespace, + }, CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ ClusterManagerRef: corev1.ObjectReference{ Name: "cm", diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 0fc94487b..78a51ede2 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -71,9 +71,9 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Update the CR Status defer updateCRStatus(ctx, client, cr, &err) - if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Queue = &enterpriseApi.QueueSpec{} + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil } cr.Status.Replicas = cr.Spec.Replicas @@ -234,14 +234,14 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() if cr.Spec.ObjectStorageRef.Namespace != "" { ns = cr.Spec.ObjectStorageRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.ObjectStorageRef.Name, Namespace: ns, }, &os) @@ -259,9 +259,8 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } // If queue is updated - if !reflect.DeepEqual(cr.Status.Queue, queue.Spec) { - mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - + if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { + mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePushQueueChange(ctx, cr, queueCopy, osCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) @@ -269,7 +268,17 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr return result, err } + for i := int32(0); i < cr.Spec.Replicas; i++ { + ingClient := mgr.getClient(ctx, i) + err = ingClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "ingestor", i) + } + cr.Status.Queue = &queue.Spec + cr.Status.ObjectStorage = &os.Spec } // Upgrade fron automated MC to MC CRD @@ -312,6 +321,27 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr return result, nil } +// getClient for ingestorClusterPodManager returns a SplunkClient for the member n +func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient { + reqLogger := log.FromContext(ctx) + scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace()) + + // Get Pod Name + memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n) + + // Get Fully Qualified Domain Name + fqdnName := splcommon.GetServiceFQDN(mgr.cr.GetNamespace(), + fmt.Sprintf("%s.%s", memberName, GetSplunkServiceName(SplunkIngestor, mgr.cr.GetName(), true))) + + // Retrieve admin password from Pod + adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password") + if err != nil { + scopedLog.Error(err, "Couldn't retrieve the admin password from pod") + } + + return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd) +} + // validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error { // We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster @@ -361,16 +391,39 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } splunkClient := mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) + newCrStatusQueue := newCR.Status.Queue + if newCrStatusQueue == nil { + newCrStatusQueue = &enterpriseApi.QueueSpec{} + } + newCrStatusObjectStorage := newCR.Status.ObjectStorage + if newCrStatusObjectStorage == nil { + newCrStatusObjectStorage = &enterpriseApi.ObjectStorageSpec{} + } + afterDelete := false - if (queue.Spec.SQS.Name != "" && newCR.Status.Queue.SQS.Name != "" && queue.Spec.SQS.Name != newCR.Status.Queue.SQS.Name) || - (queue.Spec.Provider != "" && newCR.Status.Queue.Provider != "" && queue.Spec.Provider != newCR.Status.Queue.Provider) { - if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if (queue.Spec.SQS.Name != "" && newCrStatusQueue.SQS.Name != "" && queue.Spec.SQS.Name != newCrStatusQueue.SQS.Name) || + (queue.Spec.Provider != "" && newCrStatusQueue.Provider != "" && queue.Spec.Provider != newCrStatusQueue.Provider) { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } afterDelete = true } - queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR, afterDelete) + // Secret reference + s3AccessKey, s3SecretKey := "", "" + if queue.Spec.Provider == "sqs" && newCR.Spec.ServiceAccount == "" { + for _, vol := range queue.Spec.SQS.VolList { + if vol.SecretRef != "" { + s3AccessKey, s3SecretKey, err = GetQueueRemoteVolumeSecrets(ctx, vol, k8s, newCR) + if err != nil { + scopedLog.Error(err, "Failed to get queue remote volume secrets") + return err + } + } + } + } + + queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range queueChangedFields { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -390,20 +443,9 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } // getChangedQueueFieldsForIngestor returns a list of changed queue and pipeline fields for ingestor pods -func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueIngestorStatus *enterpriseApi.IngestorCluster, afterDelete bool) (queueChangedFields, pipelineChangedFields [][]string) { - oldQueue := queueIngestorStatus.Status.Queue - if oldQueue == nil { - oldQueue = &enterpriseApi.QueueSpec{} - } - newQueue := &queue.Spec - - oldOS := queueIngestorStatus.Status.ObjectStorage - if oldOS == nil { - oldOS = &enterpriseApi.ObjectStorageSpec{} - } - newOS := &os.Spec +func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFields, pipelineChangedFields [][]string) { // Push changed queue fields - queueChangedFields = pushQueueChanged(oldQueue, newQueue, oldOS, newOS, afterDelete) + queueChangedFields = pushQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) // Always changed pipeline fields pipelineChangedFields = pipelineConfig(false) @@ -412,6 +454,7 @@ func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterprise } type ingestorClusterPodManager struct { + c splcommon.ControllerClient log logr.Logger cr *enterpriseApi.IngestorCluster secrets *corev1.Secret @@ -419,12 +462,13 @@ type ingestorClusterPodManager struct { } // newIngestorClusterPodManager function to create pod manager this is added to write unit test case -var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) ingestorClusterPodManager { +var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ log: log, cr: cr, secrets: secret, newSplunkClient: newSplunkClient, + c: c, } } @@ -442,7 +486,7 @@ func pipelineConfig(isIndexer bool) (output [][]string) { return output } -func pushQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS *enterpriseApi.ObjectStorageSpec, afterDelete bool) (output [][]string) { +func pushQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (output [][]string) { queueProvider := "" if newQueue.Provider == "sqs" { queueProvider = "sqs_smartbus" @@ -455,7 +499,13 @@ func pushQueueChanged(oldQueue, newQueue *enterpriseApi.QueueSpec, oldOS, newOS if oldQueue.Provider != newQueue.Provider || afterDelete { output = append(output, []string{"remote_queue.type", queueProvider}) } - if newQueue.SQS.AuthRegion != "" && (oldQueue.SQS.AuthRegion != newQueue.SQS.AuthRegion || afterDelete) { + if !reflect.DeepEqual(oldQueue.SQS.VolList, newQueue.SQS.VolList) || afterDelete { + if s3AccessKey != "" && s3SecretKey != "" { + output = append(output, []string{fmt.Sprintf("remote_queue.%s.access_key", queueProvider), s3AccessKey}) + output = append(output, []string{fmt.Sprintf("remote_queue.%s.secret_key", queueProvider), s3SecretKey}) + } + } + if oldQueue.SQS.AuthRegion != newQueue.SQS.AuthRegion || afterDelete { output = append(output, []string{fmt.Sprintf("remote_queue.%s.auth_region", queueProvider), newQueue.SQS.AuthRegion}) } if newQueue.SQS.Endpoint != "" && (oldQueue.SQS.Endpoint != newQueue.SQS.Endpoint || afterDelete) { diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index fac91bbbe..995e52ff8 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -25,6 +25,7 @@ import ( "github.com/go-logr/logr" enterpriseApi "github.com/splunk/splunk-operator/api/v4" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" + splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" spltest "github.com/splunk/splunk-operator/pkg/splunk/test" splutil "github.com/splunk/splunk-operator/pkg/splunk/util" "github.com/stretchr/testify/assert" @@ -32,7 +33,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -117,7 +117,8 @@ func TestApplyIngestorCluster(t *testing.T) { Spec: enterpriseApi.IngestorClusterSpec{ Replicas: 3, CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ - Mock: true, + Mock: true, + ServiceAccount: "sa", }, QueueRef: corev1.ObjectReference{ Name: queue.Name, @@ -247,34 +248,12 @@ func TestApplyIngestorCluster(t *testing.T) { assert.True(t, result.Requeue) assert.NotEqual(t, enterpriseApi.PhaseError, cr.Status.Phase) - // Ensure stored StatefulSet status reflects readiness after any reconcile modifications - fetched := &appsv1.StatefulSet{} - _ = c.Get(ctx, types.NamespacedName{Name: "splunk-test-ingestor", Namespace: "test"}, fetched) - fetched.Status.Replicas = replicas - fetched.Status.ReadyReplicas = replicas - fetched.Status.UpdatedReplicas = replicas - if fetched.Status.UpdateRevision == "" { - fetched.Status.UpdateRevision = "v1" - } - c.Update(ctx, fetched) - - // Guarantee all pods have matching revision label - for _, pn := range []string{"splunk-test-ingestor-0", "splunk-test-ingestor-1", "splunk-test-ingestor-2"} { - p := &corev1.Pod{} - if err := c.Get(ctx, types.NamespacedName{Name: pn, Namespace: "test"}, p); err == nil { - if p.Labels == nil { - p.Labels = map[string]string{} - } - p.Labels["controller-revision-hash"] = fetched.Status.UpdateRevision - c.Update(ctx, p) - } - } - // outputs.conf origNew := newIngestorClusterPodManager mockHTTPClient := &spltest.MockHTTPClient{} - newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc) ingestorClusterPodManager { + newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ + c: c, log: l, cr: cr, secrets: secret, newSplunkClient: func(uri, user, pass string) *splclient.SplunkClient { return &splclient.SplunkClient{ManagementURI: uri, Username: user, Password: pass, Client: mockHTTPClient} @@ -284,6 +263,7 @@ func TestApplyIngestorCluster(t *testing.T) { defer func() { newIngestorClusterPodManager = origNew }() propertyKVList := [][]string{ + {"remote_queue.type", provider}, {fmt.Sprintf("remote_queue.%s.encoding_format", provider), "s2s"}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), queue.Spec.SQS.AuthRegion}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), queue.Spec.SQS.Endpoint}, @@ -322,6 +302,13 @@ func TestApplyIngestorCluster(t *testing.T) { } } + for i := 0; i < int(cr.Status.ReadyReplicas); i++ { + podName := fmt.Sprintf("splunk-test-ingestor-%d", i) + baseURL := fmt.Sprintf("https://%s.splunk-%s-ingestor-headless.%s.svc.cluster.local:8089/services/server/control/restart", podName, cr.GetName(), cr.GetNamespace()) + req, _ := http.NewRequest("POST", baseURL, nil) + mockHTTPClient.AddHandler(req, 200, "", nil) + } + // Second reconcile should now yield Ready cr.Status.TelAppInstalled = true result, err = ApplyIngestorCluster(ctx, c, cr) @@ -434,6 +421,9 @@ func TestGetChangedQueueFieldsForIngestor(t *testing.T) { AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", DLQ: "sqs-dlq-test", + VolList: []enterpriseApi.VolumeSpec{ + {SecretRef: "secret"}, + }, }, }, } @@ -464,14 +454,21 @@ func TestGetChangedQueueFieldsForIngestor(t *testing.T) { Name: os.Name, }, }, - Status: enterpriseApi.IngestorClusterStatus{}, + Status: enterpriseApi.IngestorClusterStatus{ + Queue: &enterpriseApi.QueueSpec{}, + ObjectStorage: &enterpriseApi.ObjectStorageSpec{}, + }, } - queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR, false) + key := "key" + secret := "secret" + queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR.Status.Queue, newCR.Status.ObjectStorage, false, key, secret) - assert.Equal(t, 10, len(queueChangedFields)) + assert.Equal(t, 12, len(queueChangedFields)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), queue.Spec.SQS.AuthRegion}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), queue.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), os.Spec.S3.Endpoint}, diff --git a/pkg/splunk/enterprise/types.go b/pkg/splunk/enterprise/types.go index fe96430e4..4267662d8 100644 --- a/pkg/splunk/enterprise/types.go +++ b/pkg/splunk/enterprise/types.go @@ -66,7 +66,7 @@ const ( // SplunkQueue is the queue instance SplunkQueue InstanceType = "queue" - // SplunkObjectStorage is the large message store instance + // SplunkObjectStorage is the object storage instance SplunkObjectStorage InstanceType = "object-storage" // SplunkDeployer is an instance that distributes baseline configurations and apps to search head cluster members diff --git a/pkg/splunk/enterprise/util.go b/pkg/splunk/enterprise/util.go index afafa6ede..882a96ff3 100644 --- a/pkg/splunk/enterprise/util.go +++ b/pkg/splunk/enterprise/util.go @@ -417,6 +417,25 @@ func GetSmartstoreRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi. return accessKey, secretKey, namespaceScopedSecret.ResourceVersion, nil } +// GetQueueRemoteVolumeSecrets is used to retrieve access key and secrete key for Index & Ingestion separation +func GetQueueRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi.VolumeSpec, client splcommon.ControllerClient, cr splcommon.MetaObject) (string, string, error) { + namespaceScopedSecret, err := splutil.GetSecretByName(ctx, client, cr.GetNamespace(), cr.GetName(), volume.SecretRef) + if err != nil { + return "", "", err + } + + accessKey := string(namespaceScopedSecret.Data[s3AccessKey]) + secretKey := string(namespaceScopedSecret.Data[s3SecretKey]) + + if accessKey == "" { + return "", "", errors.New("access Key is missing") + } else if secretKey == "" { + return "", "", errors.New("secret Key is missing") + } + + return accessKey, secretKey, nil +} + // getLocalAppFileName generates the local app file name // For e.g., if the app package name is sample_app.tgz // and etag is "abcd1234", then it will be downloaded locally as sample_app.tgz_abcd1234 diff --git a/pkg/splunk/enterprise/util_test.go b/pkg/splunk/enterprise/util_test.go index f5405b2cf..35523a028 100644 --- a/pkg/splunk/enterprise/util_test.go +++ b/pkg/splunk/enterprise/util_test.go @@ -2624,6 +2624,9 @@ func TestUpdateCRStatus(t *testing.T) { WithStatusSubresource(&enterpriseApi.Standalone{}). WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). + WithStatusSubresource(&enterpriseApi.Queue{}). + WithStatusSubresource(&enterpriseApi.ObjectStorage{}). + WithStatusSubresource(&enterpriseApi.IngestorCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}) c := builder.Build() ctx := context.TODO() @@ -3304,6 +3307,8 @@ func TestGetCurrentImage(t *testing.T) { WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}). + WithStatusSubresource(&enterpriseApi.Queue{}). + WithStatusSubresource(&enterpriseApi.ObjectStorage{}). WithStatusSubresource(&enterpriseApi.IngestorCluster{}) client := builder.Build() client.Create(ctx, ¤t) diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go index 86231df14..8aac52220 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go @@ -42,29 +42,29 @@ var ( queue = enterpriseApi.QueueSpec{ Provider: "sqs", SQS: enterpriseApi.SQSSpec{ - Name: "test-queue", + Name: "index-ingest-separation-test-q", AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", - DLQ: "test-dead-letter-queue", + DLQ: "index-ingest-separation-test-dlq", }, } objectStorage = enterpriseApi.ObjectStorageSpec{ Provider: "s3", S3: enterpriseApi.S3Spec{ Endpoint: "https://s3.us-west-2.amazonaws.com", - Path: "s3://test-bucket/smartbus-test", + Path: "s3://index-ingest-separation-test-bucket/smartbus-test", }, } serviceAccountName = "index-ingest-sa" inputs = []string{ - "[remote_queue:test-queue]", + "[remote_queue:index-ingest-separation-test-q]", "remote_queue.type = sqs_smartbus", "remote_queue.sqs_smartbus.auth_region = us-west-2", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq", "remote_queue.sqs_smartbus.endpoint = https://sqs.us-west-2.amazonaws.com", "remote_queue.sqs_smartbus.large_message_store.endpoint = https://s3.us-west-2.amazonaws.com", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket/smartbus-test", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max_count", "remote_queue.sqs_smartbus.max_count.max_retries_per_part = 4"} outputs = append(inputs, "remote_queue.sqs_smartbus.encoding_format = s2s", "remote_queue.sqs_smartbus.send_interval = 5s") @@ -88,21 +88,21 @@ var ( updateQueue = enterpriseApi.QueueSpec{ Provider: "sqs", SQS: enterpriseApi.SQSSpec{ - Name: "test-queue-updated", + Name: "index-ingest-separation-test-q-updated", AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", - DLQ: "test-dead-letter-queue-updated", + DLQ: "index-ingest-separation-test-dlq-updated", }, } updatedInputs = []string{ - "[remote_queue:test-queue-updated]", + "[remote_queue:index-ingest-separation-test-q-updated]", "remote_queue.type = sqs_smartbus", "remote_queue.sqs_smartbus.auth_region = us-west-2", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue-updated", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq-updated", "remote_queue.sqs_smartbus.endpoint = https://sqs.us-west-2.amazonaws.com", "remote_queue.sqs_smartbus.large_message_store.endpoint = https://s3.us-west-2.amazonaws.com", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket-updated/smartbus-test", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max", "remote_queue.max.sqs_smartbus.max_retries_per_part = 5"} updatedOutputs = append(updatedInputs, "remote_queue.sqs_smartbus.encoding_format = s2s", "remote_queue.sqs_smartbus.send_interval = 4s") @@ -116,9 +116,9 @@ var ( updatedDefaultsIngest = append(updatedDefaultsAll, "[pipeline:indexerPipe]\ndisabled = true") inputsShouldNotContain = []string{ - "[remote_queue:test-queue]", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket/smartbus-test", + "[remote_queue:index-ingest-separation-test-q]", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max_count", "remote_queue.sqs_smartbus.max_count.max_retries_per_part = 4"} outputsShouldNotContain = append(inputs, "remote_queue.sqs_smartbus.send_interval = 5s") diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index 41beae4bc..85069a071 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -75,9 +75,15 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, smoke, indingsep: Splunk Operator can deploy Ingestors and Indexers", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) + + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + queue.SQS.VolList = volumeSpec + updateQueue.SQS.VolList = volumeSpec // Deploy Queue testcaseEnvInst.Log.Info("Deploy Queue") @@ -91,7 +97,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -101,7 +107,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase @@ -131,11 +137,11 @@ var _ = Describe("indingsep test", func() { Expect(err).To(Succeed(), "Unable to delete Ingestor Cluster instance", "Ingestor Cluster Name", ingest) // Delete the Queue - queue := &enterpriseApi.Queue{} - err = deployment.GetInstance(ctx, "queue", queue) - Expect(err).To(Succeed(), "Unable to get Queue instance", "Queue Name", queue) - err = deployment.DeleteCR(ctx, queue) - Expect(err).To(Succeed(), "Unable to delete Queue", "Queue Name", queue) + q = &enterpriseApi.Queue{} + err = deployment.GetInstance(ctx, "queue", q) + Expect(err).To(Succeed(), "Unable to get Queue instance", "Queue Name", q) + err = deployment.DeleteCR(ctx, q) + Expect(err).To(Succeed(), "Unable to delete Queue", "Queue Name", q) // Delete the ObjectStorage objStorage = &enterpriseApi.ObjectStorage{} @@ -148,9 +154,15 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, smoke, indingsep: Splunk Operator can deploy Ingestors and Indexers with additional configurations", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) + + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + queue.SQS.VolList = volumeSpec + updateQueue.SQS.VolList = volumeSpec // Deploy Queue testcaseEnvInst.Log.Info("Deploy Queue") @@ -162,24 +174,19 @@ var _ = Describe("indingsep test", func() { objStorage, err := deployment.DeployObjectStorage(ctx, "os", objectStorage) Expect(err).To(Succeed(), "Unable to deploy ObjectStorage") - // Upload apps to S3 - testcaseEnvInst.Log.Info("Upload apps to S3") - appFileList := testenv.GetAppFileList(appListV1) - _, err = testenv.UploadFilesToS3(testS3Bucket, s3TestDir, appFileList, downloadDirV1) - Expect(err).To(Succeed(), "Unable to upload V1 apps to S3 test directory for IngestorCluster") - // Deploy Ingestor Cluster with additional configurations (similar to standalone app framework test) appSourceName := "appframework-" + enterpriseApi.ScopeLocal + testenv.RandomDNSName(3) appFrameworkSpec := testenv.GenerateAppFrameworkSpec(ctx, testcaseEnvInst, appSourceVolumeName, enterpriseApi.ScopeLocal, appSourceName, s3TestDir, 60) appFrameworkSpec.MaxConcurrentAppDownloads = uint64(5) ic := &enterpriseApi.IngestorCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: deployment.GetName() + "-ingest", - Namespace: testcaseEnvInst.GetName(), + Name: deployment.GetName() + "-ingest", + Namespace: testcaseEnvInst.GetName(), + Finalizers: []string{"enterprise.splunk.com/delete-pvc"}, }, Spec: enterpriseApi.IngestorClusterSpec{ CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ - ServiceAccount: serviceAccountName, + // ServiceAccount: serviceAccountName, LivenessInitialDelaySeconds: 600, ReadinessInitialDelaySeconds: 50, StartupProbe: &enterpriseApi.Probe{ @@ -205,10 +212,10 @@ var _ = Describe("indingsep test", func() { Image: testcaseEnvInst.GetSplunkImage(), }, }, - QueueRef: v1.ObjectReference{Name: q.Name}, - ObjectStorageRef: v1.ObjectReference{Name: objStorage.Name}, - Replicas: 3, - AppFrameworkConfig: appFrameworkSpec, + QueueRef: v1.ObjectReference{Name: q.Name}, + ObjectStorageRef: v1.ObjectReference{Name: objStorage.Name}, + Replicas: 3, + AppFrameworkConfig: appFrameworkSpec, }, } @@ -220,6 +227,12 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster is in Ready phase") testenv.IngestorReady(ctx, deployment, testcaseEnvInst) + // Upload apps to S3 + testcaseEnvInst.Log.Info("Upload apps to S3") + appFileList := testenv.GetAppFileList(appListV1) + _, err = testenv.UploadFilesToS3(testS3Bucket, s3TestDir, appFileList, downloadDirV1) + Expect(err).To(Succeed(), "Unable to upload V1 apps to S3 test directory for IngestorCluster") + // Verify Ingestor Cluster Pods have apps installed testcaseEnvInst.Log.Info("Verify Ingestor Cluster Pods have apps installed") ingestorPod := []string{fmt.Sprintf(testenv.IngestorPod, deployment.GetName()+"-ingest", 0)} @@ -252,9 +265,14 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, integration, indingsep: Splunk Operator can deploy Ingestors and Indexers with correct setup", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) + + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + queue.SQS.VolList = volumeSpec // Deploy Queue testcaseEnvInst.Log.Info("Deploy Queue") @@ -268,7 +286,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -278,7 +296,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase @@ -301,7 +319,7 @@ var _ = Describe("indingsep test", func() { // Verify Ingestor Cluster Status testcaseEnvInst.Log.Info("Verify Ingestor Cluster Status") - Expect(ingest.Status.Queue).To(Equal(queue), "Ingestor queue status is not the same as provided as input") + Expect(*ingest.Status.Queue).To(Equal(queue), "Ingestor queue status is not the same as provided as input") // Get instance of current Indexer Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Indexer Cluster CR with latest config") @@ -311,7 +329,7 @@ var _ = Describe("indingsep test", func() { // Verify Indexer Cluster Status testcaseEnvInst.Log.Info("Verify Indexer Cluster Status") - Expect(index.Status.Queue).To(Equal(queue), "Indexer queue status is not the same as provided as input") + Expect(*index.Status.Queue).To(Equal(queue), "Indexer queue status is not the same as provided as input") // Verify conf files testcaseEnvInst.Log.Info("Verify conf files") @@ -359,9 +377,15 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, integration, indingsep: Splunk Operator can update Ingestors and Indexers with correct setup", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) + + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + queue.SQS.VolList = volumeSpec + updateQueue.SQS.VolList = volumeSpec // Deploy Queue testcaseEnvInst.Log.Info("Deploy Queue") @@ -375,7 +399,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -385,7 +409,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase @@ -412,14 +436,10 @@ var _ = Describe("indingsep test", func() { err = deployment.UpdateCR(ctx, queue) Expect(err).To(Succeed(), "Unable to deploy Queue with updated CR") - // Ensure that Ingestor Cluster has not been restarted - testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster has not been restarted") + // Ensure that Ingestor Cluster is in Ready phase + testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster is in Ready phase") testenv.IngestorReady(ctx, deployment, testcaseEnvInst) - // Ensure that Indexer Cluster has not been restarted - testcaseEnvInst.Log.Info("Ensure that Indexer Cluster has not been restarted") - testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst) - // Get instance of current Ingestor Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config") ingest := &enterpriseApi.IngestorCluster{} @@ -428,7 +448,11 @@ var _ = Describe("indingsep test", func() { // Verify Ingestor Cluster Status testcaseEnvInst.Log.Info("Verify Ingestor Cluster Status") - Expect(ingest.Status.Queue).To(Equal(updateQueue), "Ingestor queue status is not the same as provided as input") + Expect(*ingest.Status.Queue).To(Equal(updateQueue), "Ingestor queue status is not the same as provided as input") + + // Ensure that Indexer Cluster is in Ready phase + testcaseEnvInst.Log.Info("Ensure that Indexer Cluster is in Ready phase") + testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst) // Get instance of current Indexer Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Indexer Cluster CR with latest config") @@ -438,7 +462,7 @@ var _ = Describe("indingsep test", func() { // Verify Indexer Cluster Status testcaseEnvInst.Log.Info("Verify Indexer Cluster Status") - Expect(index.Status.Queue).To(Equal(updateQueue), "Indexer queue status is not the same as provided as input") + Expect(*index.Status.Queue).To(Equal(updateQueue), "Indexer queue status is not the same as provided as input") // Verify conf files testcaseEnvInst.Log.Info("Verify conf files") diff --git a/test/testenv/remote_index_utils.go b/test/testenv/remote_index_utils.go index 0eb2b485c..f696a4a17 100644 --- a/test/testenv/remote_index_utils.go +++ b/test/testenv/remote_index_utils.go @@ -86,6 +86,14 @@ func RollHotToWarm(ctx context.Context, deployment *Deployment, podName string, return true } +// GenerateQueueVolumeSpec return VolumeSpec struct with given values +func GenerateQueueVolumeSpec(name, secretRef string) enterpriseApi.VolumeSpec { + return enterpriseApi.VolumeSpec{ + Name: name, + SecretRef: secretRef, + } +} + // GenerateIndexVolumeSpec return VolumeSpec struct with given values func GenerateIndexVolumeSpec(volumeName string, endpoint string, secretRef string, provider string, storageType string, region string) enterpriseApi.VolumeSpec { return enterpriseApi.VolumeSpec{ diff --git a/test/testenv/util.go b/test/testenv/util.go index d9c6d5807..366ea3668 100644 --- a/test/testenv/util.go +++ b/test/testenv/util.go @@ -396,8 +396,8 @@ func newIndexerCluster(name, ns, licenseManagerName string, replicas int, cluste }, Defaults: ansibleConfig, }, - Replicas: int32(replicas), - QueueRef: queue, + Replicas: int32(replicas), + QueueRef: queue, ObjectStorageRef: os, }, } @@ -426,8 +426,8 @@ func newIngestorCluster(name, ns string, replicas int, splunkImage string, queue Image: splunkImage, }, }, - Replicas: int32(replicas), - QueueRef: queue, + Replicas: int32(replicas), + QueueRef: queue, ObjectStorageRef: os, }, }