From 8a6f9e8b3679f191dd94b7d9b269e803397a538b Mon Sep 17 00:00:00 2001 From: mprachar Date: Tue, 31 Mar 2026 00:22:37 +0000 Subject: [PATCH] fix: clean up stale/failed items from extract map to prevent memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three pre-existing leak paths cause unbounded memory growth over days: 1. EXTRACTFAILED items with exhausted retries have no matching case in checkExtractDone() — they stay in u.Map forever holding xtractr.Response 2. EXTRACTFAILED folder items with exhausted retries and positive DeleteAfter match no case in checkFolderStats() — stuck in both maps forever 3. Items stuck at EXTRACTED/EXTRACTING/QUEUED (e.g. Starr app never imports) have no TTL — they hold xtractr.Response indefinitely Fixes: - Add retries-exhausted cleanup case in handlers.go and folder.go - Add 24-hour stale item safety net for intermediate states - Add opt-in pprof debug endpoints (webserver.pprof config) for profiling Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/unpackerr/folder.go | 5 +++++ pkg/unpackerr/handlers.go | 12 ++++++++++++ pkg/unpackerr/start.go | 1 + pkg/unpackerr/webserver.go | 26 ++++++++++++++++++++++++++ 4 files changed, 44 insertions(+) diff --git a/pkg/unpackerr/folder.go b/pkg/unpackerr/folder.go index 0b5048aa..3080760b 100644 --- a/pkg/unpackerr/folder.go +++ b/pkg/unpackerr/folder.go @@ -582,6 +582,11 @@ func (u *Unpackerr) checkFolderStats(now time.Time) { folder.config.Path, folder.retries, u.MaxRetries, elapsed.Round(time.Second)) case EXTRACTFAILED == folder.status && folder.retries < u.MaxRetries: // This empty block is to avoid deleting an item that needs more retries. + case EXTRACTFAILED == folder.status && u.MaxRetries > 0 && folder.retries >= u.MaxRetries: + // Retries exhausted — clean up to prevent the item from staying in the map forever. + u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, now, true) + delete(u.folders.Folders, name) + u.Printf("[Folder] Retries exhausted (%d/%d), giving up: %s", folder.retries, u.MaxRetries, name) case folder.status > EXTRACTING && folder.config.DeleteAfter.Duration <= 0: // if DeleteAfter is 0 we don't delete anything. we are done. u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: nil}, now, false) diff --git a/pkg/unpackerr/handlers.go b/pkg/unpackerr/handlers.go index c00ed9b3..e93fd12c 100644 --- a/pkg/unpackerr/handlers.go +++ b/pkg/unpackerr/handlers.go @@ -191,6 +191,18 @@ func (u *Unpackerr) checkExtractDone(now time.Time) { item.Updated = now u.Printf("[%s] Extract failed %v ago, triggering restart (%d/%d): %v", item.App, elapsed.Round(time.Second), item.Retries, u.MaxRetries, name) + case item.Status == EXTRACTFAILED && u.MaxRetries > 0 && item.Retries >= u.MaxRetries: + // Retries exhausted — clean up to prevent the item from staying in the map forever. + u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: item.Resp}, now, true) + u.Printf("[%s] Retries exhausted (%d/%d), giving up: %v", + item.App, item.Retries, u.MaxRetries, name) + case (item.Status == EXTRACTED || item.Status == EXTRACTING || item.Status == QUEUED) && + elapsed >= staleItemTimeout: + // Safety net: items stuck at intermediate states for too long are cleaned up + // to prevent unbounded map growth (e.g. Starr app never imports the item). + u.updateQueueStatus(&newStatus{Name: name, Status: DELETED, Resp: item.Resp}, now, true) + u.Printf("[%s] Stale item removed after %v at status %s: %v", + item.App, elapsed.Round(time.Second), item.Status.Desc(), name) case item.Status == IMPORTED && elapsed >= item.DeleteDelay: var webhook bool diff --git a/pkg/unpackerr/start.go b/pkg/unpackerr/start.go index 9a3b0a8b..54180b44 100644 --- a/pkg/unpackerr/start.go +++ b/pkg/unpackerr/start.go @@ -33,6 +33,7 @@ const ( defaultStartDelay = time.Minute minimumDeleteDelay = time.Second defaultDeleteDelay = 5 * time.Minute + staleItemTimeout = 24 * time.Hour // Safety net: items stuck at intermediate states are cleaned up. defaultHistory = 10 // items kept in history. suffix = "_unpackerred" // suffix for unpacked folders. updateChanBuf = 100 // Size of xtractr callback update channels. diff --git a/pkg/unpackerr/webserver.go b/pkg/unpackerr/webserver.go index 6688b030..18c49881 100644 --- a/pkg/unpackerr/webserver.go +++ b/pkg/unpackerr/webserver.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/http" + "net/http/pprof" "path" "strings" @@ -15,6 +16,7 @@ import ( type WebServer struct { Metrics bool `json:"metrics" toml:"metrics" xml:"metrics" yaml:"metrics"` + Pprof bool `json:"pprof" toml:"pprof" xml:"pprof" yaml:"pprof"` LogFiles int `json:"logFiles" toml:"log_files" xml:"log_files" yaml:"logFiles"` LogFileMb int `json:"logFileMb" toml:"log_file_mb" xml:"log_file_mb" yaml:"logFileMb"` ListenAddr string `json:"listenAddr" toml:"listen_addr" xml:"listen_addr" yaml:"listenAddr"` @@ -89,6 +91,11 @@ func (u *Unpackerr) startWebServer() { func (u *Unpackerr) webRoutes() { u.Webserver.router.GET(path.Join(u.Webserver.URLBase, "/"), Index) + if u.Webserver.Pprof { + u.registerPprof() + u.Printf(" => WARNING: pprof debug endpoints enabled at /debug/pprof/") + } + if !u.Webserver.Metrics { return } @@ -102,6 +109,25 @@ func (u *Unpackerr) webRoutes() { } } +// registerPprof adds Go's built-in pprof handlers for runtime profiling. +// Access heap profiles at /debug/pprof/heap, goroutine dumps at /debug/pprof/goroutine, etc. +func (u *Unpackerr) registerPprof() { + wrap := func(h http.HandlerFunc) httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + h(w, r) + } + } + + u.Webserver.router.GET("/debug/pprof/", wrap(pprof.Index)) + u.Webserver.router.GET("/debug/pprof/cmdline", wrap(pprof.Cmdline)) + u.Webserver.router.GET("/debug/pprof/profile", wrap(pprof.Profile)) + u.Webserver.router.GET("/debug/pprof/symbol", wrap(pprof.Symbol)) + u.Webserver.router.GET("/debug/pprof/trace", wrap(pprof.Trace)) + u.Webserver.router.Handler(http.MethodGet, "/debug/pprof/heap", pprof.Handler("heap")) + u.Webserver.router.Handler(http.MethodGet, "/debug/pprof/goroutine", pprof.Handler("goroutine")) + u.Webserver.router.Handler(http.MethodGet, "/debug/pprof/allocs", pprof.Handler("allocs")) +} + // runWebServer starts the http or https listener. func (u *Unpackerr) runWebServer() { var err error