From e33ffed52482e7771774b2f3ed8ce9723fbb1ab8 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Thu, 24 Mar 2016 10:15:38 +0100 Subject: [PATCH 1/3] registry: remove units from etcd registry upon DestroyUnit() So far each command "fleetctl destroy unit" has removed job entries from the etcd registry, under /_coreos.com/fleet/job. But it has not removed its unit file, under /_coreos.com/fleet/unit. As a result, fleet left lots of garbages in the etcd registry, so users had to manually clean them up. So this patch gets unit contents deleted actually from etcd registry when DestroyUnit() gets called. To avoid potential hash collisions, it first fetches a list of units from registry, to check there's any duplicated entry. Only if no duplicated unit is found, fleetd actually deletes the unit from registry. Fixes: https://github.com/coreos/fleet/issues/1456 Fixes: https://github.com/coreos/fleet/issues/1290 Reference: https://github.com/coreos/fleet/pull/1291 --- registry/job.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) diff --git a/registry/job.go b/registry/job.go index 4f4248a62..3fb50b013 100644 --- a/registry/job.go +++ b/registry/job.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" "path" + "reflect" "sort" + "strings" etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" @@ -285,6 +287,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc } +// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and +// instantiates and returns a representative *job.Unit, transitively fetching +// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(), +// this does not use not.Value itself as a hash key, but it uses the last part +// of node.Key for the hash key. +func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) { + var err error + var jm jobModel + + if err = unmarshal(node.Value, &jm); err != nil { + return nil, err + } + + parts := strings.Split(node.Key, "/") + if len(parts) == 0 { + log.Errorf("key '%v' doesn't have enough parts", node.Key) + return nil, nil + } + stringHash := parts[len(parts)-1] + + hashKey, err := unit.HashFromHexString(stringHash) + if err != nil { + log.Errorf("cannot convert key string into hash. %v", err) + return nil, nil + } + + var unit *unit.UnitFile + + unit = unitHashLookupFunc(hashKey) + if unit == nil { + log.Warningf("No Unit found in Registry for Job(%s)", jm.Name) + return nil, nil + } + + ju := &job.Unit{ + Name: jm.Name, + Unit: *unit, + } + return ju, nil + +} + // jobModel is used for serializing and deserializing Jobs stored in the Registry type jobModel struct { Name string @@ -298,7 +342,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { opts := &etcd.DeleteOptions{ Recursive: true, } - _, err := r.kAPI.Delete(context.Background(), key, opts) + u, err := r.Unit(name) + if err != nil { + log.Warningf("r.Unit error, name=%s\n", name) + u = nil + } + _, err = r.kAPI.Delete(context.Background(), key, opts) if err != nil { if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { err = errors.New("job does not exist") @@ -308,9 +357,78 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { } // TODO(jonboulle): add unit reference counting and actually destroying Units + + // Delete unit from the etcd registry + if u != nil { + // check if the unit is really valid. If not, return err. + key = r.hashedUnitPath(u.Unit.Hash()) + unitMatch, err := r.checkUnitMatch(u) + if err != nil { + return fmt.Errorf("Failed checking unit validity") + } + if unitMatch { + return fmt.Errorf("Invalid unit in the etcd registry: not deleting from registry.") + } + _, err = r.kAPI.Delete(context.Background(), key, opts) + if err != nil { + // NOTE: unable to delete the key, but it's practically no big deal, + // as the garbage will be later cleaned up on the etcd side. + return fmt.Errorf("Failed deleting unit from registry: %s", err) + } + } return nil } +// checkUnitMatch() determines if the given unit is a really valid entry in the +// etcd registry, by querying the entries via RPC. +func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) { + key := path.Join(r.keyPrefix, unitPrefix) + opts := &etcd.GetOptions{ + Recursive: true, + } + res, err := r.kAPI.Get(context.Background(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = nil + } + return false, err + } + + return r.checkUnitSiblings(unitDel, res.Node) +} + +// checkUnitSiblings() returns true if there's a duplicated entry already in +// the etcd registry. +func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) { + uhashKey := dir.Key + unitDelName := r.hashedUnitPath(unitDel.Unit.Hash()) + for _, uhashNode := range dir.Nodes { + newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash) + if err != nil { + log.Errorf("cannot get unit. err: %v", err) + return false, err + } + if newUnit == nil { + log.Debugf("unable to parse Unit in Registry at key %s", uhashKey) + continue + } + + if unitDelName == uhashNode.Key { + log.Debugf("skipping the entry itself.") + continue + } + + if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) { + // matched. so this unit has a duplicated entry, so return + log.Debugf("won't erase this key, as a duplicated entry is found.") + return true, nil + } + } + + log.Debugf("no matching entry, so it can be removed.") + return false, nil +} + // CreateUnit attempts to store a Unit and its associated unit file in the registry func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { if err := r.storeOrGetUnitFile(u.Unit); err != nil { From bc26df17a1ad2ebcc2b0a8390a929642174b56d2 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Wed, 23 Mar 2016 11:40:39 +0100 Subject: [PATCH 2/3] functional: support running etcdctl, as well as using Cluster.Keyspace Introduce new helpers for running etcdctl for functional tests. Also export a method Cluster.Keyspace() to get it called by functional tests. --- build-env | 1 + functional/platform/cluster.go | 2 ++ functional/platform/nspawn.go | 6 ++--- functional/util/util.go | 42 ++++++++++++++++++++++++++++++++++ 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/build-env b/build-env index f068b51bd..b2475b8c1 100755 --- a/build-env +++ b/build-env @@ -24,3 +24,4 @@ export PATH="${GOROOT}/bin:${PATH}" export FLEETD_BIN="${CDIR}/bin/fleetd" export FLEETCTL_BIN="${CDIR}/bin/fleetctl" export FLEETD_TEST_ENV="enable_grpc=false" +export ETCDCTL_BIN="/usr/bin/etcdctl" diff --git a/functional/platform/cluster.go b/functional/platform/cluster.go index e6cc54d49..900dd0d3d 100644 --- a/functional/platform/cluster.go +++ b/functional/platform/cluster.go @@ -42,6 +42,8 @@ type Cluster interface { WaitForNActiveUnits(Member, int) (map[string][]util.UnitState, error) WaitForNUnitFiles(Member, int) (map[string][]util.UnitFileState, error) WaitForNMachines(Member, int) ([]string, error) + + Keyspace() string } func CreateNClusterMembers(cl Cluster, count int) ([]Member, error) { diff --git a/functional/platform/nspawn.go b/functional/platform/nspawn.go index ac1577ace..38895d6f5 100644 --- a/functional/platform/nspawn.go +++ b/functional/platform/nspawn.go @@ -100,7 +100,7 @@ func (nc *nspawnCluster) nextID() string { return strconv.Itoa(nc.maxID) } -func (nc *nspawnCluster) keyspace() string { +func (nc *nspawnCluster) Keyspace() string { // TODO(jonboulle): generate this dynamically with atomic in order keys? return fmt.Sprintf("/fleet_functional/%s", nc.name) } @@ -367,7 +367,7 @@ func (nc *nspawnCluster) buildConfigDrive(dir, ip string) error { defer userFile.Close() etcd := "http://172.18.0.1:4001" - return util.BuildCloudConfig(userFile, ip, etcd, nc.keyspace()) + return util.BuildCloudConfig(userFile, ip, etcd, nc.Keyspace()) } func (nc *nspawnCluster) Members() []Member { @@ -601,7 +601,7 @@ func (nc *nspawnCluster) Destroy(t *testing.T) error { // TODO(bcwaldon): This returns 4 on success, but we can't easily // ignore just that return code. Ignore the returned error // altogether until this is fixed. - run("etcdctl rm --recursive " + nc.keyspace()) + run("etcdctl rm --recursive " + nc.Keyspace()) run("ip link del fleet0") diff --git a/functional/util/util.go b/functional/util/util.go index 7e690f6f2..7455d147a 100644 --- a/functional/util/util.go +++ b/functional/util/util.go @@ -28,6 +28,7 @@ import ( ) var fleetctlBinPath string +var etcdctlBinPath string func init() { fleetctlBinPath = os.Getenv("FLEETCTL_BIN") @@ -39,6 +40,15 @@ func init() { os.Exit(1) } + etcdctlBinPath = os.Getenv("ETCDCTL_BIN") + if etcdctlBinPath == "" { + fmt.Println("ETCDCTL_BIN environment variable must be set") + os.Exit(1) + } else if _, err := os.Stat(etcdctlBinPath); err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } + if os.Getenv("SSH_AUTH_SOCK") == "" { fmt.Println("SSH_AUTH_SOCK environment variable must be set") os.Exit(1) @@ -79,6 +89,38 @@ func RunFleetctlWithInput(input string, args ...string) (string, string, error) return stdoutBytes.String(), stderrBytes.String(), err } +func RunEtcdctl(args ...string) (string, string, error) { + log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " ")) + var stdoutBytes, stderrBytes bytes.Buffer + cmd := exec.Command(etcdctlBinPath, args...) + cmd.Stdout = &stdoutBytes + cmd.Stderr = &stderrBytes + err := cmd.Run() + return stdoutBytes.String(), stderrBytes.String(), err +} + +func RunEtcdctlWithInput(input string, args ...string) (string, string, error) { + log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " ")) + var stdoutBytes, stderrBytes bytes.Buffer + cmd := exec.Command(etcdctlBinPath, args...) + cmd.Stdout = &stdoutBytes + cmd.Stderr = &stderrBytes + stdin, err := cmd.StdinPipe() + if err != nil { + return "", "", err + } + + if err = cmd.Start(); err != nil { + return "", "", err + } + + stdin.Write([]byte(input)) + stdin.Close() + err = cmd.Wait() + + return stdoutBytes.String(), stderrBytes.String(), err +} + // ActiveToSingleStates takes a map of active states (such as that returned by // WaitForNActiveUnits) and ensures that each unit has at most a single active // state. It returns a mapping of unit name to a single UnitState. From 60c9c8007e5237ba1a0a67312a73f1c47633eab8 Mon Sep 17 00:00:00 2001 From: Dongsu Park Date: Thu, 24 Mar 2016 10:16:16 +0100 Subject: [PATCH 3/3] functional: add a new test TestUnitDestroyFromRegistry A new test TestUnitDestroyFromRegistry() checks for a submitted unit being actually deleted from the etcd registry. To compare the old unit body with the one registered in the etcd registry, we need to go through several steps for queries to etcd. --- functional/unit_action_test.go | 148 +++++++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 8a5d95122..1b337da08 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -15,6 +15,7 @@ package functional import ( + "encoding/json" "fmt" "io/ioutil" "os" @@ -26,6 +27,7 @@ import ( "github.com/coreos/fleet/functional/platform" "github.com/coreos/fleet/functional/util" + "github.com/coreos/fleet/unit" ) const ( @@ -678,3 +680,149 @@ func waitForNUnitsCmd(cl platform.Cluster, m platform.Member, cmd string, nu int // frequently than before, it's a huge pain to make it succeed every time. // The failure brings a negative impact on productivity. So remove the entire // test for now. - dpark 20160829 + +// TestUnitDestroyFromRegistry() checks for a submitted unit being removed +// from the etcd registry. It compares a local unit body with the unit in +// the etcd registry, to verify the body is identical. +func TestUnitDestroyFromRegistry(t *testing.T) { + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + t.Fatal(err) + } + defer cluster.Destroy(t) + + m, err := cluster.CreateMember() + if err != nil { + t.Fatal(err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + t.Fatal(err) + } + + // submit a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, "submit", "fixtures/units/hello.service"); err != nil { + t.Fatalf("Unable to submit fleet unit: %v", err) + } + stdout, _, err := cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-units: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + t.Fatalf("Did not find 1 unit in cluster: \n%s", stdout) + } + + // cat the unit and compare it with the value in etcd registry + unitBody, _, err := cluster.Fleetctl(m, "cat", "hello.service") + if err != nil { + t.Fatalf("Unable to retrieve the fleet unit: %v", err) + } + + var hashUnit string + if hashUnit, err = retrieveJobObjectHash(cluster, "hello.service"); err != nil { + t.Fatalf("Failed to retrieve hash of job object hello.service: %v", err) + } + + var regBody string + if regBody, err = retrieveUnitBody(cluster, hashUnit); err != nil { + t.Fatalf("Failed to retrieve unit body for hello.service: %v", err) + } + + // compare it with unitBody + if regBody != unitBody { + t.Fatalf("Failed to verify fleet unit: %v", err) + } + + // destroy the unit again + if _, _, err := cluster.Fleetctl(m, "destroy", "hello.service"); err != nil { + t.Fatalf("Failed to destroy unit: %v", err) + } + + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-units: %v", err) + } + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) != 0 && len(units) != 1 { + t.Fatalf("Did not find 1 unit in cluster: \n%s", stdout) + } + + // check for the unit being destroyed from the etcd registry, + // /fleet_functional/smoke/unit/. + // NOTE: do not check error of etcdctl, as it returns 4 on an empty list. + etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit") + etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit) + stdout, _, _ = util.RunEtcdctl("ls", etcdUnitPath) + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) != 0 && len(units) != 1 { + t.Fatalf("The unit still remains in the registry: %v") + } +} + +// retrieveJobObjectHash fetches the job hash value from +// /fleet_functional/smoke/job//object in the etcd registry. +func retrieveJobObjectHash(cluster platform.Cluster, jobName string) (hash string, err error) { + etcdJobPrefix := path.Join(cluster.Keyspace(), "job") + etcdJobPath := path.Join(etcdJobPrefix, jobName, "object") + + var stdout string + if stdout, _, err = util.RunEtcdctl("ls", etcdJobPath); err != nil { + return "", fmt.Errorf("Failed to list a unit from the registry: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) == 0 || len(units) == 0 { + return "", fmt.Errorf("No such unit in the registry: %v", err) + } + + stdout, _, err = util.RunEtcdctl("get", etcdJobPath) + stdout = strings.TrimSpace(stdout) + objectBody := strings.Split(stdout, "\n") + if err != nil || len(stdout) == 0 || len(objectBody) == 0 { + return "", fmt.Errorf("Failed to get unit from the registry: %v", err) + } + + type jobModel struct { + Name string + UnitHash unit.Hash + } + var jm jobModel + if err = json.Unmarshal([]byte(stdout), &jm); err != nil { + return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err) + } + + return jm.UnitHash.String(), nil +} + +// retrieveUnitBody fetches unit body from /fleet_functional/smoke/unit/ +// in the etcd registry. +func retrieveUnitBody(cluster platform.Cluster, hashUnit string) (regBody string, err error) { + etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit") + etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit) + + var stdout string + if stdout, _, err = util.RunEtcdctl("ls", etcdUnitPath); err != nil { + return "", fmt.Errorf("Failed to list a unit from the registry: %v", err) + } + + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) == 0 || len(units) == 0 { + return "", fmt.Errorf("No such unit in the registry: %v", err) + } + stdout, _, err = util.RunEtcdctl("get", etcdUnitPath) + stdout = strings.TrimSpace(stdout) + unitBody := strings.Split(stdout, "\n") + if err != nil || len(stdout) == 0 || len(unitBody) == 0 { + return "", fmt.Errorf("Failed to get unit from the registry: %v", err) + } + + type rawModel struct { + Raw string + } + + var rm rawModel + if err = json.Unmarshal([]byte(stdout), &rm); err != nil { + return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err) + } + return rm.Raw, nil +}