diff --git a/build-env b/build-env index f068b51bd..b2475b8c1 100755 --- a/build-env +++ b/build-env @@ -24,3 +24,4 @@ export PATH="${GOROOT}/bin:${PATH}" export FLEETD_BIN="${CDIR}/bin/fleetd" export FLEETCTL_BIN="${CDIR}/bin/fleetctl" export FLEETD_TEST_ENV="enable_grpc=false" +export ETCDCTL_BIN="/usr/bin/etcdctl" diff --git a/functional/platform/cluster.go b/functional/platform/cluster.go index e6cc54d49..900dd0d3d 100644 --- a/functional/platform/cluster.go +++ b/functional/platform/cluster.go @@ -42,6 +42,8 @@ type Cluster interface { WaitForNActiveUnits(Member, int) (map[string][]util.UnitState, error) WaitForNUnitFiles(Member, int) (map[string][]util.UnitFileState, error) WaitForNMachines(Member, int) ([]string, error) + + Keyspace() string } func CreateNClusterMembers(cl Cluster, count int) ([]Member, error) { diff --git a/functional/platform/nspawn.go b/functional/platform/nspawn.go index ac1577ace..38895d6f5 100644 --- a/functional/platform/nspawn.go +++ b/functional/platform/nspawn.go @@ -100,7 +100,7 @@ func (nc *nspawnCluster) nextID() string { return strconv.Itoa(nc.maxID) } -func (nc *nspawnCluster) keyspace() string { +func (nc *nspawnCluster) Keyspace() string { // TODO(jonboulle): generate this dynamically with atomic in order keys? return fmt.Sprintf("/fleet_functional/%s", nc.name) } @@ -367,7 +367,7 @@ func (nc *nspawnCluster) buildConfigDrive(dir, ip string) error { defer userFile.Close() etcd := "http://172.18.0.1:4001" - return util.BuildCloudConfig(userFile, ip, etcd, nc.keyspace()) + return util.BuildCloudConfig(userFile, ip, etcd, nc.Keyspace()) } func (nc *nspawnCluster) Members() []Member { @@ -601,7 +601,7 @@ func (nc *nspawnCluster) Destroy(t *testing.T) error { // TODO(bcwaldon): This returns 4 on success, but we can't easily // ignore just that return code. Ignore the returned error // altogether until this is fixed. 
- run("etcdctl rm --recursive " + nc.keyspace()) + run("etcdctl rm --recursive " + nc.Keyspace()) run("ip link del fleet0") diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 8a5d95122..1b337da08 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -15,6 +15,7 @@ package functional import ( + "encoding/json" "fmt" "io/ioutil" "os" @@ -26,6 +27,7 @@ import ( "github.com/coreos/fleet/functional/platform" "github.com/coreos/fleet/functional/util" + "github.com/coreos/fleet/unit" ) const ( @@ -678,3 +680,149 @@ func waitForNUnitsCmd(cl platform.Cluster, m platform.Member, cmd string, nu int // frequently than before, it's a huge pain to make it succeed every time. // The failure brings a negative impact on productivity. So remove the entire // test for now. - dpark 20160829 + +// TestUnitDestroyFromRegistry() checks for a submitted unit being removed +// from the etcd registry. It compares a local unit body with the unit in +// the etcd registry, to verify the body is identical. 
+func TestUnitDestroyFromRegistry(t *testing.T) { + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + t.Fatal(err) + } + defer cluster.Destroy(t) + + m, err := cluster.CreateMember() + if err != nil { + t.Fatal(err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + t.Fatal(err) + } + + // submit a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, "submit", "fixtures/units/hello.service"); err != nil { + t.Fatalf("Unable to submit fleet unit: %v", err) + } + stdout, _, err := cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-units: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 { + t.Fatalf("Did not find 1 unit in cluster: \n%s", stdout) + } + + // cat the unit and compare it with the value in etcd registry + unitBody, _, err := cluster.Fleetctl(m, "cat", "hello.service") + if err != nil { + t.Fatalf("Unable to retrieve the fleet unit: %v", err) + } + + var hashUnit string + if hashUnit, err = retrieveJobObjectHash(cluster, "hello.service"); err != nil { + t.Fatalf("Failed to retrieve hash of job object hello.service: %v", err) + } + + var regBody string + if regBody, err = retrieveUnitBody(cluster, hashUnit); err != nil { + t.Fatalf("Failed to retrieve unit body for hello.service: %v", err) + } + + // compare it with unitBody + if regBody != unitBody { + t.Fatalf("Failed to verify fleet unit: registry body %q does not match local body %q", regBody, unitBody) + } + + // destroy the unit again + if _, _, err := cluster.Fleetctl(m, "destroy", "hello.service"); err != nil { + t.Fatalf("Failed to destroy unit: %v", err) + } + + stdout, _, err = cluster.Fleetctl(m, "list-units", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-units: %v", err) + } + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 || units[0] != "" { + t.Fatalf("Did not find 0 units in cluster: \n%s", stdout) + } + + // check for the unit being destroyed from the etcd 
registry, + // /fleet_functional/smoke/unit/{hash}; any remaining output line means it is still there. + // NOTE: do not check error of etcdctl, as it returns 4 on an empty list. + etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit") + etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit) + stdout, _, _ = util.RunEtcdctl("ls", etcdUnitPath) + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 1 || units[0] != "" { + t.Fatalf("The unit still remains in the registry: %v", units) + } +} + +// retrieveJobObjectHash fetches the job hash value from +// /fleet_functional/smoke/job/{jobName}/object in the etcd registry. +func retrieveJobObjectHash(cluster platform.Cluster, jobName string) (hash string, err error) { + etcdJobPrefix := path.Join(cluster.Keyspace(), "job") + etcdJobPath := path.Join(etcdJobPrefix, jobName, "object") + + var stdout string + if stdout, _, err = util.RunEtcdctl("ls", etcdJobPath); err != nil { + return "", fmt.Errorf("Failed to list a unit from the registry: %v", err) + } + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) == 0 || len(units) == 0 { + return "", fmt.Errorf("No such unit in the registry: %v", etcdJobPath) + } + + stdout, _, err = util.RunEtcdctl("get", etcdJobPath) + stdout = strings.TrimSpace(stdout) + objectBody := strings.Split(stdout, "\n") + if err != nil || len(stdout) == 0 || len(objectBody) == 0 { + return "", fmt.Errorf("Failed to get unit from the registry: %v", err) + } + + type jobModel struct { + Name string + UnitHash unit.Hash + } + var jm jobModel + if err = json.Unmarshal([]byte(stdout), &jm); err != nil { + return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err) + } + + return jm.UnitHash.String(), nil +} + +// retrieveUnitBody fetches unit body from /fleet_functional/smoke/unit/{hash} +// in the etcd registry. 
+func retrieveUnitBody(cluster platform.Cluster, hashUnit string) (regBody string, err error) { + etcdUnitPrefix := path.Join(cluster.Keyspace(), "unit") + etcdUnitPath := path.Join(etcdUnitPrefix, hashUnit) + + var stdout string + if stdout, _, err = util.RunEtcdctl("ls", etcdUnitPath); err != nil { + return "", fmt.Errorf("Failed to list a unit from the registry: %v", err) + } + + units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(stdout) == 0 || len(units) == 0 { + return "", fmt.Errorf("No such unit in the registry: %v", etcdUnitPath) + } + stdout, _, err = util.RunEtcdctl("get", etcdUnitPath) + stdout = strings.TrimSpace(stdout) + unitBody := strings.Split(stdout, "\n") + if err != nil || len(stdout) == 0 || len(unitBody) == 0 { + return "", fmt.Errorf("Failed to get unit from the registry: %v", err) + } + + type rawModel struct { + Raw string + } + + var rm rawModel + if err = json.Unmarshal([]byte(stdout), &rm); err != nil { + return "", fmt.Errorf("Failed to unmarshal fleet unit in the registry: %v", err) + } + return rm.Raw, nil +} diff --git a/functional/util/util.go b/functional/util/util.go index 7e690f6f2..7455d147a 100644 --- a/functional/util/util.go +++ b/functional/util/util.go @@ -28,6 +28,7 @@ import ( ) var fleetctlBinPath string +var etcdctlBinPath string func init() { fleetctlBinPath = os.Getenv("FLEETCTL_BIN") @@ -39,6 +40,15 @@ func init() { os.Exit(1) } + etcdctlBinPath = os.Getenv("ETCDCTL_BIN") + if etcdctlBinPath == "" { + fmt.Println("ETCDCTL_BIN environment variable must be set") + os.Exit(1) + } else if _, err := os.Stat(etcdctlBinPath); err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } + if os.Getenv("SSH_AUTH_SOCK") == "" { fmt.Println("SSH_AUTH_SOCK environment variable must be set") os.Exit(1) @@ -79,6 +89,38 @@ func RunFleetctlWithInput(input string, args ...string) (string, string, error) return stdoutBytes.String(), stderrBytes.String(), err } +func RunEtcdctl(args ...string) (string, string, error) { + 
log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " ")) + var stdoutBytes, stderrBytes bytes.Buffer + cmd := exec.Command(etcdctlBinPath, args...) + cmd.Stdout = &stdoutBytes + cmd.Stderr = &stderrBytes + err := cmd.Run() + return stdoutBytes.String(), stderrBytes.String(), err +} + +func RunEtcdctlWithInput(input string, args ...string) (string, string, error) { + log.Printf("%s %s", etcdctlBinPath, strings.Join(args, " ")) + var stdoutBytes, stderrBytes bytes.Buffer + cmd := exec.Command(etcdctlBinPath, args...) + cmd.Stdout = &stdoutBytes + cmd.Stderr = &stderrBytes + stdin, err := cmd.StdinPipe() + if err != nil { + return "", "", err + } + + if err = cmd.Start(); err != nil { + return "", "", err + } + + stdin.Write([]byte(input)) + stdin.Close() + err = cmd.Wait() + + return stdoutBytes.String(), stderrBytes.String(), err +} + // ActiveToSingleStates takes a map of active states (such as that returned by // WaitForNActiveUnits) and ensures that each unit has at most a single active // state. It returns a mapping of unit name to a single UnitState. diff --git a/registry/job.go b/registry/job.go index 4f4248a62..3fb50b013 100644 --- a/registry/job.go +++ b/registry/job.go @@ -18,7 +18,9 @@ import ( "errors" "fmt" "path" + "reflect" "sort" + "strings" etcd "github.com/coreos/etcd/client" "golang.org/x/net/context" @@ -285,6 +287,48 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc } +// getUnitFromPlainNode() takes a *etcd.Node containing a Unit's jobModel, and +// instantiates and returns a representative *job.Unit, transitively fetching +// the associated UnitFile as necessary. In contrast to getUnitFromObjectNode(), +// this does not use node.Value itself as a hash key, but it uses the last part +// of node.Key for the hash key. 
+func (r *EtcdRegistry) getUnitFromPlainNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) { + var err error + var jm jobModel + + if err = unmarshal(node.Value, &jm); err != nil { + return nil, err + } + + parts := strings.Split(node.Key, "/") + if len(parts) == 0 { + log.Errorf("key '%v' doesn't have enough parts", node.Key) + return nil, nil + } + stringHash := parts[len(parts)-1] + + hashKey, err := unit.HashFromHexString(stringHash) + if err != nil { + log.Errorf("cannot convert key string into hash. %v", err) + return nil, nil + } + + var unit *unit.UnitFile + + unit = unitHashLookupFunc(hashKey) + if unit == nil { + log.Warningf("No Unit found in Registry for Job(%s)", jm.Name) + return nil, nil + } + + ju := &job.Unit{ + Name: jm.Name, + Unit: *unit, + } + return ju, nil + +} + // jobModel is used for serializing and deserializing Jobs stored in the Registry type jobModel struct { Name string @@ -298,7 +342,12 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { opts := &etcd.DeleteOptions{ Recursive: true, } - _, err := r.kAPI.Delete(context.Background(), key, opts) + u, err := r.Unit(name) + if err != nil { + log.Warningf("r.Unit error, name=%s: %v\n", name, err) + u = nil + } + _, err = r.kAPI.Delete(context.Background(), key, opts) if err != nil { if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { err = errors.New("job does not exist") @@ -308,9 +357,78 @@ func (r *EtcdRegistry) DestroyUnit(name string) error { } // TODO(jonboulle): add unit reference counting and actually destroying Units + + // Delete unit from the etcd registry + if u != nil { + // check if the unit is really valid. If not, return err. 
+ key = r.hashedUnitPath(u.Unit.Hash()) + unitMatch, err := r.checkUnitMatch(u) + if err != nil { + return fmt.Errorf("Failed checking unit validity: %v", err) + } + if unitMatch { + return fmt.Errorf("Invalid unit in the etcd registry: not deleting from registry.") + } + _, err = r.kAPI.Delete(context.Background(), key, opts) + if err != nil { + // NOTE: unable to delete the key, but it's practically no big deal, + // as the garbage will be later cleaned up on the etcd side. + return fmt.Errorf("Failed deleting unit from registry: %s", err) + } + } return nil } +// checkUnitMatch() reports whether another unit entry with the same contents +// exists in the etcd registry, by listing the unit entries via RPC. +func (r *EtcdRegistry) checkUnitMatch(unitDel *job.Unit) (unitMatch bool, err error) { + key := path.Join(r.keyPrefix, unitPrefix) + opts := &etcd.GetOptions{ + Recursive: true, + } + res, err := r.kAPI.Get(context.Background(), key, opts) + if err != nil { + if isEtcdError(err, etcd.ErrorCodeKeyNotFound) { + err = nil + } + return false, err + } + + return r.checkUnitSiblings(unitDel, res.Node) +} + +// checkUnitSiblings() returns true if there's a duplicated entry already in +// the etcd registry. +func (r *EtcdRegistry) checkUnitSiblings(unitDel *job.Unit, dir *etcd.Node) (bool, error) { + uhashKey := dir.Key + unitDelName := r.hashedUnitPath(unitDel.Unit.Hash()) + for _, uhashNode := range dir.Nodes { + newUnit, err := r.getUnitFromPlainNode(uhashNode, r.getUnitByHash) + if err != nil { + log.Errorf("cannot get unit. err: %v", err) + return false, err + } + if newUnit == nil { + log.Debugf("unable to parse Unit in Registry at key %s", uhashKey) + continue + } + + if unitDelName == uhashNode.Key { + log.Debugf("skipping the entry itself.") + continue + } + + if reflect.DeepEqual(unitDel.Unit.Contents, newUnit.Unit.Contents) { + // matched. 
so this unit has a duplicated entry, so return + log.Debugf("won't erase this key, as a duplicated entry is found.") + return true, nil + } + } + + log.Debugf("no matching entry, so it can be removed.") + return false, nil +} + // CreateUnit attempts to store a Unit and its associated unit file in the registry func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { if err := r.storeOrGetUnitFile(u.Unit); err != nil {