Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ For further information on what is supported by GitHub and what's managed by the

ARC documentation is available on [docs.github.com](https://docs.github.com/en/actions/hosting-your-own-runners/managing-self-hosted-runners-with-actions-runner-controller/quickstart-for-actions-runner-controller).

### Multi-Repository Listener Support

The listener now supports listening to multiple repositories from a single listener pod, which is ideal for individual users or small teams with multiple repositories. See [Multi-Repository Listener Guide](/docs/multi-repository-listener.md) for configuration details.

### Legacy documentation

The following documentation is for the legacy autoscaling modes that continue to be maintained by the community:
Expand Down
189 changes: 144 additions & 45 deletions cmd/ghalistener/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ type App struct {
logger logr.Logger

// initialized fields
listener Listener
worker Worker
metrics metrics.ServerExporter
listener Listener // Single listener (used in single-repo mode)
listeners []Listener // Multiple listeners (used in multi-repo mode)
worker Worker
metrics metrics.ServerExporter
}

//go:generate mockery --name Listener --output ./mocks --outpkg mocks --case underscore
Expand All @@ -46,11 +47,6 @@ func New(config config.Config) (*App, error) {
config: &config,
}

ghConfig, err := actions.ParseGitHubConfigFromURL(config.ConfigureUrl)
if err != nil {
return nil, fmt.Errorf("failed to parse GitHub config from URL: %w", err)
}

{
logger, err := config.Logger()
if err != nil {
Expand All @@ -59,25 +55,6 @@ func New(config config.Config) (*App, error) {
app.logger = logger.WithName("listener-app")
}

actionsClient, err := config.ActionsClient(app.logger)
if err != nil {
return nil, fmt.Errorf("failed to create actions client: %w", err)
}

if config.MetricsAddr != "" {
app.metrics = metrics.NewExporter(metrics.ExporterConfig{
ScaleSetName: config.EphemeralRunnerSetName,
ScaleSetNamespace: config.EphemeralRunnerSetNamespace,
Enterprise: ghConfig.Enterprise,
Organization: ghConfig.Organization,
Repository: ghConfig.Repository,
ServerAddr: config.MetricsAddr,
ServerEndpoint: config.MetricsEndpoint,
Metrics: config.Metrics,
Logger: app.logger.WithName("metrics exporter"),
})
}

worker, err := worker.New(
worker.Config{
EphemeralRunnerSetNamespace: config.EphemeralRunnerSetNamespace,
Expand All @@ -92,18 +69,121 @@ func New(config config.Config) (*App, error) {
}
app.worker = worker

listener, err := listener.New(listener.Config{
Client: actionsClient,
ScaleSetID: app.config.RunnerScaleSetId,
MinRunners: app.config.MinRunners,
MaxRunners: app.config.MaxRunners,
Logger: app.logger.WithName("listener"),
Metrics: app.metrics,
})
if err != nil {
return nil, fmt.Errorf("failed to create new listener: %w", err)
if config.IsMultiRepository() {
// Multi-repository mode
app.logger.Info("Initializing multi-repository mode", "repositories", len(config.Repositories))

for _, repoUrl := range config.Repositories {
ghConfig, err := actions.ParseGitHubConfigFromURL(repoUrl)
if err != nil {
return nil, fmt.Errorf("failed to parse GitHub config from URL %s: %w", repoUrl, err)
}

actionsClient, err := config.ActionsClientForURL(repoUrl, 0, app.logger)
if err != nil {
return nil, fmt.Errorf("failed to create actions client for %s: %w", repoUrl, err)
}

// Get or create scale set for this repository
scaleSetName := config.RunnerScaleSetName
if scaleSetName == "" {
scaleSetName = config.EphemeralRunnerSetName
}

app.logger.Info("Getting or creating scale set", "repository", repoUrl, "scaleSetName", scaleSetName)

scaleSet, err := actionsClient.GetRunnerScaleSet(context.Background(), 1, scaleSetName)
if err != nil {
// Try to create the scale set
app.logger.Info("Scale set not found, attempting to create", "repository", repoUrl, "scaleSetName", scaleSetName)
newScaleSet := &actions.RunnerScaleSet{
Name: scaleSetName,
RunnerGroupId: 1,
Labels: []actions.Label{{Type: "System", Name: "self-hosted"}},
RunnerSetting: actions.RunnerSetting{},
}
scaleSet, err = actionsClient.CreateRunnerScaleSet(context.Background(), newScaleSet)
if err != nil {
return nil, fmt.Errorf("failed to create scale set for %s: %w", repoUrl, err)
}
}

app.logger.Info("Using scale set", "repository", repoUrl, "scaleSetId", scaleSet.Id, "scaleSetName", scaleSet.Name)

// Create metrics exporter if configured
var metricsPublisher metrics.Publisher
if config.MetricsAddr != "" && app.metrics == nil {
// Only create one metrics server for all listeners
app.metrics = metrics.NewExporter(metrics.ExporterConfig{
ScaleSetName: config.EphemeralRunnerSetName,
ScaleSetNamespace: config.EphemeralRunnerSetNamespace,
Enterprise: ghConfig.Enterprise,
Organization: ghConfig.Organization,
Repository: ghConfig.Repository,
ServerAddr: config.MetricsAddr,
ServerEndpoint: config.MetricsEndpoint,
Metrics: config.Metrics,
Logger: app.logger.WithName("metrics exporter"),
})
metricsPublisher = app.metrics
}

// Create listener for this repository
listener, err := listener.New(listener.Config{
Client: actionsClient,
ScaleSetID: scaleSet.Id,
MinRunners: app.config.MinRunners,
MaxRunners: app.config.MaxRunners,
Logger: app.logger.WithName("listener").WithValues("repository", repoUrl, "scaleSetId", scaleSet.Id),
Metrics: metricsPublisher,
})
if err != nil {
return nil, fmt.Errorf("failed to create listener for %s: %w", repoUrl, err)
}

app.listeners = append(app.listeners, listener)
}

app.logger.Info("Multi-repository mode initialized", "listeners", len(app.listeners))
} else {
// Single repository mode (existing behavior)
ghConfig, err := actions.ParseGitHubConfigFromURL(config.ConfigureUrl)
if err != nil {
return nil, fmt.Errorf("failed to parse GitHub config from URL: %w", err)
}

actionsClient, err := config.ActionsClient(app.logger)
if err != nil {
return nil, fmt.Errorf("failed to create actions client: %w", err)
}

if config.MetricsAddr != "" {
app.metrics = metrics.NewExporter(metrics.ExporterConfig{
ScaleSetName: config.EphemeralRunnerSetName,
ScaleSetNamespace: config.EphemeralRunnerSetNamespace,
Enterprise: ghConfig.Enterprise,
Organization: ghConfig.Organization,
Repository: ghConfig.Repository,
ServerAddr: config.MetricsAddr,
ServerEndpoint: config.MetricsEndpoint,
Metrics: config.Metrics,
Logger: app.logger.WithName("metrics exporter"),
})
}

listener, err := listener.New(listener.Config{
Client: actionsClient,
ScaleSetID: app.config.RunnerScaleSetId,
MinRunners: app.config.MinRunners,
MaxRunners: app.config.MaxRunners,
Logger: app.logger.WithName("listener"),
Metrics: app.metrics,
})
if err != nil {
return nil, fmt.Errorf("failed to create new listener: %w", err)
}
app.listener = listener
}
app.listener = listener

app.logger.Info("app initialized")

Expand All @@ -115,7 +195,7 @@ func (app *App) Run(ctx context.Context) error {
if app.worker == nil {
errs = append(errs, fmt.Errorf("worker not initialized"))
}
if app.listener == nil {
if app.listener == nil && len(app.listeners) == 0 {
errs = append(errs, fmt.Errorf("listener not initialized"))
}
if err := errors.Join(errs...); err != nil {
Expand All @@ -125,12 +205,31 @@ func (app *App) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
metricsCtx, cancelMetrics := context.WithCancelCause(ctx)

g.Go(func() error {
app.logger.Info("Starting listener")
listnerErr := app.listener.Listen(ctx, app.worker)
cancelMetrics(fmt.Errorf("Listener exited: %w", listnerErr))
return listnerErr
})
if len(app.listeners) > 0 {
// Multi-repository mode: run all listeners
for i, listener := range app.listeners {
listenerIndex := i
listenerInstance := listener
g.Go(func() error {
app.logger.Info("Starting multi-repo listener", "listenerIndex", listenerIndex)
err := listenerInstance.Listen(ctx, app.worker)
if err != nil {
app.logger.Error(err, "Multi-repo listener exited with error", "listenerIndex", listenerIndex)
}
// Don't cancel metrics or other listeners if one fails
// Just log and return the error
return fmt.Errorf("listener %d failed: %w", listenerIndex, err)
})
}
} else {
// Single repository mode
g.Go(func() error {
app.logger.Info("Starting listener")
listenerErr := app.listener.Listen(ctx, app.worker)
cancelMetrics(fmt.Errorf("Listener exited: %w", listenerErr))
return listenerErr
})
}

if app.metrics != nil {
g.Go(func() error {
Expand Down
104 changes: 97 additions & 7 deletions cmd/ghalistener/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
)

type Config struct {
ConfigureUrl string `json:"configure_url"`
ConfigureUrl string `json:"configure_url"`
// Repositories is an optional list of repository URLs to listen to.
// If provided, the listener will create/get scale sets for each repository and listen to all of them.
// This is mutually exclusive with ConfigureUrl (if Repositories is set, ConfigureUrl is ignored).
Repositories []string `json:"repositories,omitempty"`
VaultType vault.VaultType `json:"vault_type"`
VaultLookupKey string `json:"vault_lookup_key"`
// If the VaultType is set to "azure_key_vault", this field must be populated.
Expand Down Expand Up @@ -100,16 +104,32 @@ func Read(ctx context.Context, configPath string) (*Config, error) {

// Validate checks the configuration for errors.
func (c *Config) Validate() error {
if len(c.ConfigureUrl) == 0 {
return fmt.Errorf("GitHubConfigUrl is not provided")
// If Repositories is provided, ConfigureUrl is not required
if len(c.Repositories) == 0 && len(c.ConfigureUrl) == 0 {
return fmt.Errorf("either GitHubConfigUrl or Repositories list must be provided")
}

if len(c.EphemeralRunnerSetNamespace) == 0 || len(c.EphemeralRunnerSetName) == 0 {
return fmt.Errorf("EphemeralRunnerSetNamespace %q or EphemeralRunnerSetName %q is missing", c.EphemeralRunnerSetNamespace, c.EphemeralRunnerSetName)
// If Repositories is provided, validate each URL
if len(c.Repositories) > 0 {
for _, repo := range c.Repositories {
if len(repo) == 0 {
return fmt.Errorf("empty repository URL in Repositories list")
}
}
// When using Repositories, RunnerScaleSetName should be provided
if len(c.RunnerScaleSetName) == 0 {
return fmt.Errorf("RunnerScaleSetName is required when using multi-repository mode")
}
// RunnerScaleSetId and ConfigureUrl are optional in multi-repo mode
} else {
// Single repository mode - require RunnerScaleSetId
if c.RunnerScaleSetId == 0 {
return fmt.Errorf(`RunnerScaleSetId "%d" is missing`, c.RunnerScaleSetId)
}
}

if c.RunnerScaleSetId == 0 {
return fmt.Errorf(`RunnerScaleSetId "%d" is missing`, c.RunnerScaleSetId)
if len(c.EphemeralRunnerSetNamespace) == 0 || len(c.EphemeralRunnerSetName) == 0 {
return fmt.Errorf("EphemeralRunnerSetNamespace %q or EphemeralRunnerSetName %q is missing", c.EphemeralRunnerSetNamespace, c.EphemeralRunnerSetName)
}

if c.MaxRunners < c.MinRunners {
Expand Down Expand Up @@ -153,6 +173,23 @@ func (c *Config) Logger() (logr.Logger, error) {
return logger, nil
}

// GetConfigureUrls returns the list of GitHub configuration URLs to listen to.
// If Repositories is set, returns those URLs; otherwise returns ConfigureUrl as a single-element slice.
func (c *Config) GetConfigureUrls() []string {
if len(c.Repositories) > 0 {
return c.Repositories
}
if len(c.ConfigureUrl) > 0 {
return []string{c.ConfigureUrl}
}
return []string{}
}

// IsMultiRepository returns true if the config specifies multiple repositories
func (c *Config) IsMultiRepository() bool {
return len(c.Repositories) > 0
}

func (c *Config) ActionsClient(logger logr.Logger, clientOptions ...actions.ClientOption) (*actions.Client, error) {
var creds actions.ActionsAuth
switch c.Token {
Expand Down Expand Up @@ -205,6 +242,59 @@ func (c *Config) ActionsClient(logger logr.Logger, clientOptions ...actions.Clie
return client, nil
}

// ActionsClientForURL creates an actions client for a specific GitHub URL
func (c *Config) ActionsClientForURL(configureUrl string, scaleSetId int, logger logr.Logger, clientOptions ...actions.ClientOption) (*actions.Client, error) {
var creds actions.ActionsAuth
switch c.Token {
case "":
creds.AppCreds = &actions.GitHubAppAuth{
AppID: c.AppID,
AppInstallationID: c.AppInstallationID,
AppPrivateKey: c.AppPrivateKey,
}
default:
creds.Token = c.Token
}

options := append([]actions.ClientOption{
actions.WithLogger(logger),
}, clientOptions...)

if c.ServerRootCA != "" {
systemPool, err := x509.SystemCertPool()
if err != nil {
return nil, fmt.Errorf("failed to load system cert pool: %w", err)
}
pool := systemPool.Clone()
ok := pool.AppendCertsFromPEM([]byte(c.ServerRootCA))
if !ok {
return nil, fmt.Errorf("failed to parse root certificate")
}

options = append(options, actions.WithRootCAs(pool))
}

proxyFunc := httpproxy.FromEnvironment().ProxyFunc()
options = append(options, actions.WithProxy(func(req *http.Request) (*url.URL, error) {
return proxyFunc(req.URL)
}))

client, err := actions.NewClient(configureUrl, &creds, options...)
if err != nil {
return nil, fmt.Errorf("failed to create actions client: %w", err)
}

client.SetUserAgent(actions.UserAgentInfo{
Version: build.Version,
CommitSHA: build.CommitSHA,
ScaleSetID: scaleSetId,
HasProxy: hasProxy(),
Subsystem: "ghalistener",
})

return client, nil
}

func hasProxy() bool {
proxyFunc := httpproxy.FromEnvironment().ProxyFunc()
return proxyFunc != nil
Expand Down
Loading