From 6d1495282581a4c49b8489465e929560202ded3b Mon Sep 17 00:00:00 2001 From: Aviel Segev Date: Tue, 17 Mar 2026 17:31:10 +0200 Subject: [PATCH] NO-JIRA | fix: refactor inspector to use pipelines Signed-off-by: Aviel Segev --- api/v1/openapi.yaml | 23 - api/v1/spec.gen.go | 17 - api/v1/types.gen.go | 3 - docs/filter-by-expression.md | 7 - internal/handlers/doc.go | 2 +- internal/handlers/handlers.go | 6 +- internal/handlers/handlers_suite_test.go | 6 +- internal/handlers/vms.go | 39 +- internal/handlers/vms_test.go | 115 +--- internal/models/inspection.go | 4 + internal/models/inspector.go | 32 - internal/services/doc.go | 2 +- internal/services/inspection.go | 245 +++++++ internal/services/inspection_test.go | 150 +++++ internal/services/inspector.go | 326 +++------ internal/services/inspector_test.go | 628 +++++------------- internal/services/manager.go | 10 +- internal/store/doc.go | 1 - internal/store/inspection.go | 197 ------ internal/store/inspection_filters.go | 108 --- internal/store/migrations/migrations_test.go | 20 - .../sql/006_inspection_status_table.sql | 3 + internal/store/store.go | 6 - internal/store/vm.go | 7 - internal/store/vm_filter_test.go | 3 - internal/store/vm_queries.go | 4 - pkg/errors/errors.go | 19 +- pkg/errors/errors_test.go | 13 - pkg/filter/doc.go | 4 - pkg/filter/sql.go | 6 - pkg/filter/sql_test.go | 2 - pkg/vmware/work_builder.go | 80 --- test/vms.go | 14 - 33 files changed, 708 insertions(+), 1394 deletions(-) create mode 100644 internal/services/inspection.go create mode 100644 internal/services/inspection_test.go delete mode 100644 internal/store/inspection.go delete mode 100644 internal/store/inspection_filters.go create mode 100644 internal/store/migrations/sql/006_inspection_status_table.sql delete mode 100644 pkg/vmware/work_builder.go diff --git a/api/v1/openapi.yaml b/api/v1/openapi.yaml index 736f86ff..2de6aa0d 100644 --- a/api/v1/openapi.yaml +++ b/api/v1/openapi.yaml @@ -267,29 +267,6 @@ paths: description: Inspector already running '500': description: Internal server error - patch: - summary: Add more VMs to inspection queue - operationId: addVMsToInspection - requestBody: - required: true - content: - application/json: - schema: - $ref: '#/components/schemas/VMIdArray' - example: ["vm-1236", "vm-1237"] - responses: - '202': - description: VMs added to inspection queue - content: - application/json: - schema: - $ref: '#/components/schemas/InspectorStatus' - '400': - description: Invalid request - '404': - description: Inspector not running - '500': - description: Internal server error delete: summary: Stop inspector entirely operationId: stopInspection diff --git a/api/v1/spec.gen.go b/api/v1/spec.gen.go index c0f4307f..904f6f60 100644 --- a/api/v1/spec.gen.go +++ b/api/v1/spec.gen.go @@ -64,9 +64,6 @@ type ServerInterface interface { // Get inspector status // (GET /vms/inspector) GetInspectorStatus(c *gin.Context) - // Add more VMs to inspection queue - // (PATCH /vms/inspector) - AddVMsToInspection(c *gin.Context) // Start inspection for VMs // (POST /vms/inspector) StartInspection(c *gin.Context) @@ -458,19 +455,6 @@ func (siw *ServerInterfaceWrapper) GetInspectorStatus(c *gin.Context) { siw.Handler.GetInspectorStatus(c) } -// AddVMsToInspection operation middleware -func (siw *ServerInterfaceWrapper) AddVMsToInspection(c *gin.Context) { - - for _, middleware := range siw.HandlerMiddlewares { - middleware(c) - if c.IsAborted() { - return - } - } - - siw.Handler.AddVMsToInspection(c) -} - // StartInspection operation middleware func (siw *ServerInterfaceWrapper) StartInspection(c *gin.Context) { @@ -600,7 +584,6 @@ func RegisterHandlersWithOptions(router gin.IRouter, si ServerInterface, options router.GET(options.BaseURL+"/vms", wrapper.GetVMs) router.DELETE(options.BaseURL+"/vms/inspector", wrapper.StopInspection) router.GET(options.BaseURL+"/vms/inspector", wrapper.GetInspectorStatus) - router.PATCH(options.BaseURL+"/vms/inspector", wrapper.AddVMsToInspection) router.POST(options.BaseURL+"/vms/inspector", wrapper.StartInspection) router.GET(options.BaseURL+"/vms/:id", wrapper.GetVM) router.DELETE(options.BaseURL+"/vms/:id/inspector", wrapper.RemoveVMFromInspection) diff --git a/api/v1/types.gen.go b/api/v1/types.gen.go index 9fde6136..e216c3e2 100644 --- a/api/v1/types.gen.go +++ b/api/v1/types.gen.go @@ -549,8 +549,5 @@ type UpdateGroupJSONRequestBody = UpdateGroupRequest // PostVddkMultipartRequestBody defines body for PostVddk for multipart/form-data ContentType. type PostVddkMultipartRequestBody PostVddkMultipartBody -// AddVMsToInspectionJSONRequestBody defines body for AddVMsToInspection for application/json ContentType. -type AddVMsToInspectionJSONRequestBody = VMIdArray - // StartInspectionJSONRequestBody defines body for StartInspection for application/json ContentType. type StartInspectionJSONRequestBody = InspectorStartRequest diff --git a/docs/filter-by-expression.md b/docs/filter-by-expression.md index 9dcede66..1d3a6ec5 100644 --- a/docs/filter-by-expression.md +++ b/docs/filter-by-expression.md @@ -263,13 +263,6 @@ Identifiers are **case-insensitive**. Dotted names refer to joined tables (e.g. | `concern.category` | string | Category | | `concern.assessment` | string | Assessment | -### vm_inspection_status (inspection.*) - -| Identifier | Type | Description (backing column) | -|----------------------|--------|-----------------------------| -| `inspection.status` | string | Inspection status | -| `inspection.error` | string | Inspection error | - ### vcpu (cpu.*) — CPU attributes | Identifier | Type | Description (backing column) | diff --git a/internal/handlers/doc.go b/internal/handlers/doc.go index e1c28f9e..c56eb891 100644 --- a/internal/handlers/doc.go +++ b/internal/handlers/doc.go @@ -182,7 +182,7 @@ // // The byExpression parameter accepts a filter DSL expression that can reference // any column across all joined tables (vinfo, vdisk, concerns, vcpu, vmemory, -// vnetwork, vdatastore, vm_inspection_status). See pkg/filter for the grammar +// vnetwork, vdatastore). See pkg/filter for the grammar // and docs/filter-by-expression.md for field mappings and examples. // // Valid Sort Fields: diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index 73767136..a485c6d8 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -36,11 +36,11 @@ type VMService interface { // InspectorService defines the interface for deep inspector operations. type InspectorService interface { Start(ctx context.Context, vmIDs []string, cred *models.Credentials) error - Add(ctx context.Context, vmIDs []string) error + Add(id string) error GetStatus() models.InspectorStatus - GetVmStatus(ctx context.Context, id string) (models.InspectionStatus, error) + GetVmStatus(id string) (models.InspectionStatus, error) IsBusy() bool - CancelVmsInspection(ctx context.Context, vmIDs ...string) error + Cancel(id string) error Stop(ctx context.Context) error } diff --git a/internal/handlers/handlers_suite_test.go b/internal/handlers/handlers_suite_test.go index 376c2afb..98f87f61 100644 --- a/internal/handlers/handlers_suite_test.go +++ b/internal/handlers/handlers_suite_test.go @@ -121,7 +121,7 @@ func (m *MockInspectorService) Start(ctx context.Context, vmIDs []string, cred * return m.StartError } -func (m *MockInspectorService) Add(ctx context.Context, vmIDs []string) error { +func (m *MockInspectorService) Add(id string) error { m.AddCallCount++ return m.AddError } @@ -131,12 +131,12 @@ func (m *MockInspectorService) GetStatus() models.InspectorStatus { return m.GetStatusResult } -func (m *MockInspectorService) GetVmStatus(ctx context.Context, id string) (models.InspectionStatus, error) { +func (m *MockInspectorService) GetVmStatus(id string) (models.InspectionStatus, error) { m.GetVmStatusCallCount++ return m.GetVmStatusResult, m.GetVmStatusError } -func (m *MockInspectorService) CancelVmsInspection(ctx context.Context, vmIDs ...string) error { +func (m *MockInspectorService) Cancel(id string) error { m.CancelVmsInspectionCallCount++ return m.CancelVmsInspectionError } diff --git a/internal/handlers/vms.go b/internal/handlers/vms.go index f586ec0a..1f295de6 100644 --- a/internal/handlers/vms.go +++ b/internal/handlers/vms.go @@ -94,6 +94,12 @@ func (h *Handler) GetVMs(c *gin.Context, params v1.GetVMsParams) { // Map to API response apiVMs := make([]v1.VirtualMachine, 0, len(vms)) for _, vm := range vms { + s, err := h.inspectorSrv.GetVmStatus(vm.ID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get VM status: %v", err)}) + return + } + vm.Status = s apiVMs = append(apiVMs, v1.NewVirtualMachineFromSummary(vm)) } @@ -124,7 +130,7 @@ func (h *Handler) GetVM(c *gin.Context, id string) { // GetVMInspectionStatus returns the inspection status for a specific VM // (GET /vms/{id}/inspector) func (h *Handler) GetVMInspectionStatus(c *gin.Context, id string) { - s, err := h.inspectorSrv.GetVmStatus(c.Request.Context(), id) + s, err := h.inspectorSrv.GetVmStatus(id) if err != nil { if srvErrors.IsResourceNotFoundError(err) { c.JSON(http.StatusNotFound, v1.VmInspectionStatus{State: v1.VmInspectionStatusStateNotFound}) @@ -140,7 +146,7 @@ func (h *Handler) GetVMInspectionStatus(c *gin.Context, id string) { // RemoveVMFromInspection removes VM from inspection queue // (DELETE /vms/{id}/inspector) func (h *Handler) RemoveVMFromInspection(c *gin.Context, id string) { - if err := h.inspectorSrv.CancelVmsInspection(c.Request.Context(), id); err != nil { + if err := h.inspectorSrv.Cancel(id); err != nil { if srvErrors.IsInspectorNotRunningError(err) { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return @@ -149,7 +155,7 @@ func (h *Handler) RemoveVMFromInspection(c *gin.Context, id string) { return } - s, err := h.inspectorSrv.GetVmStatus(c.Request.Context(), id) + s, err := h.inspectorSrv.GetVmStatus(id) if err != nil { if srvErrors.IsResourceNotFoundError(err) { c.JSON(http.StatusNotFound, v1.VmInspectionStatus{State: v1.VmInspectionStatusStateNotFound}) @@ -196,33 +202,6 @@ func (h *Handler) StartInspection(c *gin.Context) { c.JSON(http.StatusAccepted, v1.InspectorStatus{State: v1.InspectorStatusStateInitiating}) } -// AddVMsToInspection adds more VMs to inspection queue -// (PATCH /vms/inspector) -func (h *Handler) AddVMsToInspection(c *gin.Context) { - var vmsMoid v1.VMIdArray - if err := c.ShouldBindJSON(&vmsMoid); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - if len(vmsMoid) == 0 { - c.JSON(http.StatusBadRequest, gin.H{"error": "no vms provided"}) - return - } - - if err := h.inspectorSrv.Add(c.Request.Context(), vmsMoid); err != nil { - if srvErrors.IsInspectorNotRunningError(err) { - c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) - return - } - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - c.JSON(http.StatusAccepted, v1.NewInspectorStatus(h.inspectorSrv.GetStatus())) - -} - // StopInspection stops inspector entirely // (DELETE /vms/inspector) func (h *Handler) StopInspection(c *gin.Context) { diff --git a/internal/handlers/vms_test.go b/internal/handlers/vms_test.go index e248bd1b..f8593478 100644 --- a/internal/handlers/vms_test.go +++ b/internal/handlers/vms_test.go @@ -50,7 +50,6 @@ var _ = Describe("VMs Handlers", func() { }) router.GET("/vms/inspector", handler.GetInspectorStatus) router.POST("/vms/inspector", handler.StartInspection) - router.PATCH("/vms/inspector", handler.AddVMsToInspection) router.DELETE("/vms/inspector", handler.StopInspection) router.GET("/vms/:id/inspector", func(c *gin.Context) { handler.GetVMInspectionStatus(c, c.Param("id")) @@ -445,65 +444,6 @@ var _ = Describe("VMs Handlers", func() { Expect(response.State).To(Equal(v1.InspectorStatusStateInitiating)) }) - // Given an invalid JSON request body - // When we try to add VMs to inspection - // Then it should return 400 Bad Request - It("AddVMsToInspection should return 400 for invalid JSON", func() { - // Arrange - req := httptest.NewRequest(http.MethodPatch, "/vms/inspector", strings.NewReader("invalid json")) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - - // Act - router.ServeHTTP(w, req) - - // Assert - Expect(w.Code).To(Equal(http.StatusBadRequest)) - var body map[string]any - Expect(json.Unmarshal(w.Body.Bytes(), &body)).To(Succeed()) - Expect(body["error"]).NotTo(BeEmpty()) - }) - - // Given an empty VM list in the request - // When we try to add VMs to inspection - // Then it should return 400 Bad Request - It("AddVMsToInspection should return 400 for empty VM list", func() { - // Arrange - reqBody := `[]` - req := httptest.NewRequest(http.MethodPatch, "/vms/inspector", strings.NewReader(reqBody)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - - // Act - router.ServeHTTP(w, req) - - // Assert - Expect(w.Code).To(Equal(http.StatusBadRequest)) - var body map[string]any - Expect(json.Unmarshal(w.Body.Bytes(), &body)).To(Succeed()) - Expect(body["error"]).To(Equal("no vms provided")) - }) - - // Given a running inspector and a valid VM list - // When we add VMs to inspection - // Then it should return 202 Accepted - It("AddVMsToInspection should add VMs successfully", func() { - // Arrange - mockInspector.GetStatusResult = models.InspectorStatus{ - State: models.InspectorStateRunning, - } - body := `["vm-1","vm-2"]` - req := httptest.NewRequest(http.MethodPatch, "/vms/inspector", strings.NewReader(body)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - - // Act - router.ServeHTTP(w, req) - - // Assert - Expect(w.Code).To(Equal(http.StatusAccepted)) - }) - // Given a running inspector // When we stop the inspection // Then it should return 202 Accepted @@ -611,7 +551,7 @@ var _ = Describe("VMs Handlers", func() { Expect(response.State).To(Equal(v1.VmInspectionStatusStateCanceled)) }) - // Given CancelVmsInspection returns an error + // Given Cancel returns an error // When we remove a VM from inspection // Then it should return 500 Internal Server Error It("RemoveVMFromInspection should return 500 when cancel fails", func() { @@ -700,24 +640,6 @@ var _ = Describe("VMs Handlers", func() { Expect(body["error"]).To(Equal("failed to start inspector: start failed")) }) - It("AddVMsToInspection should return 400 when add fails", func() { - // Arrange - mockInspector.AddError = errors.New("inspector not running") - reqBody := `["vm-1","vm-2"]` - req := httptest.NewRequest(http.MethodPatch, "/vms/inspector", strings.NewReader(reqBody)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - - // Act - router.ServeHTTP(w, req) - - // Assert - Expect(w.Code).To(Equal(http.StatusBadRequest)) - var body map[string]any - Expect(json.Unmarshal(w.Body.Bytes(), &body)).To(Succeed()) - Expect(body["error"]).To(Equal("inspector not running")) - }) - It("RemoveVMFromInspection should return 404 when inspector not running", func() { mockInspector.CancelVmsInspectionError = srvErrors.NewInspectorNotRunningError() @@ -732,21 +654,6 @@ var _ = Describe("VMs Handlers", func() { Expect(body["error"]).To(Equal("inspector not running")) }) - It("AddVMsToInspection should return 404 when inspector not running", func() { - mockInspector.AddError = srvErrors.NewInspectorNotRunningError() - body := `["vm-1","vm-2"]` - req := httptest.NewRequest(http.MethodPatch, "/vms/inspector", strings.NewReader(body)) - req.Header.Set("Content-Type", "application/json") - w := httptest.NewRecorder() - - router.ServeHTTP(w, req) - - Expect(w.Code).To(Equal(http.StatusNotFound)) - var respBody map[string]any - Expect(json.Unmarshal(w.Body.Bytes(), &respBody)).To(Succeed()) - Expect(respBody["error"]).To(Equal("inspector not running")) - }) - It("StopInspection should return 404 when inspector not running", func() { mockInspector.StopError = srvErrors.NewInspectorNotRunningError() @@ -804,12 +711,13 @@ var _ = Describe("Version Handler", func() { var _ = Describe("VMs Handlers Integration", func() { var ( - ctx context.Context - db *sql.DB - st *store.Store - vmSrv *services.VMService - handler *handlers.Handler - router *gin.Engine + ctx context.Context + db *sql.DB + st *store.Store + vmSrv *services.VMService + mockInspector *MockInspectorService + handler *handlers.Handler + router *gin.Engine ) BeforeEach(func() { @@ -831,7 +739,12 @@ var _ = Describe("VMs Handlers Integration", func() { Expect(err).NotTo(HaveOccurred()) vmSrv = services.NewVMService(st) - handler = handlers.NewHandler(config.Configuration{}).WithVMService(vmSrv) + mockInspector = &MockInspectorService{ + GetVmStatusResult: models.InspectionStatus{State: models.InspectionStateNotFound}, + } + handler = handlers.NewHandler(config.Configuration{}). + WithVMService(vmSrv). + WithInspectorService(mockInspector) router = gin.New() router.GET("/vms", func(c *gin.Context) { var params v1.GetVMsParams diff --git a/internal/models/inspection.go b/internal/models/inspection.go index c60ffacb..1681beff 100644 --- a/internal/models/inspection.go +++ b/internal/models/inspection.go @@ -38,3 +38,7 @@ type InspectionStatus struct { State InspectionState Error error } + +// InspectionResult is the shared result struct threaded through inspection work units. +// InspectionResult Todo: pass here data between inspection phase to saving step +type InspectionResult struct{} diff --git a/internal/models/inspector.go b/internal/models/inspector.go index 38869424..270c27f9 100644 --- a/internal/models/inspector.go +++ b/internal/models/inspector.go @@ -1,12 +1,5 @@ package models -import ( - "context" - "time" - - "go.uber.org/zap" -) - // InspectorState represents the current state of the Inspector. type InspectorState string @@ -32,28 +25,3 @@ type InspectorStatus struct { State InspectorState Error error } - -type InspectorWorkBuilder interface { - Build(string) []InspectorWorkUnit -} - -// InspectorWorkUnit represents a unit of work in the collector workflow. -type InspectorWorkUnit struct { - Work func() func(ctx context.Context) (any, error) -} - -type UnimplementedInspectorWorkBuilder struct{} - -func (u UnimplementedInspectorWorkBuilder) Build(id string) []InspectorWorkUnit { - return []InspectorWorkUnit{ - { - Work: func() func(ctx context.Context) (any, error) { - return func(ctx context.Context) (any, error) { - time.Sleep(10 * time.Second) - zap.S().Named("inspector_service").Infof("unimplemented work finsished for: %s", id) - return nil, nil - } - }, - }, - } -} diff --git a/internal/services/doc.go b/internal/services/doc.go index bdbff26c..fbb2e1f3 100644 --- a/internal/services/doc.go +++ b/internal/services/doc.go @@ -188,7 +188,7 @@ // Filtering: // - A single filter DSL expression (byExpression) that can reference any column // across all joined tables (vinfo, vdisk, concerns, vcpu, vmemory, vnetwork, -// vdatastore, vm_inspection_status). See pkg/filter for the grammar and field mappings. +// vdatastore). See pkg/filter for the grammar and field mappings. // // Sorting: // - Multiple sort fields with direction control (ascending/descending) diff --git a/internal/services/inspection.go b/internal/services/inspection.go new file mode 100644 index 00000000..e3fdae9f --- /dev/null +++ b/internal/services/inspection.go @@ -0,0 +1,245 @@ +package services + +import ( + "context" + "errors" + + srvErrors "github.com/kubev2v/assisted-migration-agent/pkg/errors" + "github.com/kubev2v/assisted-migration-agent/pkg/vmware" + + "go.uber.org/zap" + + "github.com/kubev2v/assisted-migration-agent/internal/models" + "github.com/kubev2v/assisted-migration-agent/pkg/scheduler" +) + +type ( + inspectionPipeline = WorkPipeline[models.InspectionStatus, models.InspectionResult] + inspectionWorkBuilder func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] +) + +type InspectionService struct { + scheduler *scheduler.Scheduler[models.InspectionResult] + buildFn inspectionWorkBuilder + pipelines map[string]*inspectionPipeline + operator vmware.VMOperator +} + +// NewInspectionService creates a new InspectionService with the default vmware builder. +func NewInspectionService() *InspectionService { + return &InspectionService{} +} + +func (i *InspectionService) WithWorkUnitsBuilder(builder inspectionWorkBuilder) *InspectionService { + i.buildFn = builder + return i +} + +func (i *InspectionService) add(id string) error { + _, found := i.pipelines[id] + + if found && i.pipelines[id] != nil && i.pipelines[id].IsRunning() { + return srvErrors.NewInspectionInProgressError() + } + + pipeline, err := i.startVmPipeline(id) + if err != nil { + return err + } + + i.pipelines[id] = pipeline + + return nil +} + +func (i *InspectionService) cancelVmInspection(id string) { + if p, ok := i.pipelines[id]; ok { + p.Stop() + } +} + +func (i *InspectionService) isBusy() bool { + for _, p := range i.pipelines { + if p.IsRunning() { + return true + } + } + + return false +} + +// getVmStatus returns the current vm inspectionSvc status. +func (i *InspectionService) getVmStatus(id string) (models.InspectionStatus, error) { + pipeline, found := i.pipelines[id] + + if !found { + return models.InspectionStatus{State: models.InspectionStateNotFound}, nil + } + + state := pipeline.State() + if state.Err != nil { + if errors.Is(state.Err, errPipelineStopped) { + return models.InspectionStatus{State: models.InspectionStateCanceled, Error: state.Err}, nil + } + return models.InspectionStatus{State: models.InspectionStateError, Error: state.Err}, nil + } + + if pipeline.IsRunning() { + return state.State, nil + } + + return models.InspectionStatus{State: models.InspectionStateCompleted}, nil +} + +func (i *InspectionService) start(operator *vmware.VMManager, vmIDs []string) error { + i.operator = operator + + sched := scheduler.NewScheduler[models.InspectionResult](5) + i.scheduler = sched + + if i.buildFn == nil { + i.buildFn = i.buildInspectionWorkUnits + } + + i.pipelines = make(map[string]*inspectionPipeline) + + for _, id := range vmIDs { + pipeline, err := i.startVmPipeline(id) + if err != nil { + return err + } + i.pipelines[id] = pipeline + } + + return nil +} + +func (i *InspectionService) startVmPipeline(id string) (*inspectionPipeline, error) { + pipeline := NewWorkPipeline(models.InspectionStatus{State: models.InspectionStatePending}, i.scheduler, i.buildFn(id)) + if err := pipeline.Start(); err != nil { + i.pipelines[id].state = WorkPipelineStatus[models.InspectionStatus, models.InspectionResult]{ + State: models.InspectionStatus{State: models.InspectionStateError, Error: err}, + Err: err, + } + return nil, err + } + + return pipeline, nil +} + +func (i *InspectionService) stop() { + for _, pipeline := range i.pipelines { + p := pipeline + if p != nil { + p.Stop() + } + } + + s := i.scheduler + i.scheduler = nil + if s != nil { + s.Close() + } +} + +func (i *InspectionService) buildInspectionWorkUnits(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + err := i.validate(ctx, id) + return result, err + }, + }, + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + err := i.createSnapshot(ctx, id) + return result, err + }, + }, + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + err := i.inspect(id) + return result, err + }, + }, + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + err := i.save(ctx, id) + return result, err + }, + }, + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + err := i.removeSnapshot(ctx, id) + return result, err + }, + }, + } +} + +func (i *InspectionService) validate(ctx context.Context, id string) error { + return i.operator.ValidatePrivileges(ctx, id, models.RequiredPrivileges) +} + +func (i *InspectionService) createSnapshot(ctx context.Context, id string) error { + zap.S().Named("inspector_service").Infow("creating VM snapshot", "vmId", id) + req := vmware.CreateSnapshotRequest{ + VmId: id, + SnapshotName: models.InspectionSnapshotName, + Description: "", + Memory: false, + Quiesce: false, + } + + if err := i.operator.CreateSnapshot(ctx, req); err != nil { + zap.S().Named("inspector_service").Errorw("failed to create VM snapshot", "vmId", id, "error", err) + return err + } + + zap.S().Named("inspector_service").Infow("VM snapshot created", "vmId", id) + + return nil +} + +func (i *InspectionService) inspect(id string) error { + return nil +} + +func (i *InspectionService) save(ctx context.Context, id string) error { + return nil +} + +func (i *InspectionService) removeSnapshot(ctx context.Context, id string) error { + + zap.S().Named("inspector_service").Infow("removing VM snapshot", "vmId", id) + + removeSnapReq := vmware.RemoveSnapshotRequest{ + VmId: id, + SnapshotName: models.InspectionSnapshotName, + Consolidate: true, + } + + if err := i.operator.RemoveSnapshot(ctx, removeSnapReq); err != nil { + zap.S().Named("inspector_service").Errorw("failed to remove VM snapshot", "vmId", id, "error", err) + return err + } + + zap.S().Named("inspector_service").Infow("VM snapshot removed", "vmId", id) + + return nil +} diff --git a/internal/services/inspection_test.go b/internal/services/inspection_test.go new file mode 100644 index 00000000..6e074599 --- /dev/null +++ b/internal/services/inspection_test.go @@ -0,0 +1,150 @@ +package services + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kubev2v/assisted-migration-agent/internal/models" + "github.com/kubev2v/assisted-migration-agent/pkg/vmware" +) + +var _ = Describe("InspectionService", func() { + Describe("getVmStatus", func() { + It("returns NotFound when no pipelines exist", func() { + svc := NewInspectionService() + status, err := svc.getVmStatus("vm-1") + Expect(err).NotTo(HaveOccurred()) + Expect(status.State).To(Equal(models.InspectionStateNotFound)) + }) + + It("returns pipeline state after start", func() { + svc := NewInspectionService().WithWorkUnitsBuilder(func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + return result, nil + }, + }, + } + }) + + err := svc.start(nil, []string{"vm-1"}) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() models.InspectionState { + status, _ := svc.getVmStatus("vm-1") + return status.State + }).Should(Equal(models.InspectionStateCompleted)) + }) + }) + + Describe("add", func() { + It("creates pipelines for new VM IDs", func() { + svc := NewInspectionService().WithWorkUnitsBuilder(func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + return result, nil + }, + }, + } + }) + + err := svc.start(nil, []string{"vm-1"}) + Expect(err).NotTo(HaveOccurred()) + + err = svc.add("vm-2") + Expect(err).NotTo(HaveOccurred()) + + err = svc.add("vm-3") + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-2") + return s.State + }).Should(Equal(models.InspectionStateCompleted)) + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-3") + return s.State + }).Should(Equal(models.InspectionStateCompleted)) + }) + }) + + Describe("cancelVmInspection", func() { + It("stops specified pipelines", func() { + var block sync.WaitGroup + block.Add(1) + svc := NewInspectionService().WithWorkUnitsBuilder(func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + block.Wait() + return result, nil + }, + }, + } + }) + + err := svc.start(nil, []string{"vm-1", "vm-2"}) + Expect(err).NotTo(HaveOccurred()) + + svc.cancelVmInspection("vm-1") + + block.Done() + + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-1") + return s.State + }).Should(Equal(models.InspectionStateCanceled)) + + // vm-2 should still complete + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-2") + return s.State + }).Should(Equal(models.InspectionStateCompleted)) + }) + }) + + Describe("start", func() { + It("stores operator and creates pipelines for given IDs", func() { + svc := NewInspectionService().WithWorkUnitsBuilder(func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + return result, nil + }, + }, + } + }) + + err := svc.start((*vmware.VMManager)(nil), []string{"vm-a", "vm-b"}) + Expect(err).NotTo(HaveOccurred()) + + Expect(svc.operator).To(BeNil()) + + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-a") + return s.State + }).Should(Equal(models.InspectionStateCompleted)) + Eventually(func() models.InspectionState { + s, _ := svc.getVmStatus("vm-b") + return s.State + }).Should(Equal(models.InspectionStateCompleted)) + }) + }) +}) diff --git a/internal/services/inspector.go b/internal/services/inspector.go index f801bf8c..3285b323 100644 --- a/internal/services/inspector.go +++ b/internal/services/inspector.go @@ -2,183 +2,158 @@ package services import ( "context" - "database/sql" - "errors" - "fmt" "sync" "time" "github.com/vmware/govmomi" - "github.com/kubev2v/assisted-migration-agent/internal/store" "github.com/kubev2v/assisted-migration-agent/pkg/vmware" "go.uber.org/zap" "github.com/kubev2v/assisted-migration-agent/internal/models" srvErrors "github.com/kubev2v/assisted-migration-agent/pkg/errors" - "github.com/kubev2v/assisted-migration-agent/pkg/scheduler" ) type InspectorService struct { - scheduler *scheduler.Scheduler[any] - store *store.Store - builder models.InspectorWorkBuilder - - status models.InspectorStatus - - mu sync.Mutex - - done chan any - - vsphereClient *govmomi.Client - cancel context.CancelFunc + mu sync.Mutex cred *models.Credentials + vsphereClient *govmomi.Client + inspectionSvc *InspectionService + status models.InspectorStatus + stop chan struct{} } // NewInspectorService creates a new InspectorService with the default vmware builder. -func NewInspectorService(s *scheduler.Scheduler[any], store *store.Store) *InspectorService { +func NewInspectorService() *InspectorService { return &InspectorService{ - scheduler: s, - status: models.InspectorStatus{State: models.InspectorStateReady}, - store: store, + status: models.InspectorStatus{State: models.InspectorStateReady}, } } // GetStatus returns the current inspector status. -func (c *InspectorService) GetStatus() models.InspectorStatus { - c.mu.Lock() - defer c.mu.Unlock() +func (i *InspectorService) GetStatus() models.InspectorStatus { + i.mu.Lock() + defer i.mu.Unlock() - return c.status + return i.status } -// GetVmStatus returns the current vm inspection status. -func (c *InspectorService) GetVmStatus(ctx context.Context, id string) (models.InspectionStatus, error) { - s, err := c.store.Inspection().Get(ctx, id) +func (i *InspectorService) GetVmStatus(id string) (models.InspectionStatus, error) { + if i.inspectionSvc == nil { + return models.InspectionStatus{State: models.InspectionStateNotFound}, nil + } + + i.mu.Lock() + defer i.mu.Unlock() + s, err := i.inspectionSvc.getVmStatus(id) if err != nil { - return models.InspectionStatus{}, err + return s, err } - return *s, nil + + return s, nil } -func (c *InspectorService) Start(ctx context.Context, vmIDs []string, cred *models.Credentials) error { - if c.IsBusy() { - return fmt.Errorf("deep inspector already in progress") +func (i *InspectorService) Start(ctx context.Context, vmIDs []string, cred *models.Credentials) error { + i.mu.Lock() + defer i.mu.Unlock() + + if i.isBusyWithoutMutex() { + return srvErrors.NewInspectionInProgressError() } - c.setState(models.InspectorStateInitiating) + i.setState(models.InspectorStateInitiating) zap.S().Infow("starting inspector", "vmCount", len(vmIDs)) vClient, err := vmware.NewVsphereClient(ctx, cred.URL, cred.Username, cred.Password, true) if err != nil { zap.S().Named("inspector_service").Errorw("failed to connect to vSphere", "error", err) - c.setErrorStatus(err) + i.setState(models.InspectorStateReady) return err } zap.S().Named("inspector_service").Info("vSphere connection established") - c.vsphereClient = vClient - c.cred = cred - if c.builder == nil { - c.builder = vmware.NewInspectorWorkBuilder(vmware.NewVMManager(vClient, cred.Username)) - } + i.vsphereClient = vClient + i.cred = cred - if err := c.store.Inspection().DeleteAll(ctx); err != nil { - c.setErrorStatus(err) - return fmt.Errorf("failed to clear vms inspection table: %w", err) + if i.inspectionSvc == nil { + i.inspectionSvc = NewInspectionService() } - if err := c.store.Inspection().Add(ctx, vmIDs, models.InspectionStatePending); err != nil { - c.setErrorStatus(err) - return fmt.Errorf("failed to init inspection table: %w", err) + if err := i.initInspectionPipelines(vmIDs); err != nil { + i.inspectionSvc.stop() + _ = i.closeVsphereClient(ctx) + i.setState(models.InspectorStateReady) + return err } - runCtx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - c.done = make(chan any) + i.stop = make(chan struct{}) - go c.run(runCtx, c.done) + go i.run(context.Background()) return nil } -func (c *InspectorService) Add(ctx context.Context, vmIDs []string) error { - if !c.IsBusy() { - return srvErrors.NewInspectorNotRunningError() - } +func (i *InspectorService) Add(id string) error { + i.mu.Lock() + defer i.mu.Unlock() - if c.GetStatus().State == models.InspectorStateCanceling { - return fmt.Errorf("inspector already canceling") - } - - if len(vmIDs) == 0 { - return fmt.Errorf("vmIDs is empty") + if !i.isBusyWithoutMutex() { + return srvErrors.NewInspectorNotRunningError() } - if err := c.store.Inspection().Add(ctx, vmIDs, models.InspectionStatePending); err != nil { - return fmt.Errorf("failed to add VMs to inspection queue: %w", err) + if err := i.inspectionSvc.add(id); err != nil { + return err } return nil } -func (c *InspectorService) Stop(ctx context.Context) error { - if !c.IsBusy() { - return srvErrors.NewInspectorNotRunningError() - } - - c.setState(models.InspectorStateCanceling) +func (i *InspectorService) Stop(ctx context.Context) error { + i.mu.Lock() + defer i.mu.Unlock() - // Cancel pending VMs before waiting for the goroutine to finish - // This ensures VMs are marked as canceled even if the goroutine finishes quickly - if err := c.CancelVmsInspection(ctx); err != nil { - return fmt.Errorf("failed to update inspection table: %w", err) + if !i.isBusyWithoutMutex() { + return srvErrors.NewInspectorNotRunningError() } - c.mu.Lock() - cancel := c.cancel - done := c.done - c.mu.Unlock() - - if cancel != nil { - cancel() - } + i.stop <- struct{}{} - if done != nil { - <-done - } + i.setState(models.InspectorStateCanceling) - c.setState(models.InspectorStateCanceled) - zap.S().Info("inspector stopped") + i.inspectionSvc.stop() return nil } -func (c *InspectorService) CancelVmsInspection(ctx context.Context, vmIDs ...string) error { - if !c.IsBusy() { +func (i *InspectorService) Cancel(id string) error { + i.mu.Lock() + defer i.mu.Unlock() + if !i.isBusyWithoutMutex() { return srvErrors.NewInspectorNotRunningError() } - filter := store.NewInspectionUpdateFilter().ByStatus(models.InspectionStatePending) + i.inspectionSvc.cancelVmInspection(id) - if len(vmIDs) > 0 { - filter = filter.ByVmIDs(vmIDs...) - } + return nil +} - err := c.store.Inspection().Update(ctx, filter, models.InspectionStatus{ - State: models.InspectionStateCanceled, - }) - if err != nil { - return fmt.Errorf("failed to update inspection table: %w", err) - } +func (i *InspectorService) WithInspectionService(svc *InspectionService) *InspectorService { + i.inspectionSvc = svc + return i +} - return nil +func (i *InspectorService) IsBusy() bool { + i.mu.Lock() + isBusy := i.isBusyWithoutMutex() + i.mu.Unlock() + + return isBusy } -func (c *InspectorService) IsBusy() bool { - switch c.GetStatus().State { +func (i *InspectorService) isBusyWithoutMutex() bool { + switch i.status.State { case models.InspectorStateReady, models.InspectorStateCompleted, models.InspectorStateError, models.InspectorStateCanceled: return false default: @@ -186,143 +161,60 @@ func (c *InspectorService) IsBusy() bool { } } -func (c *InspectorService) WithBuilder(builder models.InspectorWorkBuilder) *InspectorService { - c.builder = builder - return c +func (i *InspectorService) initInspectionPipelines(ids []string) error { + return i.inspectionSvc.start(vmware.NewVMManager(i.vsphereClient, i.cred.Username), ids) } -func (c *InspectorService) run(ctx context.Context, done chan any) { - defer close(done) - defer func() { - cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() +func (i *InspectorService) run(ctx context.Context) { + i.setState(models.InspectorStateRunning) - c.mu.Lock() - if c.done == done { - c.cancel = nil - c.done = nil - } - c.mu.Unlock() - - c.closeVsphereClient(cleanupCtx) + ticker := time.NewTicker(5 * time.Second) + defer func() { + ticker.Stop() + _ = i.closeVsphereClient(ctx) + close(i.stop) }() - c.setState(models.InspectorStateRunning) - zap.S().Debugw("inspector changed state", "state", c.GetStatus().State) + canceled := false for { - id, err := c.store.Inspection().First(ctx) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - break // no more pending works - } - zap.S().Errorw("failed to get first pending inspection", "error", err) - c.setErrorStatus(err) - return - } - - if err := c.setVmState(ctx, id, models.InspectionStateRunning); err != nil { - zap.S().Errorf("failed to set vm status to running: %v", err) - c.setErrorStatus(err) + select { + case <-ctx.Done(): + i.setErrorStatus(context.Canceled) return - } - - if err := c.runVMWork(ctx, id, c.builder.Build(id)); err != nil { - var e *srvErrors.InspectorWorkError - switch { - case errors.As(err, &e): - if setError := c.setVmErrorStatus(ctx, id, err); setError != nil { - c.setErrorStatus(err) - return + case <-ticker.C: + i.mu.Lock() + idle := !i.inspectionSvc.isBusy() + i.mu.Unlock() + if idle { + if canceled { + i.setState(models.InspectorStateCanceled) + + } else { + i.setState(models.InspectorStateCompleted) } - continue // VM failed, move to next VM - case errors.Is(err, context.Canceled): - c.setState(models.InspectorStateCanceled) - return - default: - c.setErrorStatus(err) return } + case <-i.stop: + canceled = true } - - if err := c.setVmState(ctx, id, models.InspectionStateCompleted); err != nil { - zap.S().Errorf("failed to set vm status to completed: %v", err) - c.setErrorStatus(err) - return - } - - zap.S().Debugw("VM inspection completed", "vmID", id) } - - c.setState(models.InspectorStateCompleted) - zap.S().Info("inspector finished work") } -func (c *InspectorService) runVMWork(ctx context.Context, id string, units []models.InspectorWorkUnit) error { - for _, unit := range units { - - future := c.scheduler.AddWork(func(ctx context.Context) (any, error) { - return unit.Work()(ctx) - }) - - select { - // Todo: handle the context done case. we may want to run some cleanup tasks - case <-ctx.Done(): - future.Stop() - return context.Canceled - - case result := <-future.C(): - if result.Err != nil { - zap.S().Errorw("VM inspection failed", "vmID", id, "error", result.Err) - return srvErrors.NewInspectorWorkError("work finished with error: %s", result.Err.Error()) - } - } - } - return nil +func (i *InspectorService) closeVsphereClient(ctx context.Context) error { + logoutCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + return i.vsphereClient.Logout(logoutCtx) } -func (c *InspectorService) closeVsphereClient(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() - if c.vsphereClient != nil { - _ = c.vsphereClient.Logout(ctx) - c.vsphereClient = nil - } +func (i *InspectorService) setState(s models.InspectorState) { + i.status.State = s + i.status.Error = nil } -func (c *InspectorService) setState(s models.InspectorState) { - c.mu.Lock() - defer c.mu.Unlock() - c.status.State = s - c.status.Error = nil -} - -func (c *InspectorService) setErrorStatus(err error) { - c.mu.Lock() - defer c.mu.Unlock() - - c.status = models.InspectorStatus{ +func (i *InspectorService) setErrorStatus(err error) { + i.status = models.InspectorStatus{ State: models.InspectorStateError, Error: err, } } - -func (c *InspectorService) setVmState(ctx context.Context, vmID string, s models.InspectionState) error { - if err := c.store.Inspection().Update(ctx, store.NewInspectionUpdateFilter().ByVmIDs(vmID), - models.InspectionStatus{State: s}); err != nil { - return fmt.Errorf("updating vm %s in store: %w", vmID, err) - } - - return nil -} - -func (c *InspectorService) setVmErrorStatus(ctx context.Context, vmID string, err error) error { - if err := c.store.Inspection().Update(ctx, store.NewInspectionUpdateFilter().ByVmIDs(vmID), models.InspectionStatus{ - State: models.InspectionStateError, - Error: err, - }); err != nil { - return fmt.Errorf("updating vm %s in store: %w", vmID, err) - } - - return nil -} diff --git a/internal/services/inspector_test.go b/internal/services/inspector_test.go index 9db2d9cf..be3e3b0a 100644 --- a/internal/services/inspector_test.go +++ b/internal/services/inspector_test.go @@ -15,8 +15,6 @@ import ( "github.com/kubev2v/assisted-migration-agent/internal/store" "github.com/kubev2v/assisted-migration-agent/internal/store/migrations" srvErrors "github.com/kubev2v/assisted-migration-agent/pkg/errors" - "github.com/kubev2v/assisted-migration-agent/pkg/scheduler" - "github.com/kubev2v/assisted-migration-agent/test" ) // getVCenterCredentials returns test credentials for vCenter. @@ -29,74 +27,70 @@ func getVCenterCredentials() *models.Credentials { } } -// testsMockInspectorWorkBuilder implements models.InspectorWorkBuilder for testing -type testsMockInspectorWorkBuilder struct { - vmWorkErr map[string]error // per-VM errors - workDelay time.Duration +// mockInspectionBuilder provides a configurable inspectionWorkBuilder for tests (per-VM inspection work units). +type mockInspectionBuilder struct { + delay time.Duration + vmErrors map[string]error inspected []string mu sync.Mutex } -func newMockInspectorWorkBuilder() *testsMockInspectorWorkBuilder { - return &testsMockInspectorWorkBuilder{ - vmWorkErr: make(map[string]error), - inspected: make([]string, 0), - } -} - -func (m *testsMockInspectorWorkBuilder) withVmError(vmID string, err error) *testsMockInspectorWorkBuilder { - m.vmWorkErr[vmID] = err +func (m *mockInspectionBuilder) withWorkDelay(d time.Duration) *mockInspectionBuilder { + m.delay = d return m } -func (m *testsMockInspectorWorkBuilder) withWorkDelay(d time.Duration) *testsMockInspectorWorkBuilder { - m.workDelay = d +func (m *mockInspectionBuilder) withVmError(vmID string, err error) *mockInspectionBuilder { + if m.vmErrors == nil { + m.vmErrors = make(map[string]error) + } + m.vmErrors[vmID] = err return m } -func (m *testsMockInspectorWorkBuilder) getInspectedVMs() []string { +func (m *mockInspectionBuilder) getInspectedVMs() []string { m.mu.Lock() defer m.mu.Unlock() - result := make([]string, len(m.inspected)) - copy(result, m.inspected) - return result + return append([]string(nil), m.inspected...) } -func (m *testsMockInspectorWorkBuilder) Build(vmID string) []models.InspectorWorkUnit { - return []models.InspectorWorkUnit{ - { - Work: func() func(ctx context.Context) (any, error) { - return func(ctx context.Context) (any, error) { - if m.workDelay > 0 { +func (m *mockInspectionBuilder) builder() func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return []models.WorkUnit[models.InspectionStatus, models.InspectionResult]{ + { + Status: func() models.InspectionStatus { + return models.InspectionStatus{State: models.InspectionStateRunning} + }, + Work: func(ctx context.Context, result models.InspectionResult) (models.InspectionResult, error) { + if m.delay > 0 { select { + case <-time.After(m.delay): case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(m.workDelay): + return result, ctx.Err() } } - - if err, ok := m.vmWorkErr[vmID]; ok && err != nil { - return nil, err + if err, ok := m.vmErrors[id]; ok && err != nil { + return result, err } - m.mu.Lock() - m.inspected = append(m.inspected, vmID) + m.inspected = append(m.inspected, id) m.mu.Unlock() - - return nil, nil - } + return result, nil + }, }, - }, + } } } +func newMockInspectionBuilder() *mockInspectionBuilder { + return &mockInspectionBuilder{} +} + var _ = Describe("InspectorService", func() { var ( - ctx context.Context - db *sql.DB - st *store.Store - sched *scheduler.Scheduler[any] - srv *services.InspectorService + ctx context.Context + db *sql.DB + srv *services.InspectorService ) // Helper to insert test VMs into vinfo table @@ -123,18 +117,13 @@ var _ = Describe("InspectorService", func() { insertVM("vm-2", "test-vm-2") insertVM("vm-3", "test-vm-3") - st = store.NewStore(db, test.NewMockValidator()) - sched = scheduler.NewScheduler[any](1) - srv = services.NewInspectorService(sched, st) + srv = services.NewInspectorService() }) AfterEach(func() { if srv != nil { _ = srv.Stop(ctx) } - if sched != nil { - sched.Close() - } if db != nil { _ = db.Close() } @@ -157,7 +146,7 @@ var _ = Describe("InspectorService", func() { Context("when inspector is not started", func() { It("should return InspectorNotRunningError when trying to add VMs", func() { - err := srv.Add(ctx, []string{"vm-1", "vm-2"}) + err := srv.Add("vm-1") Expect(err).To(HaveOccurred()) var notRunningErr *srvErrors.InspectorNotRunningError @@ -170,11 +159,12 @@ var _ = Describe("InspectorService", func() { // Insert an initial VM for starting the inspector insertVM("vm-0", "test-vm-0") - // Use a mock builder with delay to keep inspector running - builder := newMockInspectorWorkBuilder().withWorkDelay(1 * time.Second) - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + // Use mock inspection service with delay so per-VM pipelines stay running and inspector stays in Running state + builder := newMockInspectionBuilder().withWorkDelay(1 * time.Second) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) - // Start inspector with vm-0 (will stay running due to delay) + // Start inspector with vm-0 (will stay running due to inspection delay) err := srv.Start(ctx, []string{"vm-0"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) @@ -184,61 +174,53 @@ var _ = Describe("InspectorService", func() { }).Should(Equal(models.InspectorStateRunning)) }) - It("should add VMs to inspection table with pending status", func() { - err := srv.Add(ctx, []string{"vm-1", "vm-2", "vm-3"}) + It("should add VMs to inspection queue with pending status", func() { + err := srv.Add("vm-1") + Expect(err).NotTo(HaveOccurred()) + err = srv.Add("vm-2") + Expect(err).NotTo(HaveOccurred()) + err = srv.Add("vm-3") Expect(err).NotTo(HaveOccurred()) - // Verify added VMs are in DB with pending status + // Verify added VMs are in pipelines (pending or running) for _, vmID := range []string{"vm-1", "vm-2", "vm-3"} { - status, err := st.Inspection().Get(ctx, vmID) + status, err := srv.GetVmStatus(vmID) Expect(err).NotTo(HaveOccurred()) - Expect(status.State).To(Equal(models.InspectionStatePending)) + Expect(status.State).To(Or( + Equal(models.InspectionStatePending), + Equal(models.InspectionStateRunning), + )) } }) - It("should not duplicate VMs when adding same VM twice", func() { - err := srv.Add(ctx, []string{"vm-1", "vm-2"}) + It("should return err when adding same VM twice", func() { + err := srv.Add("vm-1") Expect(err).NotTo(HaveOccurred()) - - err = srv.Add(ctx, []string{"vm-2", "vm-3"}) - Expect(err).NotTo(HaveOccurred()) - - // Should have vm-0 (from Start) + vm-1, vm-2, vm-3 (from Add) = 4 total - statuses, err := st.Inspection().List(ctx, nil) + err = srv.Add("vm-2") Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(4)) - }) - - It("should return error for empty VM list", func() { - // Get current count before adding empty list - before, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - - err = srv.Add(ctx, []string{}) + err = srv.Add("vm-2") Expect(err).To(HaveOccurred()) - - // Count should not change - after, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(len(after)).To(Equal(len(before))) + var srvErr *srvErrors.OperationInProgressError + Expect(errors.As(err, &srvErr)).To(BeTrue()) }) }) }) Describe("GetVmStatus", func() { - It("should return error for non-existent VM", func() { - _, err := srv.GetVmStatus(ctx, "non-existent") - Expect(err).To(HaveOccurred()) - Expect(srvErrors.IsResourceNotFoundError(err)).To(BeTrue()) + It("should return NotFound state for non-existent VM when inspector never started", func() { + status, err := srv.GetVmStatus("non-existent") + Expect(err).NotTo(HaveOccurred()) + Expect(status.State).To(Equal(models.InspectionStateNotFound)) }) It("should return VM inspection status after adding", func() { // Insert an initial VM and start inspector insertVM("vm-0", "test-vm-0") - // Use a mock builder with delay to keep inspector running - builder := newMockInspectorWorkBuilder().withWorkDelay(1 * time.Second) - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + // Use mock inspection service with delay to keep inspector running + builder := newMockInspectionBuilder().withWorkDelay(1 * time.Second) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-0"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) @@ -247,28 +229,31 @@ var _ = Describe("InspectorService", func() { return srv.GetStatus().State }).Should(Equal(models.InspectorStateRunning)) - err = srv.Add(ctx, []string{"vm-1"}) + err = srv.Add("vm-1") Expect(err).NotTo(HaveOccurred()) - status, err := srv.GetVmStatus(ctx, "vm-1") + status, err := srv.GetVmStatus("vm-1") Expect(err).NotTo(HaveOccurred()) - Expect(status.State).To(Equal(models.InspectionStatePending)) + Expect(status.State).To(Or( + Equal(models.InspectionStatePending), + Equal(models.InspectionStateRunning), + )) }) }) - Describe("CancelVmsInspection", func() { + Describe("Cancel", func() { Context("when inspector is not started", func() { It("should return InspectorNotRunningError when trying to cancel VMs", func() { - err := srv.CancelVmsInspection(ctx, "vm-1", "vm-2") + err := srv.Cancel("vm-2") Expect(err).To(HaveOccurred()) var notRunningErr *srvErrors.InspectorNotRunningError Expect(errors.As(err, ¬RunningErr)).To(BeTrue()) }) - It("should return InspectorNotRunningError when trying to cancel all VMs", func() { - err := srv.CancelVmsInspection(ctx) + It("should return InspectorNotRunningError when trying to stop inspector", func() { + err := srv.Stop(ctx) Expect(err).To(HaveOccurred()) var notRunningErr *srvErrors.InspectorNotRunningError @@ -281,9 +266,10 @@ var _ = Describe("InspectorService", func() { // Insert an initial VM for starting the inspector insertVM("vm-0", "test-vm-0") - // Use a mock builder with delay to keep inspector running - builder := newMockInspectorWorkBuilder().withWorkDelay(1 * time.Second) - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + // Use mock inspection service with delay to keep inspector running + builder := newMockInspectionBuilder().withWorkDelay(1 * time.Second) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) // Start inspector with vm-0 (will stay running due to delay) err := srv.Start(ctx, []string{"vm-0"}, getVCenterCredentials()) @@ -295,117 +281,91 @@ var _ = Describe("InspectorService", func() { }).Should(Equal(models.InspectorStateRunning)) // Add VMs to the inspection queue - err = srv.Add(ctx, []string{"vm-1", "vm-2", "vm-3"}) + err = srv.Add("vm-1") + Expect(err).NotTo(HaveOccurred()) + err = srv.Add("vm-2") + Expect(err).NotTo(HaveOccurred()) + err = srv.Add("vm-3") Expect(err).NotTo(HaveOccurred()) }) It("should cancel specific pending VMs", func() { - err := srv.CancelVmsInspection(ctx, "vm-2") + err := srv.Cancel("vm-2") Expect(err).NotTo(HaveOccurred()) // Check vm-2 status is canceled - status, err := st.Inspection().Get(ctx, "vm-2") + status, err := srv.GetVmStatus("vm-2") Expect(err).NotTo(HaveOccurred()) Expect(status.State).To(Equal(models.InspectionStateCanceled)) - // Other VMs should still be pending - status1, err := st.Inspection().Get(ctx, "vm-1") + // Other VMs should still be pending or running + status1, err := srv.GetVmStatus("vm-1") Expect(err).NotTo(HaveOccurred()) - Expect(status1.State).To(Equal(models.InspectionStatePending)) + Expect(status1.State).To(Or( + Equal(models.InspectionStatePending), + Equal(models.InspectionStateRunning), + )) - status3, err := st.Inspection().Get(ctx, "vm-3") + status3, err := srv.GetVmStatus("vm-3") Expect(err).NotTo(HaveOccurred()) - Expect(status3.State).To(Equal(models.InspectionStatePending)) + Expect(status3.State).To(Or( + Equal(models.InspectionStatePending), + Equal(models.InspectionStateRunning), + )) }) It("should cancel multiple specific VMs", func() { - err := srv.CancelVmsInspection(ctx, "vm-1", "vm-3") - Expect(err).NotTo(HaveOccurred()) - - // Check vm-1 and vm-3 are canceled - status1, err := st.Inspection().Get(ctx, "vm-1") + err := srv.Cancel("vm-3") Expect(err).NotTo(HaveOccurred()) - Expect(status1.State).To(Equal(models.InspectionStateCanceled)) - status3, err := st.Inspection().Get(ctx, "vm-3") + status3, err := srv.GetVmStatus("vm-3") Expect(err).NotTo(HaveOccurred()) Expect(status3.State).To(Equal(models.InspectionStateCanceled)) - // vm-2 should still be pending - status2, err := st.Inspection().Get(ctx, "vm-2") - Expect(err).NotTo(HaveOccurred()) - Expect(status2.State).To(Equal(models.InspectionStatePending)) - }) - - It("should cancel all pending VMs when no specific IDs provided", func() { - err := srv.CancelVmsInspection(ctx) - Expect(err).NotTo(HaveOccurred()) - - // vm-1, vm-2, vm-3 should be canceled (vm-0 is running, not pending) - statuses, err := st.Inspection().List(ctx, store.NewInspectionQueryFilter().ByStatus(models.InspectionStateCanceled)) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(3)) - }) - - It("should not cancel already completed VMs", func() { - // Mark vm-1 as completed - err := st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-1"), - models.InspectionStatus{State: models.InspectionStateCompleted}) - Expect(err).NotTo(HaveOccurred()) - - // Cancel all pending - err = srv.CancelVmsInspection(ctx) + // vm-2 should still be pending or running + status2, err := srv.GetVmStatus("vm-2") Expect(err).NotTo(HaveOccurred()) - - // vm-1 should still be completed (not canceled) - status1, err := st.Inspection().Get(ctx, "vm-1") - Expect(err).NotTo(HaveOccurred()) - Expect(status1.State).To(Equal(models.InspectionStateCompleted)) - - // vm-2 and vm-3 should be canceled - status2, err := st.Inspection().Get(ctx, "vm-2") - Expect(err).NotTo(HaveOccurred()) - Expect(status2.State).To(Equal(models.InspectionStateCanceled)) + Expect(status2.State).To(Or( + Equal(models.InspectionStatePending), + Equal(models.InspectionStateRunning), + )) }) }) }) Describe("Start", func() { - var ( - builder *testsMockInspectorWorkBuilder - ) - It("should complete inspection successfully for single VM", func() { - builder = newMockInspectorWorkBuilder() - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + builder := newMockInspectionBuilder() + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-1"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) // Verify VM was inspected Expect(builder.getInspectedVMs()).To(ContainElement("vm-1")) - // Verify VM status is completed in DB - status, err := st.Inspection().Get(ctx, "vm-1") + // Verify VM status is completed + status, err := srv.GetVmStatus("vm-1") Expect(err).NotTo(HaveOccurred()) Expect(status.State).To(Equal(models.InspectionStateCompleted)) }) It("should complete inspection successfully for multiple VMs", func() { - builder = newMockInspectorWorkBuilder() - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + builder := newMockInspectionBuilder() + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-1", "vm-2", "vm-3"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) // Verify all VMs were inspected inspected := builder.getInspectedVMs() @@ -413,24 +373,8 @@ var _ = Describe("InspectorService", func() { Expect(inspected).To(ContainElements("vm-1", "vm-2", "vm-3")) }) - It("should process VMs in sequence order", func() { - builder = newMockInspectorWorkBuilder() - srv = services.NewInspectorService(sched, st).WithBuilder(builder) - - err := srv.Start(ctx, []string{"vm-1", "vm-2", "vm-3"}, getVCenterCredentials()) - Expect(err).NotTo(HaveOccurred()) - - Eventually(func() models.InspectorState { - return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) - - // VMs should be processed in order they were added - Expect(builder.getInspectedVMs()).To(Equal([]string{"vm-1", "vm-2", "vm-3"})) - }) - It("should return error for invalid cred", func() { // Use invalid credentials to trigger connection error - // The builder's init error won't be triggered since connection happens first invalidCreds := &models.Credentials{ URL: "https://invalid-host:8989/sdk", Username: "invalid", @@ -439,8 +383,6 @@ var _ = Describe("InspectorService", func() { err := srv.Start(ctx, []string{"vm-1"}, invalidCreds) Expect(err).To(HaveOccurred()) - // The error could be "connection refused", "no such host", "timeout", etc. - // Just check that it's a connection-related error errMsg := err.Error() Expect(errMsg).To(Or( ContainSubstring("connection refused"), @@ -451,37 +393,39 @@ var _ = Describe("InspectorService", func() { ContainSubstring("dial tcp"), )) + // Pipeline is never set when Start fails on connect, so status stays Ready status := srv.GetStatus() - Expect(status.State).To(Equal(models.InspectorStateError)) - Expect(status.Error).NotTo(BeNil()) + Expect(status.State).To(Equal(models.InspectorStateReady)) }) It("should mark VM as error when inspection fails and continue with next VM", func() { - builder = newMockInspectorWorkBuilder().withVmError("vm-1", errors.New("inspection failed")) - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + builder := newMockInspectionBuilder().withVmError("vm-1", errors.New("inspection failed")) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-1", "vm-2"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) // Check vm-1 status is error - status1, err := st.Inspection().Get(ctx, "vm-1") + status1, err := srv.GetVmStatus("vm-1") Expect(err).NotTo(HaveOccurred()) Expect(status1.State).To(Equal(models.InspectionStateError)) Expect(status1.Error).NotTo(BeNil()) // Check vm-2 status is completed (should continue after vm-1 error) - status2, err := st.Inspection().Get(ctx, "vm-2") + status2, err := srv.GetVmStatus("vm-2") Expect(err).NotTo(HaveOccurred()) Expect(status2.State).To(Equal(models.InspectionStateCompleted)) }) It("should clear previous inspection data on new start", func() { - builder = newMockInspectorWorkBuilder() - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + builder := newMockInspectionBuilder() + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) // First run err := srv.Start(ctx, []string{"vm-1"}, getVCenterCredentials()) @@ -489,27 +433,30 @@ var _ = Describe("InspectorService", func() { Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) err = srv.Start(ctx, []string{"vm-2", "vm-3"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) - // Should only have vm-2 and vm-3, not vm-1 - statuses, err := st.Inspection().List(ctx, nil) + // Should only have vm-2 and vm-3 in pipelines; vm-1 from first run is gone + status1, _ := srv.GetVmStatus("vm-1") + Expect(status1.State).To(Equal(models.InspectionStateNotFound)) + status2, err := srv.GetVmStatus("vm-2") Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(2)) - Expect(statuses).To(HaveKey("vm-2")) - Expect(statuses).To(HaveKey("vm-3")) - Expect(statuses).NotTo(HaveKey("vm-1")) + Expect(status2.State).To(Equal(models.InspectionStateCompleted)) + status3, err := srv.GetVmStatus("vm-3") + Expect(err).NotTo(HaveOccurred()) + Expect(status3.State).To(Equal(models.InspectionStateCompleted)) }) It("should be busy while running", func() { - builder = newMockInspectorWorkBuilder().withWorkDelay(100 * time.Millisecond) - srv = services.NewInspectorService(sched, st).WithBuilder(builder) + builder := newMockInspectionBuilder().withWorkDelay(100 * time.Millisecond) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-1"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) @@ -522,16 +469,18 @@ var _ = Describe("InspectorService", func() { // Wait for completion Eventually(func() models.InspectorState { return srv.GetStatus().State - }).Should(Equal(models.InspectorStateCompleted)) + }, time.Second*10).Should(Equal(models.InspectorStateCompleted)) // Should not be busy after completion Expect(srv.IsBusy()).To(BeFalse()) }) }) - Describe("CancelInspector", func() { + Describe("Stop", func() { It("should stop inspector and cancel all pending VMs", func() { - srv = services.NewInspectorService(sched, st) + builder := newMockInspectionBuilder().withWorkDelay(1 * time.Second) + inspectionSvc := services.NewInspectionService().WithWorkUnitsBuilder(builder.builder()) + srv = services.NewInspectorService().WithInspectionService(inspectionSvc) err := srv.Start(ctx, []string{"vm-1", "vm-2", "vm-3"}, getVCenterCredentials()) Expect(err).NotTo(HaveOccurred()) @@ -541,13 +490,15 @@ var _ = Describe("InspectorService", func() { return srv.GetStatus().State }).Should(Equal(models.InspectorStateRunning)) - // Cancel inspector + // Stop inspector err = srv.Stop(ctx) Expect(err).NotTo(HaveOccurred()) + Expect(srv.GetStatus().State).To(Equal(models.InspectorStateCanceling)) // Inspector should be in canceled state - status := srv.GetStatus() - Expect(status.State).To(Equal(models.InspectorStateCanceled)) + Eventually(func() models.InspectorState { + return srv.GetStatus().State + }, 10*time.Second).To(Equal(models.InspectorStateCanceled)) // Should not be busy Expect(srv.IsBusy()).To(BeFalse()) @@ -555,274 +506,3 @@ var _ = Describe("InspectorService", func() { }) }) - -var _ = Describe("InspectionStore", func() { - var ( - ctx context.Context - db *sql.DB - st *store.Store - ) - - // Helper to insert test VMs into vinfo table - insertVM := func(id, name string) { - _, err := db.ExecContext(ctx, ` - INSERT INTO vinfo ("VM ID", "VM", "Powerstate", "Cluster", "Memory") - VALUES (?, ?, 'poweredOn', 'cluster-a', 4096) - `, id, name) - Expect(err).NotTo(HaveOccurred()) - } - - BeforeEach(func() { - ctx = context.Background() - - var err error - db, err = store.NewDB(nil, ":memory:") - Expect(err).NotTo(HaveOccurred()) - - err = migrations.Run(ctx, db) - Expect(err).NotTo(HaveOccurred()) - - // Insert test VMs into vinfo (required for foreign key constraint) - insertVM("vm-1", "test-vm-1") - insertVM("vm-2", "test-vm-2") - insertVM("vm-3", "test-vm-3") - insertVM("vm-a", "test-vm-a") - insertVM("vm-b", "test-vm-b") - insertVM("vm-c", "test-vm-c") - - st = store.NewStore(db, test.NewMockValidator()) - }) - - AfterEach(func() { - if db != nil { - _ = db.Close() - } - }) - - Describe("Add", func() { - It("should add VMs with pending status", func() { - err := st.Inspection().Add(ctx, []string{"vm-1", "vm-2"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - statuses, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(2)) - Expect(statuses["vm-1"].State).To(Equal(models.InspectionStatePending)) - Expect(statuses["vm-2"].State).To(Equal(models.InspectionStatePending)) - }) - - It("should ignore duplicates on conflict", func() { - err := st.Inspection().Add(ctx, []string{"vm-1"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - // Try to add same VM again - err = st.Inspection().Add(ctx, []string{"vm-1"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - statuses, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(1)) - }) - - It("should handle empty list", func() { - err := st.Inspection().Add(ctx, []string{}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - }) - }) - - Describe("Get", func() { - BeforeEach(func() { - err := st.Inspection().Add(ctx, []string{"vm-1"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should return inspection status for existing VM", func() { - status, err := st.Inspection().Get(ctx, "vm-1") - Expect(err).NotTo(HaveOccurred()) - Expect(status.State).To(Equal(models.InspectionStatePending)) - }) - - It("should return error for non-existent VM", func() { - _, err := st.Inspection().Get(ctx, "non-existent") - Expect(err).To(HaveOccurred()) - Expect(srvErrors.IsResourceNotFoundError(err)).To(BeTrue()) - }) - }) - - Describe("List", func() { - BeforeEach(func() { - err := st.Inspection().Add(ctx, []string{"vm-1", "vm-2", "vm-3"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - // Mark vm-2 as completed - err = st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-2"), - models.InspectionStatus{State: models.InspectionStateCompleted}) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should list all inspections without filter", func() { - statuses, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(3)) - }) - - It("should filter by status", func() { - statuses, err := st.Inspection().List(ctx, - store.NewInspectionQueryFilter().ByStatus(models.InspectionStatePending)) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(2)) - Expect(statuses).To(HaveKey("vm-1")) - Expect(statuses).To(HaveKey("vm-3")) - }) - - It("should filter by VM IDs", func() { - statuses, err := st.Inspection().List(ctx, - store.NewInspectionQueryFilter().ByVmIDs("vm-1", "vm-2")) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(2)) - Expect(statuses).To(HaveKey("vm-1")) - Expect(statuses).To(HaveKey("vm-2")) - }) - - It("should apply limit", func() { - statuses, err := st.Inspection().List(ctx, - store.NewInspectionQueryFilter().Limit(2)) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(2)) - }) - }) - - Describe("First", func() { - It("should return first pending VM by sequence", func() { - // Add VMs in order - err := st.Inspection().Add(ctx, []string{"vm-1"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - err = st.Inspection().Add(ctx, []string{"vm-2"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - err = st.Inspection().Add(ctx, []string{"vm-3"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - first, err := st.Inspection().First(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(first).To(Equal("vm-1")) - }) - - It("should skip non-pending VMs", func() { - err := st.Inspection().Add(ctx, []string{"vm-1", "vm-2", "vm-3"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - - // Mark vm-1 as completed - err = st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-1"), - models.InspectionStatus{State: models.InspectionStateCompleted}) - Expect(err).NotTo(HaveOccurred()) - - first, err := st.Inspection().First(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(first).To(Equal("vm-2")) - }) - - It("should return error when no pending VMs", func() { - _, err := st.Inspection().First(ctx) - Expect(err).To(HaveOccurred()) - Expect(errors.Is(err, sql.ErrNoRows)).To(BeTrue()) - }) - }) - - Describe("Update", func() { - BeforeEach(func() { - err := st.Inspection().Add(ctx, []string{"vm-1", "vm-2", "vm-3"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should update status for specific VM", func() { - err := st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-1"), - models.InspectionStatus{State: models.InspectionStateRunning}) - Expect(err).NotTo(HaveOccurred()) - - status, err := st.Inspection().Get(ctx, "vm-1") - Expect(err).NotTo(HaveOccurred()) - Expect(status.State).To(Equal(models.InspectionStateRunning)) - }) - - It("should update status with error", func() { - testErr := errors.New("inspection failed") - err := st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-1"), - models.InspectionStatus{State: models.InspectionStateError, Error: testErr}) - Expect(err).NotTo(HaveOccurred()) - - status, err := st.Inspection().Get(ctx, "vm-1") - Expect(err).NotTo(HaveOccurred()) - Expect(status.State).To(Equal(models.InspectionStateError)) - Expect(status.Error).NotTo(BeNil()) - Expect(status.Error.Error()).To(Equal("inspection failed")) - }) - - It("should update multiple VMs by status", func() { - err := st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByStatus(models.InspectionStatePending), - models.InspectionStatus{State: models.InspectionStateCanceled}) - Expect(err).NotTo(HaveOccurred()) - - statuses, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - for _, s := range statuses { - Expect(s.State).To(Equal(models.InspectionStateCanceled)) - } - }) - }) - - Describe("DeleteAll", func() { - BeforeEach(func() { - err := st.Inspection().Add(ctx, []string{"vm-1", "vm-2", "vm-3"}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - }) - - It("should delete all inspections", func() { - err := st.Inspection().DeleteAll(ctx) - Expect(err).NotTo(HaveOccurred()) - - statuses, err := st.Inspection().List(ctx, nil) - Expect(err).NotTo(HaveOccurred()) - Expect(statuses).To(HaveLen(0)) - }) - }) - - Describe("Processing order", func() { - It("should maintain insertion order via sequence", func() { - // Add VMs one by one - for _, id := range []string{"vm-c", "vm-a", "vm-b"} { - err := st.Inspection().Add(ctx, []string{id}, models.InspectionStatePending) - Expect(err).NotTo(HaveOccurred()) - } - - // First should return in insertion order - first, err := st.Inspection().First(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(first).To(Equal("vm-c")) - - // Mark as completed and get next - err = st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-c"), - models.InspectionStatus{State: models.InspectionStateCompleted}) - Expect(err).NotTo(HaveOccurred()) - - second, err := st.Inspection().First(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(second).To(Equal("vm-a")) - - // Mark as completed and get next - err = st.Inspection().Update(ctx, - store.NewInspectionUpdateFilter().ByVmIDs("vm-a"), - models.InspectionStatus{State: models.InspectionStateCompleted}) - Expect(err).NotTo(HaveOccurred()) - - third, err := st.Inspection().First(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(third).To(Equal("vm-b")) - }) - }) -}) diff --git a/internal/services/manager.go b/internal/services/manager.go index baf3e03c..81f1bdd0 100644 --- a/internal/services/manager.go +++ b/internal/services/manager.go @@ -72,8 +72,14 @@ func (m *ServiceManager) Initialize() error { m.cfg.Agent.DataFolder, m.cfg.Agent.OpaPoliciesFolder, ) - m.inspector = NewInspectorService(m.scheduler, m.store). - WithBuilder(models.UnimplementedInspectorWorkBuilder{}) + + // Todo: remove WithWorkUnitsBuilder when service is ready + m.inspector = NewInspectorService(). + WithInspectionService(NewInspectionService().WithWorkUnitsBuilder( + func(id string) []models.WorkUnit[models.InspectionStatus, models.InspectionResult] { + return make([]models.WorkUnit[models.InspectionStatus, models.InspectionResult], 0) + })) + m.vddk = NewVddkService(m.cfg.Agent.DataFolder, m.store) consoleSrv, err := NewConsoleService( diff --git a/internal/store/doc.go b/internal/store/doc.go index d46bade8..c47ebdea 100644 --- a/internal/store/doc.go +++ b/internal/store/doc.go @@ -176,7 +176,6 @@ // FROM vinfo v // LEFT JOIN vdisk dk ON v."VM ID" = dk."VM ID" // LEFT JOIN concerns c ON v."VM ID" = c."VM_ID" -// LEFT JOIN vm_inspection_status i ON v."VM ID" = i."VM ID" // LEFT JOIN vcpu cpu ON v."VM ID" = cpu."VM ID" // LEFT JOIN vmemory mem ON v."VM ID" = mem."VM ID" // LEFT JOIN vnetwork net ON v."VM ID" = net."VM ID" diff --git a/internal/store/inspection.go b/internal/store/inspection.go deleted file mode 100644 index 490279f6..00000000 --- a/internal/store/inspection.go +++ /dev/null @@ -1,197 +0,0 @@ -package store - -import ( - "context" - "database/sql" - "errors" - "fmt" - - sq "github.com/Masterminds/squirrel" - - "github.com/kubev2v/assisted-migration-agent/internal/models" - srvErrors "github.com/kubev2v/assisted-migration-agent/pkg/errors" -) - -// Column name constants for vm_inspection_status table -const ( - inspectionTable = "vm_inspection_status" - inspectionColVmID = `"VM ID"` - inspectionColStatus = "status" - inspectionColError = "error" - inspectionColSequence = "sequence" -) - -type InspectionStore struct { - db QueryInterceptor -} - -func NewInspectionStore(db QueryInterceptor) *InspectionStore { - return &InspectionStore{db: db} -} - -// Get returns the inspection status for a VM by its ID. -func (s *InspectionStore) Get(ctx context.Context, vmID string) (*models.InspectionStatus, error) { - query, args, err := sq.Select(inspectionColVmID, inspectionColStatus, inspectionColError). - From(inspectionTable). - Where(sq.Eq{inspectionColVmID: vmID}). - ToSql() - if err != nil { - return nil, fmt.Errorf("building query for vm %s: %w", vmID, err) - } - - row := s.db.QueryRowContext(ctx, query, args...) - var id, status string - var errStr sql.NullString - err = row.Scan(&id, &status, &errStr) - if errors.Is(err, sql.ErrNoRows) { - return nil, srvErrors.NewResourceNotFoundError("vm inspection status", vmID) - } - if err != nil { - return nil, fmt.Errorf("scanning inspection for vm %s: %w", vmID, err) - } - result := &models.InspectionStatus{ - State: models.InspectionState(status), - } - if errStr.Valid { - result.Error = errors.New(errStr.String) - } - return result, nil -} - -// List returns inspection statuses matching the filter. If filter is nil, returns all. -func (s *InspectionStore) List(ctx context.Context, filter *InspectionQueryFilter) (map[string]models.InspectionStatus, error) { - builder := sq.Select(inspectionColVmID, inspectionColStatus, inspectionColError).From(inspectionTable) - - if filter != nil { - builder = filter.Apply(builder) - } - - query, args, err := builder.ToSql() - if err != nil { - return nil, fmt.Errorf("building list query: %w", err) - } - - rows, err := s.db.QueryContext(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("executing list query: %w", err) - } - defer func() { - _ = rows.Close() - }() - - result := make(map[string]models.InspectionStatus) - for rows.Next() { - var vmID, status string - var errStr sql.NullString - if err := rows.Scan(&vmID, &status, &errStr); err != nil { - return nil, fmt.Errorf("scanning inspection row: %w", err) - } - inspStatus := models.InspectionStatus{ - State: models.InspectionState(status), - } - if errStr.Valid { - inspStatus.Error = errors.New(errStr.String) - } - result[vmID] = inspStatus - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("iterating inspection rows: %w", err) - } - return result, nil -} - -// First returns the VM ID for pending inspection with the lowest sequence value. -// Returns empty string and sql.ErrNoRows if no matching record is found. -func (s *InspectionStore) First(ctx context.Context) (string, error) { - builder := sq.Select(inspectionColVmID). - From(inspectionTable). - OrderBy(inspectionColSequence + " ASC"). - Where(sq.Eq{inspectionColStatus: models.InspectionStatePending.Value()}). - Limit(1) - - query, args, err := builder.ToSql() - if err != nil { - return "", fmt.Errorf("building first pending query: %w", err) - } - - row := s.db.QueryRowContext(ctx, query, args...) - var vmID string - err = row.Scan(&vmID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return "", sql.ErrNoRows - } - return "", fmt.Errorf("scanning first pending vm: %w", err) - } - - return vmID, nil -} - -// Add inserts new inspection statuses for multiple VMs. Existing VMs are ignored. -// The sequence is automatically assigned by the database based on insertion order. -func (s *InspectionStore) Add(ctx context.Context, vmIDs []string, status models.InspectionState) error { - if len(vmIDs) == 0 { - return nil - } - - builder := sq.Insert(inspectionTable). - Columns(inspectionColVmID, inspectionColStatus) - - for _, vmID := range vmIDs { - builder = builder.Values(vmID, status.Value()) - } - - query, args, err := builder. - Suffix("ON CONFLICT (" + inspectionColVmID + ") DO NOTHING"). - ToSql() - if err != nil { - return fmt.Errorf("building add query: %w", err) - } - - _, err = s.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("executing add for %d vms: %w", len(vmIDs), err) - } - return nil -} - -// Update updates inspection status for VMs matching the filter. -func (s *InspectionStore) Update(ctx context.Context, filter *InspectionUpdateFilter, status models.InspectionStatus) error { - var errStr *string - if status.Error != nil { - e := status.Error.Error() - errStr = &e - } - - builder := sq.Update(inspectionTable). - Set(inspectionColStatus, status.State.Value()). - Set(inspectionColError, errStr) - - if filter != nil { - builder = filter.Apply(builder) - } - - query, args, err := builder.ToSql() - if err != nil { - return fmt.Errorf("building update query: %w", err) - } - _, err = s.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("executing update: %w", err) - } - return nil -} - -// DeleteAll removes all inspection statuses. -func (s *InspectionStore) DeleteAll(ctx context.Context) error { - query, args, err := sq.Delete(inspectionTable).ToSql() - if err != nil { - return fmt.Errorf("building delete all query: %w", err) - } - _, err = s.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("deleting all inspections: %w", err) - } - return nil -} diff --git a/internal/store/inspection_filters.go b/internal/store/inspection_filters.go deleted file mode 100644 index eeea6098..00000000 --- a/internal/store/inspection_filters.go +++ /dev/null @@ -1,108 +0,0 @@ -package store - -import ( - sq "github.com/Masterminds/squirrel" - - "github.com/kubev2v/assisted-migration-agent/internal/models" -) - -type InspectionFilterFunc func(sq.SelectBuilder) sq.SelectBuilder - -type InspectionQueryFilter struct { - filters []InspectionFilterFunc -} - -func NewInspectionQueryFilter() *InspectionQueryFilter { - return &InspectionQueryFilter{ - filters: make([]InspectionFilterFunc, 0), - } -} - -func (f *InspectionQueryFilter) Add(filter InspectionFilterFunc) *InspectionQueryFilter { - f.filters = append(f.filters, filter) - return f -} - -func (f *InspectionQueryFilter) ByVmIDs(vmIDs ...string) *InspectionQueryFilter { - if len(vmIDs) == 0 { - return f - } - return f.Add(func(b sq.SelectBuilder) sq.SelectBuilder { - return b.Where(sq.Eq{inspectionColVmID: vmIDs}) - }) -} - -func (f *InspectionQueryFilter) ByStatus(statuses ...models.InspectionState) *InspectionQueryFilter { - if len(statuses) == 0 { - return f - } - statusStrings := make([]string, len(statuses)) - for i, s := range statuses { - statusStrings[i] = s.Value() - } - return f.Add(func(b sq.SelectBuilder) sq.SelectBuilder { - return b.Where(sq.Eq{inspectionColStatus: statusStrings}) - }) -} - -func (f *InspectionQueryFilter) Limit(limit int) *InspectionQueryFilter { - return f.Add(func(b sq.SelectBuilder) sq.SelectBuilder { - return b.Limit(uint64(limit)) - }) -} - -func (f *InspectionQueryFilter) OrderBySequence() *InspectionQueryFilter { - return f.Add(func(b sq.SelectBuilder) sq.SelectBuilder { - return b.OrderBy(inspectionColSequence + " ASC") - }) -} - -func (f *InspectionQueryFilter) Apply(builder sq.SelectBuilder) sq.SelectBuilder { - for _, filter := range f.filters { - builder = filter(builder) - } - return builder -} - -type UpdateFilterFunc func(sq.UpdateBuilder) sq.UpdateBuilder - -type InspectionUpdateFilter struct { - filters []UpdateFilterFunc -} - -func NewInspectionUpdateFilter() *InspectionUpdateFilter { - return &InspectionUpdateFilter{ - filters: make([]UpdateFilterFunc, 0), - } -} - -func (f *InspectionUpdateFilter) ByVmIDs(vmIDs ...string) *InspectionUpdateFilter { - if len(vmIDs) == 0 { - return f - } - f.filters = append(f.filters, func(b sq.UpdateBuilder) sq.UpdateBuilder { - return b.Where(sq.Eq{inspectionColVmID: vmIDs}) - }) - return f -} - -func (f *InspectionUpdateFilter) ByStatus(statuses ...models.InspectionState) *InspectionUpdateFilter { - if len(statuses) == 0 { - return f - } - statusStrings := make([]string, len(statuses)) - for i, s := range statuses { - statusStrings[i] = s.Value() - } - f.filters = append(f.filters, func(b sq.UpdateBuilder) sq.UpdateBuilder { - return b.Where(sq.Eq{inspectionColStatus: statusStrings}) - }) - return f -} - -func (f *InspectionUpdateFilter) Apply(builder sq.UpdateBuilder) sq.UpdateBuilder { - for _, filter := range f.filters { - builder = filter(builder) - } - return builder -} diff --git a/internal/store/migrations/migrations_test.go b/internal/store/migrations/migrations_test.go index 8ba69378..1e25142a 100644 --- a/internal/store/migrations/migrations_test.go +++ b/internal/store/migrations/migrations_test.go @@ -125,25 +125,5 @@ var _ = Describe("Migrations", func() { Expect(v).To(Equal(i + 1)) } }) - - // Given migrations have been applied - // When we check the vm_inspection_status table - // Then it should exist and accept inserts - It("should create vm_inspection_status table", func() { - err := migrations.Run(ctx, db) - Expect(err).NotTo(HaveOccurred()) - - // Insert a row into vinfo first (FK constraint) - _, err = db.ExecContext(ctx, ` - INSERT INTO vinfo ("VM ID", "VM") VALUES ('vm-1', 'test-vm') - `) - Expect(err).NotTo(HaveOccurred()) - - _, err = db.ExecContext(ctx, ` - INSERT INTO vm_inspection_status ("VM ID", status) - VALUES ('vm-1', 'pending') - `) - Expect(err).NotTo(HaveOccurred()) - }) }) }) diff --git a/internal/store/migrations/sql/006_inspection_status_table.sql b/internal/store/migrations/sql/006_inspection_status_table.sql new file mode 100644 index 00000000..9e9f5ed8 --- /dev/null +++ b/internal/store/migrations/sql/006_inspection_status_table.sql @@ -0,0 +1,3 @@ +-- drop vm_inspection_status table +DROP TABLE IF EXISTS vm_inspection_status; +DROP SEQUENCE IF EXISTS vm_inspection_status_seq; \ No newline at end of file diff --git a/internal/store/store.go b/internal/store/store.go index 8b84d570..dc321207 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -15,7 +15,6 @@ type Store struct { configuration *ConfigurationStore inventory *InventoryStore vm *VMStore - inspection *InspectionStore group *GroupStore vddk *VddkStore transactor *DBTransactor @@ -30,7 +29,6 @@ func NewStore(db *sql.DB, validator duckdb_parser.Validator) *Store { configuration: NewConfigurationStore(qi), inventory: NewInventoryStore(qi), vm: NewVMStore(qi, parser), - inspection: NewInspectionStore(qi), group: NewGroupStore(qi), vddk: NewVddkStore(qi), transactor: newTransactor(db), @@ -65,10 +63,6 @@ func (s *Store) VM() *VMStore { return s.vm } -func (s *Store) Inspection() *InspectionStore { - return s.inspection -} - func (s *Store) Group() *GroupStore { return s.group } diff --git a/internal/store/vm.go b/internal/store/vm.go index 4b34907b..fea11bec 100644 --- a/internal/store/vm.go +++ b/internal/store/vm.go @@ -2,7 +2,6 @@ package store import ( "context" - "errors" "fmt" "strings" @@ -66,7 +65,6 @@ func (s *VMStore) List(ctx context.Context, filters []sq.Sqlizer, opts ...ListOp var vms []models.VirtualMachineSummary for rows.Next() { var vm models.VirtualMachineSummary - var sqlErr string var tags StringArray err := rows.Scan( &vm.ID, @@ -77,18 +75,13 @@ func (s *VMStore) List(ctx context.Context, filters []sq.Sqlizer, opts ...ListOp &vm.Memory, &vm.DiskSize, &vm.IssueCount, - &vm.Status.State, &vm.IsTemplate, &vm.IsMigratable, - &sqlErr, &tags, ) if err != nil { return nil, err } - if sqlErr != "" { - vm.Status.Error = errors.New(sqlErr) - } vm.Tags = tags vms = append(vms, vm) } diff --git a/internal/store/vm_filter_test.go b/internal/store/vm_filter_test.go index c737e3c0..ff11f82b 100644 --- a/internal/store/vm_filter_test.go +++ b/internal/store/vm_filter_test.go @@ -51,9 +51,6 @@ var _ = Describe("VMStore cross-table filters", func() { err = test.InsertVMDatastores(ctx, db) Expect(err).NotTo(HaveOccurred()) - - err = test.InsertVMInspections(ctx, db) - Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { diff --git a/internal/store/vm_queries.go b/internal/store/vm_queries.go index 83836f31..c473753f 100644 --- a/internal/store/vm_queries.go +++ b/internal/store/vm_queries.go @@ -13,16 +13,13 @@ var vmOutputQuery = sq.Select( `v."Memory" AS memory`, `COALESCE(d.total_disk, 0) AS disk_size`, `COALESCE(c.issues_count, 0) AS issue_count`, - `COALESCE(i.status, 'not_found') AS status`, `v."Template" as template`, `COALESCE(crit.critical_count, 0) = 0 AS migratable`, - `COALESCE(i.error, '') AS error`, `COALESCE(t.tags, [])::VARCHAR[] AS tags`, ).From("vinfo v"). LeftJoin(`(SELECT "VM_ID", COUNT(*) AS issues_count FROM concerns GROUP BY "VM_ID") c ON v."VM ID" = c."VM_ID"`). LeftJoin(`(SELECT "VM_ID", COUNT(*) AS critical_count FROM concerns WHERE "Category" = 'Critical' GROUP BY "VM_ID") crit ON v."VM ID" = crit."VM_ID"`). LeftJoin(`(SELECT "VM ID", SUM("Capacity MiB") AS total_disk FROM vdisk GROUP BY "VM ID") d ON v."VM ID" = d."VM ID"`). - LeftJoin(`vm_inspection_status i ON v."VM ID" = i."VM ID"`). LeftJoin(`( SELECT u.vm_id, list_distinct(flatten(list(g.tags))) AS tags FROM group_matches gm @@ -39,7 +36,6 @@ var vmFilterSubquery = sq.Select(`DISTINCT v."VM ID"`). From("vinfo v"). LeftJoin(`vdisk dk ON v."VM ID" = dk."VM ID"`). LeftJoin(`concerns c ON v."VM ID" = c."VM_ID"`). - LeftJoin(`vm_inspection_status i ON v."VM ID" = i."VM ID"`). LeftJoin(`vcpu cpu ON v."VM ID" = cpu."VM ID"`). LeftJoin(`vmemory mem ON v."VM ID" = mem."VM ID"`). LeftJoin(`vnetwork net ON v."VM ID" = net."VM ID"`). diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 8c021852..622cab6f 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -71,6 +71,10 @@ func NewOperationInProgressError(op string) *OperationInProgressError { } } +func NewInspectionInProgressError() *OperationInProgressError { + return NewOperationInProgressError("inspection") +} + func NewCollectionInProgressError() *OperationInProgressError { return NewOperationInProgressError("collection") } @@ -169,21 +173,6 @@ func IsConsoleClientError(err error) bool { return errors.As(err, &e) } -// InspectorWorkError indicates that an error occurred during the work -type InspectorWorkError struct { - msg string -} - -func NewInspectorWorkError(format string, args ...any) error { - return &InspectorWorkError{ - msg: fmt.Sprintf(format, args...), - } -} - -func (e *InspectorWorkError) Error() string { - return e.msg -} - // InspectorNotRunningError indicates that inspector not currently running type InspectorNotRunningError struct{} diff --git a/pkg/errors/errors_test.go b/pkg/errors/errors_test.go index ca87077a..81e48e03 100644 --- a/pkg/errors/errors_test.go +++ b/pkg/errors/errors_test.go @@ -394,19 +394,6 @@ var _ = Describe("Errors", func() { }) }) - Context("InspectorWorkError", func() { - // Given an InspectorWorkError created with format args - // When Error() is called - // Then it should contain the formatted message - It("should format message with args", func() { - // Arrange - err := srvErrors.NewInspectorWorkError("vm %s failed: %s", "vm-1", "snapshot error") - - // Act & Assert - Expect(err.Error()).To(Equal("vm vm-1 failed: snapshot error")) - }) - }) - Context("InspectorNotRunningError", func() { // Given an InspectorNotRunningError // When Error() is called diff --git a/pkg/filter/doc.go b/pkg/filter/doc.go index 2fc1c0f5..2ec0ee1b 100644 --- a/pkg/filter/doc.go +++ b/pkg/filter/doc.go @@ -159,10 +159,6 @@ // // concern.label, concern.category, concern.assessment // -// vm_inspection_status (i) — inspection.* prefix: -// -// inspection.status, inspection.error -// // vcpu (cpu) — cpu.* prefix: // // cpu.hot_add, cpu.hot_remove, cpu.sockets, cpu.cores_per_socket diff --git a/pkg/filter/sql.go b/pkg/filter/sql.go index cd955084..feadf88c 100644 --- a/pkg/filter/sql.go +++ b/pkg/filter/sql.go @@ -106,12 +106,6 @@ var defaultMapFn MapFunc = func(name string) (string, error) { case "concern.assessment": return `c."Assessment"`, nil - // vm_inspection_status (i) — inspection.* prefix - case "inspection.status": - return `i.status`, nil - case "inspection.error": - return `i.error`, nil - // vcpu (cpu) — cpu.* prefix case "cpu.hot_add": return `cpu."Hot Add"`, nil diff --git a/pkg/filter/sql_test.go b/pkg/filter/sql_test.go index a6d8cf16..f040c280 100644 --- a/pkg/filter/sql_test.go +++ b/pkg/filter/sql_test.go @@ -832,8 +832,6 @@ var _ = Describe("SQL Generation", func() { {"concern.label", `c."Label"`}, {"concern.category", `c."Category"`}, {"concern.assessment", `c."Assessment"`}, - {"inspection.status", `i.status`}, - {"inspection.error", `i.error`}, {"cpu.hot_add", `cpu."Hot Add"`}, {"cpu.hot_remove", `cpu."Hot Remove"`}, {"cpu.sockets", `cpu."Sockets"`}, diff --git a/pkg/vmware/work_builder.go b/pkg/vmware/work_builder.go deleted file mode 100644 index f4055d0e..00000000 --- a/pkg/vmware/work_builder.go +++ /dev/null @@ -1,80 +0,0 @@ -package vmware - -import ( - "context" - - "go.uber.org/zap" - - "github.com/kubev2v/assisted-migration-agent/internal/models" -) - -// InsWorkBuilder builds a sequence of WorkUnits for the v1 Inspector workflow. -type InsWorkBuilder struct { - operator VMOperator -} - -// NewInspectorWorkBuilder creates a new v1 work builder. -func NewInspectorWorkBuilder(operator VMOperator) *InsWorkBuilder { - return &InsWorkBuilder{ - operator: operator, - } -} - -// Build creates the sequence of WorkUnits for the Inspector workflow. -func (b *InsWorkBuilder) Build(id string) []models.InspectorWorkUnit { - return b.vmWork(id) -} - -func (b *InsWorkBuilder) vmWork(id string) []models.InspectorWorkUnit { - var units []models.InspectorWorkUnit - - inspect := models.InspectorWorkUnit{ - Work: func() func(ctx context.Context) (any, error) { - return func(ctx context.Context) (any, error) { - zap.S().Named("inspector_service").Info("validate privileges on VM") - - if err := b.operator.ValidatePrivileges(ctx, id, models.RequiredPrivileges); err != nil { - zap.S().Named("inspector_service").Errorw("validation failed", "error", err) - return nil, err - } - - zap.S().Named("inspector_service").Infow("creating VM snapshot", "vmId", id) - req := CreateSnapshotRequest{ - VmId: id, - SnapshotName: models.InspectionSnapshotName, - Description: "", - Memory: false, - Quiesce: false, - } - - if err := b.operator.CreateSnapshot(ctx, req); err != nil { - zap.S().Named("inspector_service").Errorw("failed to create VM snapshot", "vmId", id, "error", err) - return nil, err - } - - zap.S().Named("inspector_service").Infow("VM snapshot created", "vmId", id) - - // Todo: add the inspection logic here - - removeSnapReq := RemoveSnapshotRequest{ - VmId: id, - SnapshotName: models.InspectionSnapshotName, - Consolidate: true, - } - - if err := b.operator.RemoveSnapshot(ctx, removeSnapReq); err != nil { - zap.S().Named("inspector_service").Errorw("failed to remove VM snapshot", "vmId", id, "error", err) - return nil, err - } - - zap.S().Named("inspector_service").Infow("VM snapshot removed", "vmId", id) - - return nil, nil - } - }, - } - - units = append(units, inspect) - - return units -} diff --git a/test/vms.go b/test/vms.go index f48e3e56..99a09609 100644 --- a/test/vms.go +++ b/test/vms.go @@ -221,17 +221,3 @@ func InsertVMDatastores(ctx context.Context, db *sql.DB) error { } return nil } - -// InsertVMInspections inserts vm_inspection_status fixture data. -func InsertVMInspections(ctx context.Context, db *sql.DB) error { - for _, ins := range Inspections { - _, err := db.ExecContext(ctx, ` - INSERT INTO vm_inspection_status ("VM ID", status, error) - VALUES (?, ?, ?) - `, ins.VMID, ins.Status, ins.Error) - if err != nil { - return err - } - } - return nil -}