Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ deps:
$(GOCMD) get -tool github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@latest

api:
$(GOGEN) ./apifeeder
$(GOGEN) ./api
$(GOGEN) ./feeder
$(GOGEN) ./server

clean:
$(GOCLEAN)
Expand Down
13 changes: 0 additions & 13 deletions apihandlers/log.go

This file was deleted.

4 changes: 2 additions & 2 deletions cdb/db_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"log/slog"
"time"

"github.com/opensvc/oc3/apifeeder"
"github.com/opensvc/oc3/feeder"
)

type (
Expand Down Expand Up @@ -549,7 +549,7 @@ func (oDb *DB) InstanceResourcesDeleteObsolete(ctx context.Context, svcID, nodeI
return nil
}

func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID string, data apifeeder.InstanceResourceInfo) error {
func (oDb *DB) InstanceResourceInfoUpdate(ctx context.Context, svcID, nodeID string, data feeder.InstanceResourceInfo) error {
defer logDuration("InstanceResourceInfoUpdate "+svcID+"@"+nodeID, time.Now())
const (
query = "" +
Expand Down
96 changes: 56 additions & 40 deletions cmd.go → cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"fmt"
Expand All @@ -13,13 +13,23 @@ import (

var (
debug bool

GroupIDSubsystems = "subsystems"
)

func NewGroupSubsystems() *cobra.Group {
return &cobra.Group{
ID: GroupIDSubsystems,
Title: "Subsystems:",
}
}

func cmdWorker() *cobra.Command {
var maxRunners int
cmd := &cobra.Command{
Use: "worker",
Short: "run jobs from a list of queues",
GroupID: GroupIDSubsystems,
Use: "worker",
Short: "run queued jobs",
RunE: func(cmd *cobra.Command, args []string) error {
if err := setup(); err != nil {
return err
Expand All @@ -28,90 +38,95 @@ func cmdWorker() *cobra.Command {
for _, q := range args {
queues = append(queues, cachekeys.QueuePrefix+q)
}
return work(maxRunners, queues)
return startWorker(maxRunners, queues)
},
}
cmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners")
return cmd
}

func cmdApiFeeder() *cobra.Command {
func cmdFeeder() *cobra.Command {
return &cobra.Command{
Use: "apifeeder",
Short: "serve the feeder api",
GroupID: GroupIDSubsystems,
Use: "feeder",
Short: "serve the data feed api to nodes",
RunE: func(cmd *cobra.Command, args []string) error {
if err := setup(); err != nil {
return err
}
return startApiFeeder()
return startFeeder()
},
}
}

func cmdApiCollector() *cobra.Command {
return &cobra.Command{
Use: "apicollector",
Short: "serve the collector api",
GroupID: GroupIDSubsystems,
Use: "server",
Short: "serve the collector api to clients",
RunE: func(cmd *cobra.Command, args []string) error {
if err := setup(); err != nil {
return err
}
return startApi()
return startServer()
},
}
}

func cmdSchedulerList() *cobra.Command {
func cmdScheduler() *cobra.Command {
return &cobra.Command{
Use: "list",
Short: "list the tasks",
GroupID: GroupIDSubsystems,
Use: "scheduler",
Short: "start running db maintenance tasks",
RunE: func(cmd *cobra.Command, args []string) error {
return scheduleList()
return startScheduler()
},
}
}

func cmdSchedulerExec() *cobra.Command {
var name string
cmd := &cobra.Command{
Use: "exec",
Short: "execute a task inline",
func cmdRunner() *cobra.Command {
return &cobra.Command{
GroupID: GroupIDSubsystems,
Use: "runner",
Short: "dispatch actions to nodes",
RunE: func(cmd *cobra.Command, args []string) error {
return scheduleExec(name)
return startRunner()
},
}
cmd.Flags().StringVar(&name, "name", "", "the task name")
return cmd
}

func cmdScheduler() *cobra.Command {
func cmdMessenger() *cobra.Command {
return &cobra.Command{
Use: "scheduler",
Short: "start running db maintenance tasks",
GroupID: GroupIDSubsystems,
Use: "messenger",
Short: "notify clients of data changes",
RunE: func(cmd *cobra.Command, args []string) error {
return schedule()
return startMessenger()
},
}
}

func cmdActions() *cobra.Command {
func cmdSchedulerList() *cobra.Command {
return &cobra.Command{
Use: "actions",
Short: "start the daemon dispatching actions to the agents",
Use: "list",
Short: "list the tasks",
RunE: func(cmd *cobra.Command, args []string) error {
return actions()
return scheduleList()
},
}
}

func cmdComet() *cobra.Command {
return &cobra.Command{
Use: "comet",
Short: "websocket group messaging daemon",
func cmdSchedulerExec() *cobra.Command {
var name string
cmd := &cobra.Command{
Use: "exec",
Short: "execute a task inline",
RunE: func(cmd *cobra.Command, args []string) error {
return cometRun()
return scheduleExec(name)
},
}
cmd.Flags().StringVar(&name, "name", "", "the task name")
return cmd
}

func cmdVersion() *cobra.Command {
Expand All @@ -125,26 +140,27 @@ func cmdVersion() *cobra.Command {
return cmd
}

func cmdRoot(args []string) *cobra.Command {
func Root(args []string) *cobra.Command {
cmd := &cobra.Command{
Use: filepath.Base(args[0]),
Short: "Manage the opensvc collector infrastructure components.",
SilenceUsage: true,
}
cmd.AddGroup(NewGroupSubsystems())
cmd.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug")
grpScheduler := cmdScheduler()
grpScheduler.AddCommand(
cmdSchedulerExec(),
cmdSchedulerList(),
)
cmd.AddCommand(
cmdApiFeeder(),
cmdFeeder(),
cmdApiCollector(),
grpScheduler,
cmdVersion(),
cmdWorker(),
cmdActions(),
cmdComet(),
cmdRunner(),
cmdMessenger(),
)
return cmd
}
Expand Down
36 changes: 18 additions & 18 deletions conf.go → cmd/conf.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"fmt"
Expand Down Expand Up @@ -29,16 +29,16 @@ func initConfig() error {
viper.AutomaticEnv()

// defaults
viper.SetDefault("listener_feed.addr", "127.0.0.1:8080")
viper.SetDefault("listener_feed.pprof.enable", false)
viper.SetDefault("listener_feed.metrics.enable", false)
viper.SetDefault("listener_feed.ui.enable", false)
viper.SetDefault("listener_feed.sync.timeout", "2s")
viper.SetDefault("listener_api.addr", "127.0.0.1:8081")
viper.SetDefault("listener_api.pprof.enable", false)
viper.SetDefault("listener_api.metrics.enable", false)
viper.SetDefault("listener_api.ui.enable", false)
viper.SetDefault("listener_api.sync.timeout", "2s")
viper.SetDefault("feeder.addr", "127.0.0.1:8080")
viper.SetDefault("feeder.pprof.enable", false)
viper.SetDefault("feeder.metrics.enable", false)
viper.SetDefault("feeder.ui.enable", false)
viper.SetDefault("feeder.sync.timeout", "2s")
viper.SetDefault("server.addr", "127.0.0.1:8081")
viper.SetDefault("server.pprof.enable", false)
viper.SetDefault("server.metrics.enable", false)
viper.SetDefault("server.ui.enable", false)
viper.SetDefault("server.sync.timeout", "2s")
viper.SetDefault("db.username", "opensvc")
viper.SetDefault("db.host", "127.0.0.1")
viper.SetDefault("db.port", "3306")
Expand All @@ -48,11 +48,11 @@ func initConfig() error {
viper.SetDefault("redis.address", "localhost:6379")
viper.SetDefault("redis.password", "")
viper.SetDefault("feeder.tx", true)
viper.SetDefault("websocket.key", "magix123")
viper.SetDefault("websocket.url", "http://127.0.0.1:8889")
viper.SetDefault("websocket.require_token", false)
viper.SetDefault("websocket.key_file", "")
viper.SetDefault("websocket.cert_file", "")
viper.SetDefault("messenger.key", "magix123")
viper.SetDefault("messenger.url", "http://127.0.0.1:8889")
viper.SetDefault("messenger.require_token", false)
viper.SetDefault("messenger.key_file", "")
viper.SetDefault("messenger.cert_file", "")
viper.SetDefault("worker.runners", 1)
viper.SetDefault("worker.pprof.uxsocket", "/var/run/oc3_worker_pprof.sock")
//viper.SetDefault("worker.pprof.addr", "127.0.0.1:9999")
Expand Down Expand Up @@ -85,7 +85,7 @@ func initConfig() error {

func newEv() *oc2websocket.T {
return &oc2websocket.T{
Url: viper.GetString("websocket.url"),
Key: []byte(viper.GetString("websocket.key")),
Url: viper.GetString("messenger.url"),
Key: []byte(viper.GetString("messenger.key")),
}
}
2 changes: 1 addition & 1 deletion db.go → cmd/db.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"database/sql"
Expand Down
34 changes: 17 additions & 17 deletions apifeeder.go → cmd/feeder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -10,18 +10,18 @@ import (
"github.com/shaj13/go-guardian/v2/auth/strategies/union"
"github.com/spf13/viper"

"github.com/opensvc/oc3/apifeeder"
"github.com/opensvc/oc3/apifeederhandlers"
"github.com/opensvc/oc3/feeder"
feederhandlers "github.com/opensvc/oc3/feeder/handlers"
"github.com/opensvc/oc3/xauth"
)

func startApiFeeder() error {
addr := viper.GetString("listener_feed.addr")
return listenAndServeApiFeeder(addr)
func startFeeder() error {
addr := viper.GetString("feeder.addr")
return listenAndServeFeeder(addr)
}

func listenAndServeApiFeeder(addr string) error {
enableUI := viper.GetBool("listener_feed.ui.enable")
func listenAndServeFeeder(addr string) error {
enableUI := viper.GetBool("feeder.ui.enable")

db, err := newDatabase()
if err != nil {
Expand All @@ -34,7 +34,7 @@ func listenAndServeApiFeeder(addr string) error {
e.HideBanner = true
e.HidePort = true

if viper.GetBool("listener_feed.pprof.enable") {
if viper.GetBool("feeder.pprof.enable") {
slog.Info("add handler /oc3/feed/api/public/pprof")
// TODO: move to authenticated path
pprof.Register(e, "/oc3/feed/api/public/pprof")
Expand All @@ -44,28 +44,28 @@ func listenAndServeApiFeeder(addr string) error {
xauth.NewPublicStrategy("/oc3/feed/api/public/", "/oc3/feed/api/docs", "/oc3/feed/api/version"),
xauth.NewBasicNode(db),
)
if viper.GetBool("listener_feed.metrics.enable") {
if viper.GetBool("feeder.metrics.enable") {
slog.Info("add handler /oc3/feed/api/public/metrics")
e.Use(echoprometheus.NewMiddleware("oc3_apifeeder"))
e.Use(echoprometheus.NewMiddleware("oc3_feeder"))
e.GET("/oc3/feed/api/public/metrics", echoprometheus.NewHandler())
}
e.Use(apifeederhandlers.AuthMiddleware(strategy))
e.Use(feederhandlers.AuthMiddleware(strategy))
slog.Info("register openapi handlers with base url: /oc3/feed/api")
apifeeder.RegisterHandlersWithBaseURL(e, &apifeederhandlers.Api{
feeder.RegisterHandlersWithBaseURL(e, &feederhandlers.Api{
DB: db,
Redis: redisClient,
UI: enableUI,
SyncTimeout: viper.GetDuration("listener_feed.sync.timeout"),
SyncTimeout: viper.GetDuration("feeder.sync.timeout"),
}, "/oc3/feed/api")
if enableUI {
registerAPIUI(e)
registerFeederUI(e)
}
slog.Info("listen on " + addr)
return e.Start(addr)
}

func registerAPIUI(e *echo.Echo) {
func registerFeederUI(e *echo.Echo) {
slog.Info("add handler /oc3/feed/api/docs/")
g := e.Group("/oc3/feed/api/docs")
g.Use(apifeederhandlers.UIMiddleware(context.Background()))
g.Use(feederhandlers.UIMiddleware(context.Background()))
}
Loading
Loading