CF-1871: Add Catalog Update Command #3304
Paras Negi (paras-negi-flink) wants to merge 1 commit into `main`.
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge. |
Pull request overview
Adds support for updating CMF Kafka Catalogs via the confluent flink catalog update command (Confluent Platform/on-prem), including REST client support and integration test coverage with golden fixtures.
Changes:
- Introduces the `flink catalog update <resourceFilePath>` command (JSON/YAML resource file) and shared catalog output formatting.
- Adds CMF REST client support for updating Kafka catalogs and extends the on-prem test server to handle PUT requests.
- Adds on-prem integration tests plus input/output fixtures and help goldens for the new command.
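To make the resource-file shape concrete, here is an illustrative YAML resource file for the new command. Field names follow the `KafkaCatalog` JSON shown in the Test & Review section below; all values are examples, and the actual fixtures live under `test/fixtures/input/flink/catalog/`:

```yaml
# Illustrative KafkaCatalog resource file (example values only)
apiVersion: cmf.confluent.io/v1
kind: KafkaCatalog
metadata:
  name: new-kcat
spec:
  srInstance:
    connectionConfig:
      url: http://localhost:8082
  kafkaClusters: []
```

It would then be applied with something like `confluent flink catalog update catalog.yaml --url http://localhost:8080`.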
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `internal/flink/command_catalog.go` | Registers the new `update` subcommand; adds helpers for printing catalog output and reading JSON/YAML resource files. |
| `internal/flink/command_catalog_update.go` | Implements the `flink catalog update` command behavior (read file → update → describe → print). |
| `pkg/flink/cmf_rest_client.go` | Adds `UpdateCatalog` wrapper around the CMF SDK update endpoint with CLI-friendly error wrapping. |
| `test/test-server/flink_onprem_handler.go` | Extends the CMF catalog handler to support PUT for update scenarios. |
| `test/flink_onprem_test.go` | Adds integration test cases for catalog update (JSON + YAML). |
| `test/fixtures/input/flink/catalog/*` | Adds JSON/YAML resource files for update success and failure cases. |
| `test/fixtures/output/flink/catalog/*` | Adds golden outputs for update (human/json/yaml), update failure, and update help; updates catalog help to include `update`. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
```go
func printCatalogOutput(cmd *cobra.Command, sdkOutputCatalog cmfsdk.KafkaCatalog) error {
	if output.GetFormat(cmd) == output.Human {
		table := output.NewTable(cmd)
		databases := make([]string, 0, len(sdkOutputCatalog.Spec.GetKafkaClusters()))
		for _, kafkaCluster := range sdkOutputCatalog.Spec.GetKafkaClusters() {
			databases = append(databases, kafkaCluster.DatabaseName)
		}
		var creationTime string
		if sdkOutputCatalog.GetMetadata().CreationTimestamp != nil {
			creationTime = *sdkOutputCatalog.GetMetadata().CreationTimestamp
		}
		table.Add(&catalogOut{
			CreationTime: creationTime,
			Name:         sdkOutputCatalog.GetMetadata().Name,
			Databases:    databases,
		})
		return table.Print()
	}

	localCatalog := convertSdkCatalogToLocalCatalog(sdkOutputCatalog)
	return output.SerializedOutput(cmd, localCatalog)
}
```
printCatalogOutput centralizes catalog rendering, but catalogCreate and catalogDescribe still contain their own (nearly identical) output formatting logic. To avoid future drift between commands, consider switching those commands to call printCatalogOutput as well.
Robert Metzger (@rmetzger), Kumar Mallikarjuna (@kumar-mallikarjuna): does it make sense to touch other existing files to reduce duplication, or should we tackle that in another change?
Release Notes
Breaking Changes
New Features
Bug Fixes
Checklist
- Specify in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
- Fill out the Test & Review section below.
- Fill out the Blast Radius section below.

What
This PR implements CF-1871 (CLI: Update SQL catalog) for the Confluent CLI, targeting Confluent Platform / CP Flink (CMF on-prem):

- Adds a new command under the `confluent flink catalog` group: `confluent flink catalog update <resourceFilePath>`.
- Wires this command to the existing CMF Catalog REST API: `PUT /cmf/api/v1/catalogs/kafka/{catName}`.
- Updates the `CmfRestClient` wrapper for `Catalog` operations and corresponding local types/output formatting, following existing patterns used for catalogs and compute pools.

Blast Radius

Existing behavior of other `confluent flink` and `confluent kafka` commands is unchanged.

References
- `KafkaDatabase` and the Flyway migration (e.g., [CF-1772] Add flyway migration for KafkaCatalog -> KafkaDatabase).

Test & Review

Environment

- CLI: built from `confluentinc/cli` (CF-1871)
- CMF: `2.3-SNAPSHOT` (image: `confluentinc/cp-cmf:c505ee8b`)
- Kubernetes: local cluster with CMF deployed (`cmf-service` exposed via `kubectl port-forward svc/cmf-service 8080:80 -n e2e`)

Describe output before and after the update (note the `srInstance` URL change from 8081 to 8082):

```
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent flink catalog describe new-kcat --url http://localhost:8080 --output json
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "new-kcat",
    "creationTimestamp": "2026-04-03T14:46:29.297Z",
    "uid": "41acaae2-62f0-4d00-a8db-4235701ac37e",
    "labels": {},
    "annotations": {}
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "url": "http://localhost:8081"
      }
    },
    "kafkaClusters": []
  }
}
```

```
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent flink catalog describe new-kcat --url http://localhost:8080 --output json
{
  "apiVersion": "cmf.confluent.io/v1",
  "kind": "KafkaCatalog",
  "metadata": {
    "name": "new-kcat",
    "creationTimestamp": "2026-04-03T14:46:29.297Z",
    "uid": "41acaae2-62f0-4d00-a8db-4235701ac37e",
    "labels": {},
    "annotations": {}
  },
  "spec": {
    "srInstance": {
      "connectionConfig": {
        "url": "http://localhost:8082"
      }
    },
    "kafkaClusters": []
  }
}
```

Non-empty `KafkaCatalog.spec.kafkaClusters` is restricted owing to the Flyway migration.