diff --git a/cdb/db.go b/cdb/db.go index 021c5cb..b87193e 100644 --- a/cdb/db.go +++ b/cdb/db.go @@ -132,25 +132,20 @@ func (oDb *DB) DeleteBatched(ctx context.Context, table, dateCol, orderbyCol str ctx, cancel := context.WithTimeout(ctx, time.Minute) // Execute the DELETE statement - result, err := oDb.DB.ExecContext(ctx, query) + count, err := oDb.execCountContext(ctx, query) cancel() if err != nil { return totalDeleted, batchCount, fmt.Errorf("%s: error executing batch %d: %w", table, batchCount, err) } // Check the number of affected rows - rowsAffected, err := result.RowsAffected() - if err != nil { - return totalDeleted, batchCount, fmt.Errorf("%s: error checking affected rows for batch %d: %w", table, batchCount, err) - } - - totalDeleted += rowsAffected - if rowsAffected > 0 { - slog.Debug(fmt.Sprintf("DeleteBatched: %s: batch %d: deleted %d rows. total deleted: %d", table, batchCount, rowsAffected, totalDeleted)) + totalDeleted += count + if count > 0 { + slog.Debug(fmt.Sprintf("DeleteBatched: %s: batch %d: deleted %d rows. total deleted: %d", table, batchCount, count, totalDeleted)) } // If less than the batch size was deleted, we've reached the end of the matching rows. - if rowsAffected < batchSize { + if count < batchSize { return totalDeleted, batchCount, nil } @@ -158,3 +153,21 @@ func (oDb *DB) DeleteBatched(ctx context.Context, table, dateCol, orderbyCol str time.Sleep(10 * time.Millisecond) } } + +// ExecContextAndCountRowsAffected executes the oDb.DB.ExecContext query with the provided context, returning the number of rows affected and an error. +func (oDb *DB) ExecContextAndCountRowsAffected(ctx context.Context, query string, args ...any) (int64, error) { + return oDb.execCountContext(ctx, query, args...) +} + +// execCountContext executes the oDb.DB.ExecContext query with the provided context and arguments, returning the number of affected rows and an error. +func (oDb *DB) execCountContext(ctx context.Context, query string, args ...any) (int64, error) { + result, err := oDb.DB.ExecContext(ctx, query, args...) + if err != nil { + return 0, err + } + if result == nil { + // len data may be 0, so no rows affected. + return 0, nil + } + return result.RowsAffected() +} diff --git a/cdb/db_actions.go b/cdb/db_actions.go index 2992f4f..eea0c54 100644 --- a/cdb/db_actions.go +++ b/cdb/db_actions.go @@ -67,13 +67,9 @@ func (oDb *DB) AutoAckActionErrors(ctx context.Context, ids []int64) error { WHERE id IN (%s)`, Placeholders(len(ids))) args := argsFromIDs(ids) - result, err := oDb.DB.ExecContext(ctx, request, args...) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request, args...); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("svcactions") } return nil @@ -97,13 +93,9 @@ func (oDb *DB) LogActionErrorsNotAcked(ctx context.Context, ids []int64) error { WHERE id IN (%s)`, Placeholders(len(ids))) args := argsFromIDs(ids) - result, err := oDb.DB.ExecContext(ctx, request, args...) - if err != nil { + if count, err := oDb.execCountContext(ctx, request, args...); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("log") } return nil @@ -175,13 +167,9 @@ func (oDb *DB) UpdateUnfinishedActions(ctx context.Context) error { AND end IS NULL AND status IS NULL AND action NOT LIKE "%#%"` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("svcactions") } return nil @@ -232,6 +220,8 @@ func (oDb *DB) InsertSvcAction(ctx context.Context, svcID, nodeID uuid.UUID, act result, err := oDb.DB.ExecContext(ctx, query, args...) if err != nil { return 0, err + } else if result == nil { + return 0, errors.New("insert svcactions returns unexpected nil result") } id, err := result.LastInsertId() @@ -253,6 +243,8 @@ func (oDb *DB) UpdateSvcAction(ctx context.Context, svcActionID int64, end time. result, err := oDb.DB.ExecContext(ctx, query, end, status, end, statusLog, svcActionID) if err != nil { return err + } else if result == nil { + return errors.New("update svcactions returns unexpected nil result") } if rowsAffected, err := result.RowsAffected(); err != nil { return err diff --git a/cdb/db_checks.go b/cdb/db_checks.go index 72f22c7..5449c8a 100644 --- a/cdb/db_checks.go +++ b/cdb/db_checks.go @@ -8,14 +8,10 @@ import ( func (oDb *DB) PurgeChecksOutdated(ctx context.Context) error { request := fmt.Sprintf("DELETE FROM `checks_live` WHERE `chk_updated` < DATE_SUB(NOW(), INTERVAL 2 DAY)") - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return fmt.Errorf("delete from checks_live: %w", err) - } - if rowAffected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("count delete from checks_live: %w", err) - } else if rowAffected > 0 { - slog.Info(fmt.Sprintf("purged %d entries from table checks_live", rowAffected)) + } else if count > 0 { + slog.Info(fmt.Sprintf("purged %d entries from table checks_live", count)) oDb.SetChange("checks_live") } return nil diff --git a/cdb/db_compliance.go b/cdb/db_compliance.go index 887c5cd..421315a 100644 --- a/cdb/db_compliance.go +++ b/cdb/db_compliance.go @@ -11,11 +11,9 @@ func (oDb *DB) PurgeCompModulesetsNodes(ctx context.Context) error { node_id NOT IN ( SELECT DISTINCT node_id FROM nodes )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_node_moduleset") } return nil @@ -28,11 +26,9 @@ func (oDb *DB) PurgeCompRulesetsNodes(ctx context.Context) error { node_id NOT IN ( SELECT DISTINCT node_id FROM nodes )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_rulesets_nodes") } return nil @@ -45,11 +41,9 @@ func (oDb *DB) PurgeCompRulesetsServices(ctx context.Context) error { svc_id NOT IN ( SELECT DISTINCT svc_id FROM svcmon )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_rulesets_services") } return nil @@ -62,11 +56,9 @@ func (oDb *DB) PurgeCompModulesetsServices(ctx context.Context) error { svc_id NOT IN ( SELECT DISTINCT svc_id FROM svcmon )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_modulesets_services") } return nil @@ -78,11 +70,9 @@ func (oDb *DB) PurgeCompStatusOutdated(ctx context.Context) error { FROM comp_status WHERE run_date < DATE_SUB(NOW(), INTERVAL 31 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil @@ -96,11 +86,9 @@ func (oDb *DB) PurgeCompStatusSvcOrphans(ctx context.Context) error { svc_id NOT IN ( SELECT DISTINCT svc_id FROM svcmon )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil @@ -114,11 +102,9 @@ func (oDb *DB) PurgeCompStatusNodeOrphans(ctx context.Context) error { node_id NOT IN ( SELECT DISTINCT node_id FROM nodes )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil @@ -132,11 +118,9 @@ func (oDb *DB) PurgeCompStatusModulesetOrphans(ctx context.Context) error { run_module NOT IN ( SELECT modset_mod_name FROM comp_moduleset_modules )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil @@ -156,11 +140,9 @@ func (oDb *DB) PurgeCompStatusNodeUnattached(ctx context.Context) error { FROM comp_node_moduleset ) )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil @@ -180,11 +162,9 @@ func (oDb *DB) PurgeCompStatusSvcUnattached(ctx context.Context) error { FROM comp_modulesets_services ) )` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("comp_status") } return nil diff --git a/cdb/db_dashboard.go b/cdb/db_dashboard.go index a587441..babcafb 100644 --- a/cdb/db_dashboard.go +++ b/cdb/db_dashboard.go @@ -49,24 +49,22 @@ func (oDb *DB) DashboardInstanceFrozenUpdate(ctx context.Context, objectID, node dash_updated = NOW(), dash_env = ?` ) var ( - err error - result sql.Result + err error + count int64 ) switch frozen { case true: - result, err = oDb.DB.ExecContext(ctx, queryFrozen, objectID, nodeID, objectEnv, objectEnv) + count, err = oDb.execCountContext(ctx, queryFrozen, objectID, nodeID, objectEnv, objectEnv) if err != nil { return fmt.Errorf("update dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err) } case false: - result, err = oDb.DB.ExecContext(ctx, queryThawed, objectID, nodeID) + count, err = oDb.execCountContext(ctx, queryThawed, objectID, nodeID) if err != nil { return fmt.Errorf("delete dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err) } } - if count, err := result.RowsAffected(); err != nil { - return fmt.Errorf("count dashboard 'service frozen' for %s@%s: %w", objectID, nodeID, err) - } else if count > 0 { + if count > 0 { oDb.SetChange("dashboard") } return nil @@ -78,9 +76,7 @@ func (oDb *DB) DashboardDeleteInstanceNotUpdated(ctx context.Context, objectID, const ( query = `DELETE FROM dashboard WHERE svc_id = ? AND node_id = ? AND dash_type = 'instance status not updated'` ) - if result, err := oDb.DB.ExecContext(ctx, query, objectID, nodeID); err != nil { - return err - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query, objectID, nodeID); err != nil { return err } else if count > 0 { oDb.SetChange("dashboard") @@ -94,9 +90,7 @@ func (oDb *DB) DashboardDeleteObjectWithType(ctx context.Context, objectID, dash const ( query = `DELETE FROM dashboard WHERE svc_id = ? AND dash_type = ?` ) - if result, err := oDb.DB.ExecContext(ctx, query, objectID, dashType); err != nil { - return fmt.Errorf("dashboardDeleteObjectWithType %s: %w", dashType, err) - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query, objectID, dashType); err != nil { return fmt.Errorf("dashboardDeleteObjectWithType %s: %w", dashType, err) } else if count > 0 { oDb.SetChange("dashboard") @@ -140,13 +134,11 @@ func (oDb *DB) DashboardUpdateObject(ctx context.Context, d *Dashboard) error { dash_env = ? ` ) - result, err := oDb.DB.ExecContext(ctx, query, + count, err := oDb.execCountContext(ctx, query, d.ObjectID, d.Type, d.Fmt, d.Severity, d.Dict, d.Env, d.Fmt, d.Severity, d.Dict, d.Env) if err != nil { return fmt.Errorf("dashboardUpdateObject: %w", err) - } else if count, err := result.RowsAffected(); err != nil { - return fmt.Errorf("dashboardUpdateObject: %w", err) } else if count > 0 { oDb.SetChange("dashboard") } @@ -161,9 +153,7 @@ func (oDb *DB) DashboardDeleteNetworkWrongMaskNotUpdated(ctx context.Context) er dash_type="netmask misconfigured" AND dash_updated < DATE_SUB(NOW(), INTERVAL 1 MINUTE)` ) - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return fmt.Errorf("DashboardDeleteNetworkWrongMaskNotUpdated: %w", err) - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return fmt.Errorf("DashboardDeleteNetworkWrongMaskNotUpdated: %w", err) } else if count > 0 { oDb.SetChange("dashboard") @@ -182,9 +172,7 @@ func (oDb *DB) DashboardDeleteActionErrors(ctx context.Context) error { FROM b_action_errors )` ) - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return fmt.Errorf("DashboardDeleteActionErrors: %w", err) - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return fmt.Errorf("DashboardDeleteActionErrors: %w", err) } else if count > 0 { oDb.SetChange("dashboard") @@ -201,9 +189,7 @@ func (oDb *DB) PurgeAlertsOnDeletedNodes(ctx context.Context) error { n.node_id IS NULL AND d.node_id != ""` ) - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err } else if count > 0 { oDb.SetChange("dashboard") @@ -222,9 +208,7 @@ func (oDb *DB) PurgeAlertsOnDeletedInstances(ctx context.Context) error { d.node_id != "" AND d.svc_id != ""` ) - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err } else if count > 0 { oDb.SetChange("dashboard") @@ -241,9 +225,7 @@ func (oDb *DB) PurgeAlertsOnDeletedServices(ctx context.Context) error { n.svc_id IS NULL AND d.svc_id != ""` ) - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err } else if count > 0 { oDb.SetChange("dashboard") @@ -271,13 +253,9 @@ func (oDb *DB) DashboardUpdateNodesNotUpdated(ctx context.Context) error { WHERE updated < date_sub(NOW(), interval 25 hour) ON DUPLICATE KEY UPDATE dash_updated=NOW()` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -290,13 +268,9 @@ func (oDb *DB) DashboardUpdateChecksNotUpdated(ctx context.Context) error { dash_type = "check value not updated" AND node_id NOT IN (SELECT DISTINCT node_id FROM checks_live) ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } @@ -320,7 +294,7 @@ func (oDb *DB) DashboardUpdateChecksNotUpdated(ctx context.Context) error { UNION ALL - -- Suppression des "check value not updated" non correspondants + -- Suppression des "check value not updated" non correspondents SELECT d.id FROM dashboard d LEFT JOIN checks_live c ON d.dash_dict_md5 = MD5(CONCAT('{"i":"', c.chk_instance, '", "t":"', c.chk_type, '"}')) @@ -330,13 +304,9 @@ func (oDb *DB) DashboardUpdateChecksNotUpdated(ctx context.Context) error { AND c.id IS NULL ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } request = ` @@ -360,13 +330,9 @@ func (oDb *DB) DashboardUpdateChecksNotUpdated(ctx context.Context) error { WHERE chk_updated < DATE_SUB(NOW(), INTERVAL 1 DAY) ON DUPLICATE KEY UPDATE dash_updated = NOW(); ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -401,13 +367,9 @@ func (oDb *DB) AlertActionErrors(ctx context.Context, line BActionErrorCount) er dash_fmt="%(err)s action errors", dash_dict=CONCAT('{"err": "', ?, '"}'), dash_updated=NOW()` - result, err := oDb.DB.ExecContext(ctx, request, line.SvcID, line.NodeID, severity, line.ErrCount, env, severity, line.ErrCount) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request, line.SvcID, line.NodeID, severity, line.ErrCount, env, severity, line.ErrCount); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -420,13 +382,9 @@ func (oDb *DB) DashboardDeleteActionErrorsWithNoError(ctx context.Context) error dash_dict='{"err": "0"}' and dash_type='action errors' ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -454,13 +412,9 @@ func (oDb *DB) DashboardUpdateServiceConfigNotUpdated(ctx context.Context) error ON DUPLICATE KEY UPDATE dash_updated=NOW() ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -488,15 +442,12 @@ func (oDb *DB) DashboardUpdateInstancesNotUpdated(ctx context.Context) error { ON DUPLICATE KEY UPDATE dash_updated=NOW() ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } + request = ` DELETE FROM dashboard WHERE id IN ( @@ -512,13 +463,9 @@ func (oDb *DB) DashboardUpdateInstancesNotUpdated(ctx context.Context) error { (svcmon.id IS NULL OR svcmon.mon_updated >= DATE_SUB(NOW(), INTERVAL 16 MINUTE)) ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -526,8 +473,7 @@ func (oDb *DB) DashboardUpdateInstancesNotUpdated(ctx context.Context) error { func (oDb *DB) DashboardUpdateNodeMaintenanceExpired(ctx context.Context) error { request := `SET @now = NOW()` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if _, err := oDb.DB.ExecContext(ctx, request); err != nil { return err } @@ -555,15 +501,12 @@ func (oDb *DB) DashboardUpdateNodeMaintenanceExpired(ctx context.Context) error ON DUPLICATE KEY UPDATE dash_updated=@now ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } + request = ` DELETE FROM dashboard WHERE @@ -573,13 +516,9 @@ func (oDb *DB) DashboardUpdateNodeMaintenanceExpired(ctx context.Context) error dash_updated IS NULL ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -587,8 +526,7 @@ func (oDb *DB) DashboardUpdateNodeMaintenanceExpired(ctx context.Context) error func (oDb *DB) DashboardUpdateNodeCloseToMaintenanceEnd(ctx context.Context) error { request := `SET @now = NOW()` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if _, err := oDb.DB.ExecContext(ctx, request); err != nil { return err } @@ -617,15 +555,12 @@ func (oDb *DB) DashboardUpdateNodeCloseToMaintenanceEnd(ctx context.Context) err ON DUPLICATE KEY UPDATE dash_updated=@now ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } + request = ` DELETE FROM dashboard WHERE @@ -635,13 +570,9 @@ func (oDb *DB) DashboardUpdateNodeCloseToMaintenanceEnd(ctx context.Context) err dash_updated IS NULL ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -649,8 +580,7 @@ func (oDb *DB) DashboardUpdateNodeCloseToMaintenanceEnd(ctx context.Context) err func (oDb *DB) DashboardUpdateNodeWithoutMaintenanceEnd(ctx context.Context) error { request := `SET @now = NOW()` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if _, err := oDb.DB.ExecContext(ctx, request); err != nil { return err } @@ -683,15 +613,12 @@ func (oDb *DB) DashboardUpdateNodeWithoutMaintenanceEnd(ctx context.Context) err ON DUPLICATE KEY UPDATE dash_updated=@now ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } + request = ` DELETE FROM dashboard WHERE @@ -701,13 +628,9 @@ func (oDb *DB) DashboardUpdateNodeWithoutMaintenanceEnd(ctx context.Context) err dash_updated IS NULL ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil @@ -728,8 +651,7 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { dash_dict IS NULL ) ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if _, err := oDb.DB.ExecContext(ctx, request); err != nil { return err } @@ -743,13 +665,9 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { FROM apps a ) ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } @@ -775,13 +693,9 @@ func (oDb *DB) DashboardUpdateAppWithoutResponsible(ctx context.Context) error { ON DUPLICATE KEY UPDATE dash_updated=NOW() ` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } return nil diff --git a/cdb/db_dashboard_object_flex.go b/cdb/db_dashboard_object_flex.go index d2aff4f..6014f00 100644 --- a/cdb/db_dashboard_object_flex.go +++ b/cdb/db_dashboard_object_flex.go @@ -46,16 +46,12 @@ func (oDb *DB) DashboardUpdateObjectFlexStarted(ctx context.Context, obj *DBObje DELETE FROM dashboard WHERE svc_id = ? and dash_type = "flex error" and dash_fmt like "%instances started%"` - if result, err := oDb.DB.ExecContext(ctx, query, obj.SvcID, sev, obj.Env, obj.SvcID, obj.SvcID); err != nil { + if count, err := oDb.execCountContext(ctx, query, obj.SvcID, sev, obj.Env, obj.SvcID, obj.SvcID); err != nil { return fmt.Errorf("dashboardDeleteObjectWithType flex error: %w", err) - } else if count, err := result.RowsAffected(); err != nil { - return fmt.Errorf("dashboardDeleteObjectWithType count updated: %w", err) } else if count > 0 { defer oDb.SetChange("dashboard") - } else if result, err := oDb.DB.ExecContext(ctx, queryDelete, obj.SvcID); err != nil { + } else if count, err := oDb.execCountContext(ctx, queryDelete, obj.SvcID); err != nil { return fmt.Errorf("dashboardDeleteObjectWithType clean obsolete flex error: %w", err) - } else if count, err := result.RowsAffected(); err != nil { - return fmt.Errorf("dashboardDeleteObjectWithType count deleted: %w", err) } else if count > 0 { defer oDb.SetChange("dashboard") } diff --git a/cdb/db_diskinfo.go b/cdb/db_diskinfo.go index b5abb85..8b4c796 100644 --- a/cdb/db_diskinfo.go +++ b/cdb/db_diskinfo.go @@ -78,12 +78,10 @@ func (oDb *DB) UpdateDiskinfoArrayID(ctx context.Context, diskID, arrayID string " VALUES (?, ?, NOW())" + " ON DUPLICATE KEY UPDATE `disk_arrayid` = VALUES(`disk_arrayid`), `disk_updated` = VALUES(disk_updated)" ) - if result, err := oDb.DB.ExecContext(ctx, query, diskID, arrayID); err != nil { + if count, err := oDb.execCountContext(ctx, query, diskID, arrayID); err != nil { return false, fmt.Errorf("update diskinfo: %w", err) - } else if affected, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("count diskinfo updated: %w", err) } else { - return affected > 0, nil + return count > 0, nil } } @@ -93,12 +91,10 @@ func (oDb *DB) UpdateDiskinfoArrayAndDevIDsAndSize(ctx context.Context, diskID, " VALUES (?, ?, ?, ?, NOW())" + " ON DUPLICATE KEY UPDATE `disk_arrayid` = VALUES(`disk_arrayid`), `disk_devid` = VALUES(`disk_devid`), `disk_size` = VALUES(`disk_size`), `disk_updated` = VALUES(disk_updated)" ) - if result, err := oDb.DB.ExecContext(ctx, query, diskID, arrayID, devID, size); err != nil { + if count, err := oDb.execCountContext(ctx, query, diskID, arrayID, devID, size); err != nil { return false, fmt.Errorf("update diskinfo: %w", err) - } else if affected, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("count diskinfo updated: %w", err) } else { - return affected > 0, nil + return count > 0, nil } } @@ -108,12 +104,10 @@ func (oDb *DB) UpdateDiskinfoForDiskSize(ctx context.Context, diskID string, siz " VALUES (?, ?, NOW())" + " ON DUPLICATE KEY UPDATE `disk_size` = VALUES(`disk_size`), `disk_updated` = VALUES(disk_updated)" ) - if result, err := oDb.DB.ExecContext(ctx, query, diskID, size); err != nil { + if count, err := oDb.execCountContext(ctx, query, diskID, size); err != nil { return false, fmt.Errorf("update diskinfo: %w", err) - } else if affected, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("count diskinfo updated: %w", err) } else { - return affected > 0, nil + return count > 0, nil } } @@ -122,12 +116,10 @@ func (oDb *DB) UpdateDiskinfoSetMissingArrayID(ctx context.Context, diskID, arra query = "UPDATE `diskinfo` SET `disk_arrayid` = ?" + " WHERE `disk_id` = ? AND (`disk_arrayid` = '' OR `disk_arrayid` is NULL)" ) - if result, err := oDb.DB.ExecContext(ctx, query, arrayID, diskID); err != nil { + if count, err := oDb.execCountContext(ctx, query, arrayID, diskID); err != nil { return false, fmt.Errorf("update diskinfo: %w", err) - } else if affected, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("count diskinfo updated: %w", err) } else { - return affected > 0, nil + return count > 0, nil } } @@ -136,11 +128,9 @@ func (oDb *DB) PurgeDiskinfoOutdated(ctx context.Context) error { FROM diskinfo WHERE disk_updated < DATE_SUB(NOW(), INTERVAL 2 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return fmt.Errorf("purge diskinfo: %w", err) - } else if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("count diskinfo deleted: %w", err) - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("diskinfo") } return nil diff --git a/cdb/db_heartbeat.go b/cdb/db_heartbeat.go index 58fc57e..0964f00 100644 --- a/cdb/db_heartbeat.go +++ b/cdb/db_heartbeat.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "log/slog" "time" ) @@ -77,7 +76,7 @@ func (oDb *DB) HBUpdate(ctx context.Context, hb DBHeartbeat) error { defer logDuration("HBUpdate cluster id:"+hb.ClusterID+" "+hb.Driver, time.Now()) const ( qUpdate = "" + - "INSERT INTO `hbmon` (`cluster_id`, `node_id`, `peer_node_id`, `driver`, `name`, `desc`, `state`, `beating`, `last_beating`, `updated`)" + + "INSERT INTO `hbmon` (`cluster_id`, `node_id`, `peer_node_id`, `driver`, `name`, `desc`, `stat e`, `beating`, `last_beating`, `updated`)" + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NOW())" + "ON DUPLICATE KEY UPDATE" + " `cluster_id` = VALUES(`cluster_id`), `driver` = VALUES(`driver`), `desc` = VALUES(`desc`), `state` = VALUES(`state`), `beating`= VALUES(`beating`), `last_beating`= VALUES(`last_beating`), `updated`= VALUES(`updated`)" @@ -96,15 +95,9 @@ func (oDb *DB) HBDeleteOutDatedByClusterID(ctx context.Context, clusterID string query = "" + "DELETE FROM `hbmon` WHERE `cluster_id` = ? AND `updated` < ?" ) - _, err := oDb.DB.ExecContext(ctx, query, clusterID, maxTime) - result, err := oDb.DB.ExecContext(ctx, query, clusterID, maxTime) - if err != nil { + if count, err := oDb.execCountContext(ctx, query, clusterID, maxTime); err != nil { return fmt.Errorf("hbDeleteOutDatedByClusterID: %w", err) - } - if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("hbDeleteOutDatedByClusterID count affected: %w", err) - } else if affected > 0 { - slog.Debug(fmt.Sprintf("hbDeleteOutDatedByClusterID %s %d", clusterID, affected)) + } else if count > 0 { oDb.SetChange("hbmon") } return nil @@ -153,7 +146,7 @@ func (oDb *DB) HBLogUpdate(ctx context.Context, hb DBHeartbeat) error { "INSERT INTO `hbmon_log_last` (`cluster_id`, `node_id`, `peer_node_id`, `name`, `state`, `beating`, `begin`, `end`) " + " VALUES (?, ?, ?, ?, ?, ?, NOW(), NOW())" + " ON DUPLICATE KEY UPDATE " + - " `cluster_id`= VALUES(`cluster_id`), `state` = VALUES(`state`), `beating` = VALUES(`beating`), `end` = VALUES(`begin`), `end` = VALUES(`end`)" + " `cluster_id`= VALUES(`cluster_id`), `state` = VALUES(`state`), `beating` = VALUES(`beating`), `begin` = VALUES(`begin`), `end` = VALUES(`end`)" queryExtendIntervalOfCurrent = "" + "UPDATE `hbmon_log_last` SET `end` = NOW() " + diff --git a/cdb/db_instances.go b/cdb/db_instances.go index 359c656..8f94718 100644 --- a/cdb/db_instances.go +++ b/cdb/db_instances.go @@ -226,8 +226,7 @@ func (oDb *DB) InstancePing(ctx context.Context, svcID, nodeID string) (updates " AND (`res_end` < DATE_SUB(NOW(), INTERVAL 30 SECOND) OR `res_end` is NULL)" ) var ( - count int64 - result sql.Result + count int64 ) begin := time.Now() err = oDb.DB.QueryRowContext(ctx, qHasInstance, svcID, nodeID).Scan(&count) @@ -243,9 +242,7 @@ func (oDb *DB) InstancePing(ctx context.Context, svcID, nodeID string) (updates } begin = time.Now() - if result, err = oDb.DB.ExecContext(ctx, qUpdateSvcmon, svcID, nodeID); err != nil { - return - } else if count, err = result.RowsAffected(); err != nil { + if count, err = oDb.execCountContext(ctx, qUpdateSvcmon, svcID, nodeID); err != nil { return } else if count == 0 { return @@ -261,9 +258,7 @@ func (oDb *DB) InstancePing(ctx context.Context, svcID, nodeID string) (updates slog.Debug(fmt.Sprintf("instancePing qUpdateSvcmonLogLast %s", time.Since(begin))) begin = time.Now() - if result, err = oDb.DB.ExecContext(ctx, qUpdateResmon, svcID, nodeID); err != nil { - return - } else if count, err = result.RowsAffected(); err != nil { + if count, err = oDb.execCountContext(ctx, qUpdateResmon, svcID, nodeID); err != nil { return } else if count == 0 { return @@ -300,13 +295,10 @@ func (oDb *DB) InstancePingFromNodeID(ctx context.Context, nodeID string) (updat WHERE node_id = ? AND res_end < DATE_SUB(NOW(), INTERVAL 30 SECOND)` ) var ( - count int64 - result sql.Result + count int64 ) - if result, err = oDb.DB.ExecContext(ctx, qUpdateSvcmon, nodeID); err != nil { - return - } else if count, err = result.RowsAffected(); err != nil { + if count, err = oDb.execCountContext(ctx, qUpdateSvcmon, nodeID); err != nil { return } else if count == 0 { return @@ -318,9 +310,7 @@ func (oDb *DB) InstancePingFromNodeID(ctx context.Context, nodeID string) (updat return } - if result, err = oDb.DB.ExecContext(ctx, qUpdateResmon, nodeID); err != nil { - return - } else if count, err = result.RowsAffected(); err != nil { + if count, err = oDb.execCountContext(ctx, qUpdateResmon, nodeID); err != nil { return } else if count == 0 { return @@ -337,13 +327,9 @@ func (oDb *DB) InstanceDeleteStatus(ctx context.Context, svcID, nodeID string) e queryDelete = "" + "DELETE FROM `svcmon` WHERE `svc_id` = ? AND `node_id` = ?" ) - result, err := oDb.DB.ExecContext(ctx, queryDelete, svcID, nodeID) - if err != nil { + if count, err := oDb.execCountContext(ctx, queryDelete, svcID, nodeID); err != nil { return fmt.Errorf("instanceDeleteStatus %s@%s: %w", svcID, nodeID, err) - } - if changes, err := result.RowsAffected(); err != nil { - return fmt.Errorf("instanceDeleteStatus %s@%s can't get deleted count: %w", svcID, nodeID, err) - } else if changes > 0 { + } else if count > 0 { oDb.SetChange("svcmon") } return nil @@ -537,14 +523,10 @@ func (oDb *DB) InstanceResourcesDeleteObsolete(ctx context.Context, svcID, nodeI "DELETE FROM `resmon`" + "WHERE `svc_id` = ? AND `node_id` = ? AND `updated` < ?" ) - result, err := oDb.DB.ExecContext(ctx, queryPurge, svcID, nodeID, maxTime) - if err != nil { + if count, err := oDb.execCountContext(ctx, queryPurge, svcID, nodeID, maxTime); err != nil { return fmt.Errorf("InstanceResourcesDeleteObsolete: %w", err) - } - if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("InstanceResourcesDeleteObsolete count affected: %w", err) } else { - slog.Debug(fmt.Sprintf("InstanceResourcesDeleteObsolete %s@%s: %d", svcID, nodeID, affected)) + slog.Debug(fmt.Sprintf("InstanceResourcesDeleteObsolete %s@%s: %d", svcID, nodeID, count)) } return nil } @@ -585,10 +567,12 @@ func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID str for _, key := range info.Keys { if result, err := stmt.ExecContext(ctx, svcID, nodeID, info.Rid, key.Key, topology, key.Value); err != nil { return fmt.Errorf("db exec: %w", err) - } else if count, err := result.RowsAffected(); err != nil { - return fmt.Errorf("db rows affected: %w", err) - } else if count > 0 { - changed = true + } else if result != nil { + if count, err := result.RowsAffected(); err != nil { + return fmt.Errorf("db rows affected: %w", err) + } else if count > 0 { + changed = true + } } } } @@ -599,11 +583,9 @@ func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID str func (oDb *DB) InstanceResourceInfoDelete(ctx context.Context, svcID, nodeID string, maxTime time.Time) error { defer logDuration("InstanceResourceInfoDelete "+svcID+"@"+nodeID, time.Now()) const query = "DELETE FROM `resinfo` WHERE `svc_id` = ? AND `node_id` = ? AND `updated` < ?" - if result, err := oDb.DB.ExecContext(ctx, query, svcID, nodeID, maxTime); err != nil { + if count, err := oDb.execCountContext(ctx, query, svcID, nodeID, maxTime); err != nil { return fmt.Errorf("query %s: %w", query, err) - } else if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("query %s count row affected: %w", query, err) - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("resinfo") } return nil @@ -804,14 +786,10 @@ func (oDb *DB) PurgeInstance(ctx context.Context, id InstanceID) error { slog.Debug(fmt.Sprintf("purging %s", id)) for _, tableName := range tables { request := fmt.Sprintf("DELETE FROM %s WHERE `svc_id` = ? and `node_id` = ?", tableName) - result, err1 := oDb.DB.ExecContext(ctx, request, id.svcID, id.nodeID) - if err1 != nil { + if count, err1 := oDb.execCountContext(ctx, request, id.svcID, id.nodeID); err1 != nil { err = errors.Join(err, fmt.Errorf("delete from %s: %w", tableName, err1)) continue - } - if rowAffected, err1 := result.RowsAffected(); err1 != nil { - err = errors.Join(err, fmt.Errorf("count delete from %s: %w", tableName, err1)) - } else if rowAffected > 0 { + } else if count > 0 { slog.Debug(fmt.Sprintf("purged %s from table %s", id, tableName)) oDb.SetChange(tableName) } @@ -858,14 +836,10 @@ func (oDb *DB) LogInstancesNotUpdated(ctx context.Context) error { node_id from svcmon where mon_updated 0 { - slog.Debug(fmt.Sprintf("alert: instance outdated: %d", rowAffected)) + } else if count > 0 { + slog.Debug(fmt.Sprintf("alert: instance outdated: %d", count)) oDb.SetChange("log") } else { slog.Debug("alert: no instance outdated") diff --git a/cdb/db_nodes.go b/cdb/db_nodes.go index 26d216f..99a66ad 100644 --- a/cdb/db_nodes.go +++ b/cdb/db_nodes.go @@ -223,16 +223,14 @@ func (oDb *DB) NodeContainerUpdateFromParentNode(ctx context.Context, cName, cAp ` const queryWhere1 = ` WHERE nodename = ? AND app in (?, ?)` - result, err := oDb.DB.ExecContext(ctx, queryUpdate+queryWhere1, + count, err := oDb.execCountContext(ctx, queryUpdate+queryWhere1, pn.LocAddr, pn.LocCountry, pn.locZip, pn.LocCity, pn.LocBuilding, pn.LocFloor, pn.LocRoom, pn.LocRack, pn.Hv, pn.Enclosure, pn.EnclosureSlot, cName, pn.App, cApp) if err != nil { return err } - if count, err := result.RowsAffected(); err != nil { - return err - } else if count > 0 { + if count > 0 { oDb.SetChange("nodes") return nil } else { @@ -255,11 +253,7 @@ func (oDb *DB) NodeContainerUpdateFromParentNode(ctx context.Context, cName, cAp args = append(args, apps[i]) } queryWhere2 += `)` - result, err := oDb.DB.ExecContext(ctx, queryUpdate+queryWhere2, args...) - if err != nil { - return err - } - if count, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, queryUpdate+queryWhere2, args...); err != nil { return err } else if count > 0 { oDb.SetChange("nodes") @@ -294,10 +288,8 @@ func (oDb *DB) NodeUpdateClusterIDForNodeID(ctx context.Context, nodeID, cluster // found node with nodeID and clusterID return false, nil case sql.ErrNoRows: - if result, err := oDb.DB.ExecContext(ctx, queryUpdate, clusterID, nodeID); err != nil { + if count, err := oDb.execCountContext(ctx, queryUpdate, clusterID, nodeID); err != nil { return false, fmt.Errorf("NodeUpdateClusterIDForNodeID update: %w", err) - } else if count, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("NodeUpdateClusterIDForNodeID count updated: %w", err) } else if count > 0 { oDb.SetChange("nodes") return true, nil @@ -311,14 +303,10 @@ func (oDb *DB) NodeUpdateClusterIDForNodeID(ctx context.Context, nodeID, cluster func (oDb *DB) PurgeNodeHBAsOutdated(ctx context.Context) error { request := fmt.Sprintf("DELETE FROM `node_hba` WHERE `updated` < DATE_SUB(NOW(), INTERVAL 7 DAY)") - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { - slog.Info(fmt.Sprintf("purged %d entries from table node_hba", rowAffected)) + } else if count > 0 { + slog.Info(fmt.Sprintf("purged %d entries from table node_hba", count)) oDb.SetChange("node_hba") } return nil @@ -377,13 +365,9 @@ func (oDb *DB) AlertMACDup(ctx context.Context) error { dash_env = VALUES(dash_env), dash_updated = VALUES(dash_updated) ` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } @@ -391,13 +375,9 @@ func (oDb *DB) AlertMACDup(ctx context.Context) error { WHERE dash_type = "mac duplicate" AND dash_updated < DATE_SUB(NOW(), INTERVAL 1 DAY)` - result, err = oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("dashboard") } @@ -428,13 +408,9 @@ func (oDb *DB) UpdateVirtualAssets(ctx context.Context) error { m.mon_vmname=n2.nodename AND m.mon_vmtype IN ('ldom', 'hpvm', 'kvm', 'xen', 'vbox', 'ovm', 'esx', 'zone', 'lxc', 'jail', 'vz', 'srp') and m.mon_containerstatus IN ("up", "stdby up", "warn")` - result, err := oDb.DB.ExecContext(ctx, request) - if err != nil { - return err - } - if rowAffected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, request); err != nil { return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("nodes") } return nil @@ -489,13 +465,9 @@ func (oDb *DB) UpdateVirtualAsset(ctx context.Context, svcID, nodeID string) err WHERE n.nodename = source.vmname AND n.app IN (source.svc_app, source.node_app)` - result, err := oDb.DB.ExecContext(ctx, request, svcID, nodeID) - if err != nil { + if count, err := oDb.execCountContext(ctx, request, svcID, nodeID); err != nil { return err - } - if rowAffected, err := result.RowsAffected(); err != nil { - return err - } else if rowAffected > 0 { + } else if count > 0 { oDb.SetChange("nodes") } return nil diff --git a/cdb/db_object.go b/cdb/db_object.go index d94a179..d5834a3 100644 --- a/cdb/db_object.go +++ b/cdb/db_object.go @@ -284,19 +284,12 @@ func (oDb *DB) ObjectPing(ctx context.Context, svcID string) (updates bool, err const updateSvcLogLastSvc = "" + "UPDATE `services_log_last` SET `svc_end` = ? WHERE `svc_id`= ? " var ( - now = time.Now() - result sql.Result - rowAffected int64 + now = time.Now() + count int64 ) - result, err = oDb.DB.ExecContext(ctx, UpdateServicesSvcStatusUpdated, svcID) - if err != nil { + if count, err = oDb.execCountContext(ctx, UpdateServicesSvcStatusUpdated, svcID); err != nil { return - } - rowAffected, err = result.RowsAffected() - if err != nil { - return - } - if rowAffected == 0 { + } else if count == 0 { return } @@ -451,16 +444,11 @@ func (oDb *DB) InsertOrUpdateObjectConfig(ctx context.Context, c *DBObjectConfig if c.Env != nil { env = *c.Env } - result, err := oDb.DB.ExecContext(ctx, query, c.Name, c.ClusterID, c.SvcID, nodes, drpNode, drpNodes, - app, env, c.Comment, c.FlexMin, c.FlexMax, c.HA, c.Config) + count, err := oDb.execCountContext(ctx, query, c.Name, c.ClusterID, c.SvcID, nodes, drpNode, drpNodes, app, env, c.Comment, c.FlexMin, c.FlexMax, c.HA, c.Config) if err != nil { return false, fmt.Errorf("update services config: %w", err) } - if affected, err := result.RowsAffected(); err != nil { - return false, fmt.Errorf("update services config unable to count row affected: %w", err) - } else { - return affected > 0, nil - } + return count > 0, nil } // ObjectIDFindOrCreate ensures a unique svc_id exists for a given svcname and clusterID, creating it if necessary. @@ -487,6 +475,10 @@ func (oDb *DB) ObjectIDFindOrCreate(ctx context.Context, svcname, clusterID stri err = fmt.Errorf("INSERT IGNORE INTO `service_ids`: %w", err) _ = tx.Rollback() return + } else if result == nil { + err = fmt.Errorf("INSERT IGNORE INTO `service_ids` returned nil result") + _ = tx.Rollback() + return } if rowsAffected, err = result.RowsAffected(); err != nil { err = fmt.Errorf("count row affected for INSERT IGNORE INTO `service_ids`: %w", err) @@ -523,14 +515,10 @@ func (oDb *DB) PurgeTablesFromObjectID(ctx context.Context, id string) error { slog.Debug(fmt.Sprintf("purging object %s", id)) for _, tableName := range tables { request := fmt.Sprintf("DELETE FROM %s WHERE `svc_id` = ?", tableName) - result, err1 := oDb.DB.ExecContext(ctx, request, id) - if err1 != nil { + if count, err1 := oDb.execCountContext(ctx, request, id); err1 != nil { err = errors.Join(err, fmt.Errorf("delete from %s: %w", tableName, err1)) continue - } - if rowAffected, err1 := result.RowsAffected(); err1 != nil { - err = errors.Join(err, fmt.Errorf("count delete from %s: %w", tableName, err1)) - } else if rowAffected > 0 { + } else if count > 0 { slog.Debug(fmt.Sprintf("purged table %s object %s", tableName, id)) oDb.SetChange(tableName) } @@ -563,7 +551,7 @@ func (oDb *DB) ObjectsOutdated(ctx context.Context) (objects []ObjectMeta, err e return } -func (oDb *DB) ObjectUpdateStatusSimple(ctx context.Context, objects []ObjectMeta, availStatus, overallStatus string) (n int64, err error) { +func (oDb *DB) ObjectUpdateStatusSimple(ctx context.Context, objects []ObjectMeta, availStatus, overallStatus string) (int64, error) { idsLen := len(objects) sql := `UPDATE services SET svc_status=?, svc_availstatus=?, svc_status_updated=NOW() @@ -577,13 +565,12 @@ func (oDb *DB) ObjectUpdateStatusSimple(ctx context.Context, objects []ObjectMet args[i+2] = o.ID } - result, err := oDb.DB.ExecContext(ctx, sql, args...) - if err != nil { + if count, err := oDb.execCountContext(ctx, sql, args...); err != nil { return 0, err - } - n, err = result.RowsAffected() - if err == nil && n > 0 { + } else if count > 0 { oDb.Session.SetChanges("services") + return count, nil + } else { + return 0, nil } - return n, err } diff --git a/cdb/db_packages.go b/cdb/db_packages.go index 4dea254..3a72af9 100644 --- a/cdb/db_packages.go +++ b/cdb/db_packages.go @@ -7,11 +7,9 @@ func (oDb *DB) PurgePackagesOutdated(ctx context.Context) error { FROM packages WHERE pkg_updated < DATE_SUB(NOW(), INTERVAL 100 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("packages") } return nil @@ -22,11 +20,9 @@ func (oDb *DB) PurgePatchesOutdated(ctx context.Context) error { FROM patches WHERE patch_updated < DATE_SUB(NOW(), INTERVAL 100 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("patches") } return nil diff --git a/cdb/db_resources.go b/cdb/db_resources.go index dcbf227..7a68f7e 100644 --- a/cdb/db_resources.go +++ b/cdb/db_resources.go @@ -114,12 +114,9 @@ func (oDb *DB) ResourceUpdateStatus(ctx context.Context, resources []ResourceMet args[i+1] = resource.ID } - result, err := oDb.DB.ExecContext(ctx, sql, args...) - if err != nil { + if n, err = oDb.execCountContext(ctx, sql, args...); err != nil { return 0, err - } - n, err = result.RowsAffected() - if err == nil && n > 0 { + } else if n > 0 { oDb.Session.SetChanges("resmon") } return n, err @@ -130,11 +127,9 @@ func (oDb *DB) PurgeResmonOutdated(ctx context.Context) error { FROM resmon WHERE updated < DATE_SUB(NOW(), INTERVAL 1 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("resmon") } return nil diff --git a/cdb/db_storage.go b/cdb/db_storage.go index 5aaef75..ab56417 100644 --- a/cdb/db_storage.go +++ b/cdb/db_storage.go @@ -9,11 +9,9 @@ func (oDb *DB) PurgeStorArrayOutdated(ctx context.Context) error { WHERE array_model LIKE "vdisk%" AND array_updated < DATE_SUB(NOW(), INTERVAL 2 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("stor_array") } return nil @@ -27,11 +25,9 @@ func (oDb *DB) UpdateStorArrayDGQuota(ctx context.Context) error { join diskinfo di on sd.disk_id=di.disk_id join stor_array ar on (di.disk_arrayid=ar.array_name) join stor_array_dg dg on (di.disk_group=dg.dg_name and dg.array_id=ar.id)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { - return err - } else if affected, err := result.RowsAffected(); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("stor_array") } return nil diff --git a/cdb/db_svcdisks.go b/cdb/db_svcdisks.go index 1a0af93..e706d9c 100644 --- a/cdb/db_svcdisks.go +++ b/cdb/db_svcdisks.go @@ -9,11 +9,9 @@ func (oDb *DB) PurgeSvcdisksOutdated(ctx context.Context) error { FROM svcdisks WHERE disk_updated < DATE_SUB(NOW(), INTERVAL 2 DAY)` - if result, err := oDb.DB.ExecContext(ctx, query); err != nil { + if count, err := oDb.execCountContext(ctx, query); err != nil { return err - } else if affected, err := result.RowsAffected(); err != nil { - return err - } else if affected > 0 { + } else if count > 0 { oDb.SetChange("svcdisks") } return nil diff --git a/worker/job_feed_daemon_status.go b/worker/job_feed_daemon_status.go index c7c6bb6..a70040f 100644 --- a/worker/job_feed_daemon_status.go +++ b/worker/job_feed_daemon_status.go @@ -306,13 +306,24 @@ func (d *jobFeedDaemonStatus) dataToNodeHeartbeat(_ context.Context) error { } } for _, hb := range l { - if n := d.byNodename[hb.nodename]; n != nil { - hb.DBHeartbeat.NodeID = n.NodeID + hb.DBHeartbeat.ClusterID = d.clusterID + if hb.nodename != "" { + if n := d.byNodename[hb.nodename]; n != nil { + hb.DBHeartbeat.NodeID = n.NodeID + } else { + slog.Debug(fmt.Sprintf("dataToNodeHeartbeat: skipped because of unregistered node %s for %s", hb.nodename, hb)) + continue + } } - if n := d.byNodename[hb.peerNodename]; n != nil { - hb.DBHeartbeat.PeerNodeID = n.NodeID + + if hb.peerNodename != "" { + if n := d.byNodename[hb.peerNodename]; n != nil { + hb.DBHeartbeat.PeerNodeID = n.NodeID + } else { + slog.Debug(fmt.Sprintf("dataToNodeHeartbeat: skipped because of unregistered peer node %s for %s", hb.peerNodename, hb)) + continue + } } - hb.DBHeartbeat.ClusterID = d.clusterID slog.Debug(fmt.Sprintf("dataToNodeHeartbeat: found %s", hb)) d.heartbeats = append(d.heartbeats, hb) } @@ -325,7 +336,7 @@ func (d *jobFeedDaemonStatus) heartbeatToDB(ctx context.Context) error { for _, hb := range d.heartbeats { slog.Debug(fmt.Sprintf("inserting: %s", hb)) if err := d.oDb.HBUpdate(ctx, hb.DBHeartbeat); err != nil { - return fmt.Errorf("1 heartbeatToDB hbUpdate %s: %w", hb, err) + return fmt.Errorf("heartbeatToDB hbUpdate %s: %w", hb, err) } if err := d.oDb.HBLogUpdate(ctx, hb.DBHeartbeat); err != nil { return fmt.Errorf("heartbeatToDB hbLogUpdate %s: %w", hb, err) diff --git a/worker/job_feed_node_disk.go b/worker/job_feed_node_disk.go index 4b4d662..901e8b4 100644 --- a/worker/job_feed_node_disk.go +++ b/worker/job_feed_node_disk.go @@ -284,28 +284,24 @@ func (d *jobFeedNodeDisk) updateDB(ctx context.Context) error { Keys: []string{"disk_id", "svc_id", "node_id", "disk_dg"}, Data: data, } - if affected, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return fmt.Errorf("updateDB insert: %w", err) - } else if affected > 0 { + } else if count > 0 { d.oDb.SetChange("svcdisks") } query := "DELETE FROM `svcdisks` WHERE `node_id` = ? AND `disk_updated` < ?" - if result, err := d.db.ExecContext(ctx, query, nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, query, nodeID, now); err != nil { return fmt.Errorf("query %s: %w", query, err) - } else if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("query %s count row affected: %w", query, err) - } else if affected > 0 { + } else if count > 0 { d.oDb.SetChange("svcdisks") } // TODO: validate delete query query = "DELETE FROM `diskinfo` WHERE `disk_arrayid` = ? AND `disk_updated` < ?" - if result, err := d.db.ExecContext(ctx, query, nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, query, nodeID, now); err != nil { return fmt.Errorf("query %s: %w", query, err) - } else if affected, err := result.RowsAffected(); err != nil { - return fmt.Errorf("query %s count row affected: %w", query, err) - } else if affected > 0 { + } else if count > 0 { d.oDb.SetChange("diskinfo") } return nil diff --git a/worker/job_feed_system.go b/worker/job_feed_system.go index 9233adc..4c1fc88 100644 --- a/worker/job_feed_system.go +++ b/worker/job_feed_system.go @@ -56,10 +56,12 @@ func (d *jobFeedSystem) Operations() []operation { {desc: "system/hba", do: d.hba, condition: hasProp("hba"), blocking: true}, {desc: "system/targets", do: d.targets, condition: hasProp("targets"), blocking: true}, {desc: "system/package", do: d.pkg, condition: hasProp("package"), blocking: true}, + {desc: "system/pushFromTableChanges", do: d.pushFromTableChanges}, } } func (d *jobFeedSystem) pkg(ctx context.Context) error { + const tableName = "packages" pkgList, ok := d.data["package"].([]any) if !ok { slog.Warn("unsupported json format for packages") @@ -80,7 +82,7 @@ func (d *jobFeedSystem) pkg(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "packages", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "pkg_updated"}, @@ -95,15 +97,18 @@ func (d *jobFeedSystem) pkg(ctx context.Context) error { Data: pkgList, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(request.Table) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM packages WHERE node_id = ? AND pkg_updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM packages WHERE node_id = ? AND pkg_updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } + if err := d.oDb.DashboardUpdatePkgDiffForNode(ctx, nodeID); err != nil { return err } @@ -111,6 +116,7 @@ func (d *jobFeedSystem) pkg(ctx context.Context) error { } func (d *jobFeedSystem) targets(ctx context.Context) error { + const tableName = "stor_zone" data, ok := d.data["targets"].([]any) if !ok { slog.Warn("unsupported system targets data format") @@ -131,7 +137,7 @@ func (d *jobFeedSystem) targets(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "stor_zone", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "updated"}, @@ -142,20 +148,23 @@ func (d *jobFeedSystem) targets(ctx context.Context) error { Data: data, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM stor_zone WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM stor_zone WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) hba(ctx context.Context) error { + const tableName = "node_hba" data, ok := d.data["hba"].([]any) if !ok { slog.Warn("unsupported system hba data format") @@ -176,7 +185,7 @@ func (d *jobFeedSystem) hba(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "node_hba", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "updated"}, @@ -187,20 +196,24 @@ func (d *jobFeedSystem) hba(ctx context.Context) error { Data: data, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_hba WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM node_hba WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) lan(ctx context.Context) error { + const tableName = "node_ip" + var l []any data, ok := d.data["lan"].(map[string]any) if !ok { @@ -230,7 +243,7 @@ func (d *jobFeedSystem) lan(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "node_ip", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "updated"}, @@ -245,20 +258,24 @@ func (d *jobFeedSystem) lan(ctx context.Context) error { Data: l, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_ip WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM node_ip WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) groups(ctx context.Context) error { + const tableName = "node_groups" + data, ok := d.data["gids"].([]any) if !ok { slog.Warn("unsupported system groups data format") @@ -279,7 +296,7 @@ func (d *jobFeedSystem) groups(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "node_groups", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "updated"}, @@ -290,20 +307,23 @@ func (d *jobFeedSystem) groups(ctx context.Context) error { Data: data, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_groups WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM node_groups WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) users(ctx context.Context) error { + tableName := "node_users" data, ok := d.data["uids"].([]any) if !ok { slog.Warn("unsupported system users data format") @@ -324,7 +344,7 @@ func (d *jobFeedSystem) users(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "node_users", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "updated"}, @@ -335,20 +355,23 @@ func (d *jobFeedSystem) users(ctx context.Context) error { Data: data, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_users WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM node_users WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) hardware(ctx context.Context) error { + tableName := "node_hw" data, ok := d.data["hardware"].([]any) if !ok { slog.Warn("unsupported system hardware data format") @@ -368,7 +391,7 @@ func (d *jobFeedSystem) hardware(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "node_hw", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "node_id"}, mariadb.Mapping{To: "hw_type", From: "type"}, @@ -382,20 +405,23 @@ func (d *jobFeedSystem) hardware(ctx context.Context) error { Data: data, } - if _, err := request.QueryContext(ctx, d.db); err != nil { + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } - if rows, err := d.db.QueryContext(ctx, "DELETE FROM node_hw WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { + if count, err := d.oDb.ExecContextAndCountRowsAffected(ctx, "DELETE FROM node_hw WHERE node_id = ? AND updated < ?", nodeID, now); err != nil { return err - } else { - defer func() { _ = rows.Close() }() + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) } return nil } func (d *jobFeedSystem) properties(ctx context.Context) error { + tableName := "nodes" data, ok := d.data["properties"].(map[string]any) if !ok { slog.Warn("unsupported system properties format") @@ -420,7 +446,7 @@ func (d *jobFeedSystem) properties(ctx context.Context) error { } request := mariadb.InsertOrUpdate{ - Table: "nodes", + Table: tableName, Mappings: mariadb.Mappings{ mariadb.Mapping{To: "asset_env", Get: get, Optional: true}, mariadb.Mapping{To: "bios_version", Get: get}, @@ -468,9 +494,13 @@ func (d *jobFeedSystem) properties(ctx context.Context) error { Data: data, } - _, err := request.QueryContext(ctx, d.db) + if count, err := request.ExecContextAndCountRowsAffected(ctx, d.db); err != nil { + return err + } else if count > 0 { + d.oDb.Session.SetChanges(tableName) + } - return err + return nil } func (d *jobFeedSystem) getData(ctx context.Context) error {