Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion e2e/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ def verify_role():
"Origin": 2,
"IsDbOwner": False,
"Deleted": False,
"Rotated": False
"Rotated": False,
"Degraded": False,
})
return True
except:
Expand Down
61 changes: 44 additions & 17 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,40 +1059,52 @@ func (c *Cluster) syncStandbyClusterConfiguration() error {
func (c *Cluster) syncSecrets() error {
c.logger.Debug("syncing secrets")
c.setProcessName("syncing secrets")
errors := make([]string, 0)
generatedSecrets := c.generateUserSecrets()
retentionUsers := make([]string, 0)
currentTime := time.Now()

for secretUsername, generatedSecret := range generatedSecrets {
secret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{})
pgUserDegraded := false
createdSecret, err := c.KubeClient.Secrets(generatedSecret.Namespace).Create(context.TODO(), generatedSecret, metav1.CreateOptions{})
if err == nil {
c.Secrets[secret.UID] = secret
c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(secret.ObjectMeta), generatedSecret.Namespace, secret.UID)
c.Secrets[createdSecret.UID] = createdSecret
c.logger.Infof("created new secret %s, namespace: %s, uid: %s", util.NameFromMeta(createdSecret.ObjectMeta), generatedSecret.Namespace, createdSecret.UID)
continue
}
if k8sutil.ResourceAlreadyExists(err) {
if err = c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime); err != nil {
c.logger.Warningf("syncing secret %s failed: %v", util.NameFromMeta(secret.ObjectMeta), err)
updatedSecret, err := c.updateSecret(secretUsername, generatedSecret, &retentionUsers, currentTime)
if err == nil {
c.Secrets[updatedSecret.UID] = updatedSecret
continue
}
errors = append(errors, fmt.Sprintf("syncing secret %s failed: %v", util.NameFromMeta(updatedSecret.ObjectMeta), err))
pgUserDegraded = true
} else {
return fmt.Errorf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err)
errors = append(errors, fmt.Sprintf("could not create secret for user %s: in namespace %s: %v", secretUsername, generatedSecret.Namespace, err))
pgUserDegraded = true
}
c.updatePgUser(secretUsername, pgUserDegraded)
}

// remove rotation users that exceed the retention interval
if len(retentionUsers) > 0 {
err := c.initDbConn()
if err != nil {
return fmt.Errorf("could not init db connection: %v", err)
errors = append(errors, fmt.Sprintf("could not init db connection: %v", err))
}
if err = c.cleanupRotatedUsers(retentionUsers, c.pgDb); err != nil {
return fmt.Errorf("error removing users exceeding configured retention interval: %v", err)
errors = append(errors, fmt.Sprintf("error removing users exceeding configured retention interval: %v", err))
}
if err := c.closeDbConn(); err != nil {
c.logger.Errorf("could not close database connection after removing users exceeding configured retention interval: %v", err)
errors = append(errors, fmt.Sprintf("could not close database connection after removing users exceeding configured retention interval: %v", err))
}
}

if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

