Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/flink/command_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command {
cmd.AddCommand(c.newCatalogDeleteCommand())
cmd.AddCommand(c.newCatalogDescribeCommand())
cmd.AddCommand(c.newCatalogListCommand())
cmd.AddCommand(c.newCatalogDatabaseCommand())

return cmd
}
Expand Down
113 changes: 113 additions & 0 deletions internal/flink/command_catalog_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package flink

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we expect a CMF SDK upgrade to support this feature? I don't see any go.mod changes.

Or a Docker image change is good enough?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an ongoing effort in parallel for CMF SDK upgrade in CLI repo. Will be updating this PR once that change is merged.


pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/output"
)

type databaseOut struct {
CreationTime string `human:"Creation Time" serialized:"creation_time"`
Name string `human:"Name" serialized:"name"`
Catalog string `human:"Catalog" serialized:"catalog"`
}

func (c *command) newCatalogDatabaseCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "database",
Short: "Manage Flink databases in Confluent Platform.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout},
}

Comment thread
paras-negi-flink marked this conversation as resolved.
cmd.AddCommand(c.newCatalogDatabaseCreateCommand())
cmd.AddCommand(c.newCatalogDatabaseDeleteCommand())
cmd.AddCommand(c.newCatalogDatabaseDescribeCommand())
cmd.AddCommand(c.newCatalogDatabaseListCommand())
cmd.AddCommand(c.newCatalogDatabaseUpdateCommand())

return cmd
}

func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error {
if output.GetFormat(cmd) == output.Human {
table := output.NewTable(cmd)
var creationTime string
if sdkDatabase.GetMetadata().CreationTimestamp != nil {
creationTime = *sdkDatabase.GetMetadata().CreationTimestamp
}
table.Add(&databaseOut{
CreationTime: creationTime,
Name: sdkDatabase.GetMetadata().Name,
Catalog: catalogName,
})
Comment on lines +48 to +52
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be a concern if human mode has such a difference compared to json or yml mode through convertSdkDatabaseToLocalDatabase()?

return table.Print()
}

localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase)
return output.SerializedOutput(cmd, localDatabase)
}

func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) {
data, err := os.ReadFile(resourceFilePath)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err)
}

var genericData map[string]interface{}
ext := filepath.Ext(resourceFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &genericData)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &genericData)
default:
return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err)
}

jsonBytes, err := json.Marshal(genericData)
if err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err)
}

var sdkDatabase cmfsdk.KafkaDatabase
if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil {
return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err)
}

return sdkDatabase, nil
}

func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase {
return LocalKafkaDatabase{
ApiVersion: sdkDatabase.ApiVersion,
Kind: sdkDatabase.Kind,
Metadata: LocalDatabaseMetadata{
Name: sdkDatabase.Metadata.Name,
CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp,
UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp,
Uid: sdkDatabase.Metadata.Uid,
Labels: sdkDatabase.Metadata.Labels,
Annotations: sdkDatabase.Metadata.Annotations,
},
Spec: LocalKafkaDatabaseSpec{
KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{
ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig,
ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId,
},
AlterEnvironments: sdkDatabase.Spec.AlterEnvironments,
},
}
}
50 changes: 50 additions & 0 deletions internal/flink/command_catalog_database_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "create <resourceFilePath>",
Short: "Create a Flink database.",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make it consistent.

Suggested change
Short: "Create a Flink database.",
Short: "Create a Flink database in Confluent Platform.",

Long: "Create a Flink database in a catalog in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseCreate,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error {
resourceFilePath := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabase, err := readDatabaseResourceFile(resourceFilePath)
if err != nil {
return err
}

sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
58 changes: 58 additions & 0 deletions internal/flink/command_catalog_database_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/deletion"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/resource"
)

func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "delete <name>",
Short: "Delete a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDelete,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddForceFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

existenceFunc := func(name string) bool {
_, err := client.DescribeDatabase(c.createContext(), catalogName, name)
return err == nil
}

if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil {
// We are validating only the existence of the resources (there is no prefix validation).
// Thus, we can add some extra context for the error.
suggestions := "List available Flink databases with `confluent flink catalog database list`."
Comment thread
paras-negi-flink marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is user facing suggestion, if --catalog is required flag and not something we save into CLI config, we should add the flag to the suggestion.

suggestions += "\nCheck that CMF is running and accessible."
return errors.NewErrorWithSuggestions(err.Error(), suggestions)
}
Comment thread
paras-negi-flink marked this conversation as resolved.

deleteFunc := func(name string) error {
return client.DeleteDatabase(c.createContext(), catalogName, name)
}

_, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase)
return err
}
44 changes: 44 additions & 0 deletions internal/flink/command_catalog_database_describe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
)

func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <name>",
Short: "Describe a Flink database in Confluent Platform.",
Args: cobra.ExactArgs(1),
RunE: c.catalogDatabaseDescribe,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error {
name := args[0]

catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name)
if err != nil {
return err
}

return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName)
}
64 changes: 64 additions & 0 deletions internal/flink/command_catalog_database_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package flink

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *command) newCatalogDatabaseListCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Flink databases in a catalog in Confluent Platform.",
Args: cobra.NoArgs,
RunE: c.catalogDatabaseList,
}

cmd.Flags().String("catalog", "", "Name of the catalog.")
cobra.CheckErr(cmd.MarkFlagRequired("catalog"))
addCmfFlagSet(cmd)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error {
catalogName, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

client, err := c.GetCmfClient(cmd)
if err != nil {
return err
}

sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName)
if err != nil {
return err
}

if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, db := range sdkDatabases {
var creationTime string
if db.GetMetadata().CreationTimestamp != nil {
creationTime = *db.GetMetadata().CreationTimestamp
}
list.Add(&databaseOut{
CreationTime: creationTime,
Name: db.GetMetadata().Name,
Catalog: catalogName,
})
}
return list.Print()
}

localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases))
for _, sdkDatabase := range sdkDatabases {
localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase))
}

return output.SerializedOutput(cmd, localDatabases)
}
Loading