Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slow-deer-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Expanded `admin profile` to also collect pprof profiles from LOOP plugins. Added a `--vitals` (`-v`) flag, which can be repeated, to select which profiles are collected.
146 changes: 121 additions & 25 deletions core/cmd/admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand All @@ -20,6 +22,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/utils"
"github.com/smartcontractkit/chainlink/v2/core/web"
"github.com/smartcontractkit/chainlink/v2/core/web/presenters"
)

Expand Down Expand Up @@ -65,6 +68,10 @@ func initAdminSubCmds(s *Shell) []cli.Command {
Usage: "output directory of the captured profile",
Value: "/tmp/",
},
cli.StringSliceFlag{
Name: "vitals, v",
Usage: "vitals to collect, can be specified multiple times. Options: 'allocs', 'block', 'cmdline', 'goroutine', 'heap', 'mutex', 'profile', 'threadcreate', 'trace'",
},
},
},
{
Expand Down Expand Up @@ -319,16 +326,13 @@ func (s *Shell) Status(c *cli.Context) error {
// Profile will collect pprof metrics and store them in a folder.
func (s *Shell) Profile(c *cli.Context) error {
ctx := s.ctx()
seconds := c.Uint("seconds")
seconds := c.Int("seconds")
baseDir := c.String("output_dir")

genDir := filepath.Join(baseDir, "debuginfo-"+time.Now().Format(time.RFC3339))

if err := os.Mkdir(genDir, 0o755); err != nil {
return s.errorOut(err)
}
var wgPprof sync.WaitGroup
vitals := []string{
vitals := c.StringSlice("vitals")
allVitals := []string{
"allocs", // A sampling of all past memory allocations
"block", // Stack traces that led to blocking on synchronization primitives
"cmdline", // The command line invocation of the current program
Expand All @@ -339,36 +343,120 @@ func (s *Shell) Profile(c *cli.Context) error {
"threadcreate", // Stack traces that led to the creation of new OS threads
"trace", // A trace of execution of the current program.
}
wgPprof.Add(len(vitals))
s.Logger.Infof("Collecting profiles: %v", vitals)
if len(vitals) == 0 {
vitals = slices.Clone(allVitals)
} else if slices.ContainsFunc(vitals, func(s string) bool { return !slices.Contains(allVitals, s) }) {
return fmt.Errorf("invalid vitals: must be from the set: %v", allVitals)
}

plugins, err := s.discoverPlugins(ctx)
if err != nil {
return s.errorOut(err)
}
var names []string
for _, group := range plugins {
if name := group.Labels[web.LabelMetaPluginName]; name != "" {
names = append(names, name)
}
}

if len(names) == 0 {
s.Logger.Infof("Collecting profiles: %v", vitals)
} else {
s.Logger.Infof("Collecting profiles from host and %d plugins: %v", len(names), vitals)
}
s.Logger.Infof("writing debug info to %s", genDir)

var wg sync.WaitGroup
errs := make([]error, len(names)+1)
wg.Add(len(names) + 1)
go func() {
defer wg.Done()
errs[0] = s.profile(ctx, genDir, "", vitals, seconds)
}()
for i, name := range names {
go func() {
defer wg.Done()
errs[i] = s.profile(ctx, genDir, name, vitals, seconds)
}()
}
wg.Wait()

err = errors.Join(errs...)
if err != nil {
return s.errorOut(err)
}
return nil
}
// discoverPlugins fetches the node's "/discovery" endpoint and decodes the
// JSON response body into a list of target groups, each carrying a set of
// targets and a label map.
//
// The struct tags are yaml tags; encoding/json does not read them and instead
// matches the "targets" and "labels" keys against the field names
// case-insensitively. They are left untouched so the result type is unchanged.
//
// A non-nil error is returned when the request fails, the server answers with
// a non-2xx status, the body cannot be read, or the body is not valid JSON of
// the expected shape. The returned slice is always nil when err is non-nil.
func (s *Shell) discoverPlugins(ctx context.Context) (
	got []struct {
		Targets []string          `yaml:"targets"`
		Labels  map[string]string `yaml:"labels"`
	},
	err error,
) {
	resp, err := s.HTTP.Get(ctx, "/discovery")
	if err != nil {
		return nil, fmt.Errorf("error requesting plugin discovery: %w", err)
	}
	defer func() {
		if resp.Body != nil {
			resp.Body.Close()
		}
	}()

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading discovery response: status %d: %w", resp.StatusCode, err)
	}

	// An error response (e.g. an auth failure or an unknown route) does not
	// carry a target-group list; report the status and body directly instead
	// of letting it surface as an opaque unmarshal failure.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("error discovering plugins: status %d: %s", resp.StatusCode, string(data))
	}

	if err = json.Unmarshal(data, &got); err != nil {
		// The raw payload is logged because the decode error alone does not
		// show what the server actually sent.
		s.Logger.Errorf("failed to unmarshal discovery response: %s", string(data))
		// Drop anything json.Unmarshal decoded before it failed.
		return nil, fmt.Errorf("error decoding discovery response: %w", err)
	}
	return got, nil
}

func (s *Shell) profile(ctx context.Context, genDir string, name string, vitals []string, seconds int) error {
lggr := s.Logger
path := "/v2"
if name != "" {
genDir = filepath.Join(genDir, "plugins", name)
path = "/plugins/" + name
lggr = lggr.With("plugin", name)
}
if err := os.MkdirAll(genDir, 0o755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}

errs := make(chan error, len(vitals))
var wgPprof sync.WaitGroup
wgPprof.Add(len(vitals))
for _, vt := range vitals {
go func(vt string) {
go func(ctx context.Context, vt string) {
defer wgPprof.Done()
uri := fmt.Sprintf("/v2/debug/pprof/%s?seconds=%d", vt, seconds)
ctx, cancel := context.WithTimeout(ctx, time.Duration(max(seconds, 0)+web.PPROFOverheadSeconds)*time.Second)
defer cancel()
uri := fmt.Sprintf(path+"/debug/pprof/%s?seconds=%d", vt, seconds)
resp, err := s.HTTP.Get(ctx, uri)
if err != nil {
errs <- fmt.Errorf("error collecting %s: %w", vt, err)
errs <- fmt.Errorf("error collecting %s: %w", uri, err)
return
}
defer func() {
if resp.Body != nil {
resp.Body.Close()
}
}()
if resp.StatusCode == http.StatusUnauthorized {
errs <- fmt.Errorf("error collecting %s: %w", vt, errUnauthorized)
switch {
case resp.StatusCode == http.StatusUnauthorized:
errs <- fmt.Errorf("error collecting %s: %w", uri, errUnauthorized)
return
}
if resp.StatusCode == http.StatusBadRequest {
// best effort to interpret the underlying problem
case resp.StatusCode == http.StatusBadRequest:
pprofVersion := resp.Header.Get("X-Go-Pprof")
if pprofVersion == "1" {
b, err2 := io.ReadAll(resp.Body)
if err2 != nil {
errs <- fmt.Errorf("error collecting %s: %w", vt, err2)
errs <- fmt.Errorf("error collecting %s: %w", uri, err2)
return
}
respContent := string(b)
Expand All @@ -377,38 +465,46 @@ func (s *Shell) Profile(c *cli.Context) error {
if strings.Contains(respContent, "profile duration exceeds server's WriteTimeout") {
errs <- fmt.Errorf("%w: %s", ErrProfileTooLong, respContent)
} else {
errs <- fmt.Errorf("error collecting %s: %w: %s", vt, errBadRequest, respContent)
errs <- fmt.Errorf("error collecting %s: %w: %s", uri, errBadRequest, respContent)
}
} else {
errs <- fmt.Errorf("error collecting %s: %w", vt, errBadRequest)
errs <- fmt.Errorf("error collecting %s: %w", uri, errBadRequest)
}
return
case resp.StatusCode < 200 || resp.StatusCode > 299:
body, rerr := io.ReadAll(resp.Body)
if rerr != nil {
errs <- fmt.Errorf("error collecting %s: status %d: error reading response: %w", uri, resp.StatusCode, rerr)
} else {
errs <- fmt.Errorf("error collecting %s: status %d: %s", uri, resp.StatusCode, string(body))
}
return
}
// write to file
f, err := os.Create(filepath.Join(genDir, vt))
if err != nil {
errs <- fmt.Errorf("error creating file for %s: %w", vt, err)
errs <- fmt.Errorf("error creating file for %s: %w", uri, err)
return
}
wc := utils.NewDeferableWriteCloser(f)
defer wc.Close()

_, err = io.Copy(wc, resp.Body)
if err != nil {
errs <- fmt.Errorf("error writing to file for %s: %w", vt, err)
errs <- fmt.Errorf("error writing to file for %s: %w", uri, err)
return
}
err = wc.Close()
if err != nil {
errs <- fmt.Errorf("error closing file for %s: %w", vt, err)
errs <- fmt.Errorf("error closing file for %s: %w", uri, err)
return
}
}(vt)
}(ctx, vt)
}
wgPprof.Wait()
close(errs)
// Atmost one err is emitted per vital.
s.Logger.Infof("collected %d/%d profiles", len(vitals)-len(errs), len(vitals))
// At most one err is emitted per vital.
lggr.Infof("collected %d/%d profiles", len(vitals)-len(errs), len(vitals))
if len(errs) > 0 {
var merr error
for err := range errs {
Expand Down
4 changes: 4 additions & 0 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command {
Usage: "output directory of the captured profile",
Value: "/tmp/",
},
cli.StringSliceFlag{
Name: "vitals, v",
Usage: "vitals to collect, can be specified multiple times. Options: 'allocs', 'block', 'cmdline', 'goroutine', 'heap', 'mutex', 'profile', 'threadcreate', 'trace'",
},
},
Hidden: true,
Before: func(_ *cli.Context) error {
Expand Down
Loading
Loading