Expand All @@ -1105,7 +1117,7 @@ func (c *Cluster) updateSecret(
secretUsername string,
generatedSecret *v1.Secret,
retentionUsers *[]string,
currentTime time.Time) error {
currentTime time.Time) (*v1.Secret, error) {
var (
secret *v1.Secret
err error
Expand All @@ -1115,7 +1127,7 @@ func (c *Cluster) updateSecret(

// get the secret first
if secret, err = c.KubeClient.Secrets(generatedSecret.Namespace).Get(context.TODO(), generatedSecret.Name, metav1.GetOptions{}); err != nil {
return fmt.Errorf("could not get current secret: %v", err)
return generatedSecret, fmt.Errorf("could not get current secret: %v", err)
}
c.Secrets[secret.UID] = secret

Expand Down Expand Up @@ -1211,24 +1223,22 @@ func (c *Cluster) updateSecret(
if updateSecret {
c.logger.Infof("%s", updateSecretMsg)
if secret, err = c.KubeClient.Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("could not update secret %s: %v", secretName, err)
return secret, fmt.Errorf("could not update secret %s: %v", secretName, err)
}
c.Secrets[secret.UID] = secret
}

if changed, _ := c.compareAnnotations(secret.Annotations, generatedSecret.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(generatedSecret.Annotations)
if err != nil {
return fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
return secret, fmt.Errorf("could not form patch for secret %q annotations: %v", secret.Name, err)
}
secret, err = c.KubeClient.Secrets(secret.Namespace).Patch(context.TODO(), secret.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err)
return secret, fmt.Errorf("could not patch annotations for secret %q: %v", secret.Name, err)
}
c.Secrets[secret.UID] = secret
}

return nil
return secret, nil
}

func (c *Cluster) rotatePasswordInSecret(
Expand Down Expand Up @@ -1334,6 +1344,23 @@ func (c *Cluster) rotatePasswordInSecret(
return updateSecretMsg, nil
}

func (c *Cluster) updatePgUser(secretUsername string, degraded bool) {
for key, pgUser := range c.pgUsers {
if pgUser.Name == secretUsername {
pgUser.Degraded = degraded
c.pgUsers[key] = pgUser
return
}
}
for key, pgUser := range c.systemUsers {
if pgUser.Name == secretUsername {
pgUser.Degraded = degraded
c.systemUsers[key] = pgUser
return
}
}
}

func (c *Cluster) syncRoles() (err error) {
c.setProcessName("syncing roles")

Expand Down
20 changes: 18 additions & 2 deletions pkg/cluster/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
k8stesting "k8s.io/client-go/testing"

"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/zalando/postgres-operator/mocks"
Expand Down Expand Up @@ -50,6 +53,16 @@ func newFakeK8sSyncClient() (k8sutil.KubernetesClient, *fake.Clientset) {
}

func newFakeK8sSyncSecretsClient() (k8sutil.KubernetesClient, *fake.Clientset) {
// add a reactor that checks namespace existence before creating secrets
clientSet.PrependReactor("create", "secrets", func(action k8stesting.Action) (bool, runtime.Object, error) {
createAction := action.(k8stesting.CreateAction)
secret := createAction.GetObject().(*v1.Secret)
if secret.Namespace != "default" {
return true, nil, errors.New("namespace does not exist")
}
return false, nil, nil
})

return k8sutil.KubernetesClient{
SecretsGetter: clientSet.CoreV1(),
}, clientSet
Expand Down Expand Up @@ -810,7 +823,7 @@ func TestUpdateSecret(t *testing.T) {
},
Spec: acidv1.PostgresSpec{
Databases: map[string]string{dbname: dbowner},
Users: map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}},
Users: map[string]acidv1.UserFlags{appUser: {}, "bar": {}, dbowner: {}, "not-exist.test_user": {}},
UsersIgnoringSecretRotation: []string{"bar"},
UsersWithInPlaceSecretRotation: []string{dbowner},
Streams: []acidv1.Stream{
Expand Down Expand Up @@ -842,6 +855,7 @@ func TestUpdateSecret(t *testing.T) {
PasswordRotationInterval: 1,
PasswordRotationUserRetention: 3,
},
EnableCrossNamespaceSecret: true,
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
Expand All @@ -864,7 +878,9 @@ func TestUpdateSecret(t *testing.T) {

allUsers := make(map[string]spec.PgUser)
for _, pgUser := range cluster.pgUsers {
allUsers[pgUser.Name] = pgUser
if !pgUser.Degraded {
allUsers[pgUser.Name] = pgUser
}
}
for _, systemUser := range cluster.systemUsers {
allUsers[systemUser.Name] = systemUser
Expand Down
1 change: 1 addition & 0 deletions pkg/spec/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type PgUser struct {
IsDbOwner bool `yaml:"is_db_owner"`
Deleted bool `yaml:"deleted"`
Rotated bool `yaml:"rotated"`
Degraded bool `yaml:"degraded"`
}

func (user *PgUser) Valid() bool {
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/users/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (strategy DefaultUserSyncStrategy) ProduceSyncRequests(dbUsers spec.PgUserM
if newUser.Deleted {
continue
}
// when the secret of the user could not be created or updated skip any database actions
if newUser.Degraded {
continue
}
dbUser, exists := dbUsers[name]
if !exists {
reqs = append(reqs, spec.PgSyncUserRequest{Kind: spec.PGSyncUserAdd, User: newUser})
Expand Down
Loading