Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 148 additions & 28 deletions components/notebook-controller/controllers/culling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ const DEFAULT_IDLENESS_CHECK_PERIOD = "1"
const DEFAULT_ENABLE_CULLING = "false"
const DEFAULT_CLUSTER_DOMAIN = "cluster.local"
const DEFAULT_DEV = "false"
const DEFAULT_TRACK_TERMINAL_ACTIVITY = "false"

var CULL_IDLE_TIME = 0
var ENABLE_CULLING = false
var IDLENESS_CHECK_PERIOD = 0
var CLUSTER_DOMAIN = ""
var DEV = false
var TRACK_TERMINAL_ACTIVITY = false

// When a Resource should be stopped/culled, then the controller should add this
// annotation in the Resource's Metadata. Then, inside the reconcile loop,
Expand Down Expand Up @@ -67,6 +69,13 @@ type KernelStatus struct {
Connections int `json:"connections"`
}

// Each terminal of the Notebook Server has a status.
// TerminalStatus struct:
type TerminalStatus struct {
Name string `json:"name"`
LastActivity string `json:"last_activity"`
}

// CullingReconciler : Type of a reconciler that will be culling idle notebooks
type CullingReconciler struct {
client.Client
Expand Down Expand Up @@ -240,6 +249,48 @@ func getNotebookApiKernels(nm, ns string, log logr.Logger) []KernelStatus {
return kernels
}

func getNotebookApiTerminals(nm, ns string, log logr.Logger) []TerminalStatus {
// Get the Terminals' status from the Server's `/api/terminals` endpoint
client := &http.Client{
Timeout: time.Second * 10,
}

domain := GetEnvDefault("CLUSTER_DOMAIN", DEFAULT_CLUSTER_DOMAIN)
url := fmt.Sprintf(
"http://%s.%s.svc.%s/notebook/%s/%s/api/terminals",
nm, ns, domain, ns, nm)
if GetEnvDefault("DEV", DEFAULT_DEV) != "false" {
url = fmt.Sprintf(
"http://localhost:8001/api/v1/namespaces/%s/services/%s:http-%s/proxy/notebook/%s/%s/api/terminals",
ns, nm, nm, ns, nm)
}

resp, err := client.Get(url)
if err != nil {
log.Info(fmt.Sprintf("Could not GET terminals from %s: %v", url, err))
return nil
}

// Decode the body
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Info(fmt.Sprintf(
"Warning: GET to %s: %d (terminals endpoint may not be available for this notebook type)",
url, resp.StatusCode))
return nil
}

var terminals []TerminalStatus

err = json.NewDecoder(resp.Body).Decode(&terminals)
if err != nil {
log.Error(err, "Error parsing JSON response for Notebook API Terminals.")
return nil
}

return terminals
}

func allKernelsAreIdle(kernels []KernelStatus, log logr.Logger) bool {
// Iterate on the list of kernels' status.
// If all kernels are on execution_state=idle then this function returns true.
Expand All @@ -254,57 +305,118 @@ func allKernelsAreIdle(kernels []KernelStatus, log logr.Logger) bool {
return true
}

func getMostRecentKernelActivity(kernels []KernelStatus, log logr.Logger) (time.Time, error) {
// Get the most recent last_activity timestamp from all kernels
var recentTime time.Time

if len(kernels) == 0 {
return recentTime, nil
}

// Parse the first kernel's timestamp
kernelTime, err := time.Parse(time.RFC3339, kernels[0].LastActivity)
if err != nil {
log.Error(err, "Error parsing the last-activity from the /api/kernels")
return recentTime, err
}
recentTime = kernelTime

// Check remaining kernels for more recent activity
for i := 1; i < len(kernels); i++ {
kernelLastActivity, err := time.Parse(time.RFC3339, kernels[i].LastActivity)
if err != nil {
log.Error(err, "Error parsing the last-activity from the /api/kernels")
continue
}
if kernelLastActivity.After(recentTime) {
recentTime = kernelLastActivity
}
}

return recentTime, nil
}

func getMostRecentTerminalActivity(terminals []TerminalStatus, log logr.Logger) (time.Time, error) {
// Get the most recent last_activity timestamp from all terminals
var recentTime time.Time

if len(terminals) == 0 {
return recentTime, nil
}

// Check all terminals for most recent activity
for i := 0; i < len(terminals); i++ {
terminalLastActivity, err := time.Parse(time.RFC3339, terminals[i].LastActivity)
if err != nil {
log.Error(err, "Error parsing the last-activity from the /api/terminals")
continue
}
if recentTime.IsZero() || terminalLastActivity.After(recentTime) {
recentTime = terminalLastActivity
}
}

return recentTime, nil
}

// Update LAST_ACTIVITY_ANNOTATION
func updateNotebookLastActivityAnnotation(meta *metav1.ObjectMeta, log logr.Logger) {

log.Info("Updating the last-activity annotation. Checking /api/kernels")
nm, ns := meta.GetName(), meta.GetNamespace()
kernels := getNotebookApiKernels(nm, ns, log)
if kernels == nil {
log.Info("Could not GET the kernels status. Will not update last-activity.")

// Also check terminals if terminal tracking is enabled
var terminals []TerminalStatus
if TRACK_TERMINAL_ACTIVITY {
log.Info("Terminal tracking enabled. Checking /api/terminals")
terminals = getNotebookApiTerminals(nm, ns, log)
}

// If we can't get any activity data, don't update
if kernels == nil && terminals == nil {
log.Info("Could not GET kernels or terminals status. Will not update last-activity.")
return
} else if len(kernels) == 0 {
log.Info("Notebook has no kernels. Will not update last-activity")
}

// If both are empty (no kernels and no terminals), don't update
if len(kernels) == 0 && len(terminals) == 0 {
log.Info("Notebook has no kernels or terminals. Will not update last-activity")
return
}

updateTimestampFromKernelsActivity(meta, kernels, log)
updateTimestampFromActivity(meta, kernels, terminals, log)
}

func updateTimestampFromKernelsActivity(meta *metav1.ObjectMeta, kernels []KernelStatus, log logr.Logger) {
func updateTimestampFromActivity(meta *metav1.ObjectMeta, kernels []KernelStatus, terminals []TerminalStatus, log logr.Logger) {

if !allKernelsAreIdle(kernels, log) {
// At least on kernel is "busy" so the last-activity annotation should
// should be the current time.
// If any kernel is busy, update to current time
if len(kernels) > 0 && !allKernelsAreIdle(kernels, log) {
// At least one kernel is "busy" so the last-activity annotation should
// be the current time.
t := createTimestamp()
log.Info(fmt.Sprintf("Found a busy kernel. Updating the last-activity to %s", t))

meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
return
}

// Checking for the most recent kernel last_activity. The LAST_ACTIVITY_ANNOTATION
// should be the most recent kernel last-activity among the kernels.
recentTime, err := time.Parse(time.RFC3339, kernels[0].LastActivity)
if err != nil {
log.Error(err, "Error parsing the last-activity from the /api/kernels")
return
}
// Get the most recent activity from kernels and terminals using helper functions
kernelTime, _ := getMostRecentKernelActivity(kernels, log)
terminalTime, _ := getMostRecentTerminalActivity(terminals, log)

for i := 1; i < len(kernels); i++ {
kernelLastActivity, err := time.Parse(time.RFC3339, kernels[i].LastActivity)
if err != nil {
log.Error(err, "Error parsing the last-activity from the /api/kernels")
return
}
if kernelLastActivity.After(recentTime) {
recentTime = kernelLastActivity
}
// Use whichever timestamp is most recent
recentTime := kernelTime
if terminalTime.After(kernelTime) {
recentTime = terminalTime
}
t := recentTime.Format(time.RFC3339)

meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
log.Info(fmt.Sprintf("Successfully updated last-activity from latest kernel action, %s", t))
// Update the annotation with the most recent activity
if !recentTime.IsZero() {
t := recentTime.Format(time.RFC3339)
meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
log.Info(fmt.Sprintf("Successfully updated last-activity from latest kernel/terminal action, %s", t))
}
}

func updateLastCullingCheckTimestampAnnotation(meta *metav1.ObjectMeta, log logr.Logger) {
Expand Down Expand Up @@ -425,6 +537,14 @@ func initGlobalVars() error {
ENABLE_CULLING = true
}

trackTerminalActivity := GetEnvDefault("TRACK_TERMINAL_ACTIVITY", DEFAULT_TRACK_TERMINAL_ACTIVITY)
if trackTerminalActivity == "true" {
TRACK_TERMINAL_ACTIVITY = true
log.Info("Terminal activity tracking is ENABLED")
} else {
log.Info("Terminal activity tracking is DISABLED (only kernel activity will be tracked)")
}

CLUSTER_DOMAIN = GetEnvDefault("CLUSTER_DOMAIN", DEFAULT_CLUSTER_DOMAIN)

cullPeriod := GetEnvDefault("IDLENESS_CHECK_PERIOD", DEFAULT_IDLENESS_CHECK_PERIOD)
Expand Down
Loading