From 5ca00698b617146992572b6eb70e349a56506240 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Mon, 27 Nov 2023 14:55:40 +0100 Subject: [PATCH] WIP: Log streaming API --- cmd/icinga-kubernetes/main.go | 8 ++ config.example.yml | 8 ++ internal/config.go | 6 ++ pkg/api/config.go | 13 +++ pkg/api/log.go | 176 ++++++++++++++++++++++++++++++++++ schema/mysql/schema.sql | 4 +- 6 files changed, 213 insertions(+), 2 deletions(-) create mode 100644 pkg/api/config.go create mode 100644 pkg/api/log.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index a6bfde93..7f9376cb 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-go-library/driver" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-kubernetes/internal" + "github.com/icinga/icinga-kubernetes/pkg/api" "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/schema" "github.com/icinga/icinga-kubernetes/pkg/sync" @@ -157,6 +158,13 @@ func main() { return nodeMetricSync.Clean(ctx, forwardDeleteNodesToMetricChannel) }) + // stream log api + logStreamApi := api.NewLogStreamApi(k, logs.GetChildLogger("LogStreamApi"), &cfg.Api.Log) + + g.Go(func() error { + return logStreamApi.Stream(ctx) + }) + if err := g.Wait(); err != nil { logging.Fatal(errors.Wrap(err, "can't sync")) } diff --git a/config.example.yml b/config.example.yml index bb43e014..28d9b331 100644 --- a/config.example.yml +++ b/config.example.yml @@ -39,3 +39,11 @@ logging: # Valid units are "ms", "s", "m", "h". # Defaults to "20s". # interval: 20s + +api: + log: + # Address where the api is reachable + address: '/logStream' + + # Port where the api is reachable + port: 8080 diff --git a/internal/config.go b/internal/config.go index 60f6fb51..edf68e7b 100644 --- a/internal/config.go +++ b/internal/config.go @@ -3,12 +3,14 @@ package internal import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-kubernetes/pkg/api" ) // Config defines Icinga Kubernetes config. type Config struct { Database database.Config `yaml:"database"` Logging logging.Config `yaml:"logging"` + Api api.Config `yaml:"api"` } // Validate checks constraints in the supplied configuration and returns an error if they are violated. @@ -21,5 +23,9 @@ func (c *Config) Validate() error { return err } + if err := c.Api.Validate(); err != nil { + + } + return nil } diff --git a/pkg/api/config.go b/pkg/api/config.go new file mode 100644 index 00000000..d1dfa017 --- /dev/null +++ b/pkg/api/config.go @@ -0,0 +1,13 @@ +package api + +type Config struct { + Log LogStreamApiConfig `yaml:"log"` +} + +func (c *Config) Validate() error { + if err := c.Log.Validate(); err != nil { + return err + } + + return nil +} diff --git a/pkg/api/log.go b/pkg/api/log.go new file mode 100644 index 00000000..b5c3d6d5 --- /dev/null +++ b/pkg/api/log.go @@ -0,0 +1,176 @@ +package api + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "github.com/icinga/icinga-go-library/logging" + "github.com/pkg/errors" + "io" + v1 "k8s.io/api/core/v1" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "net" + "net/http" + "strconv" + "time" +) + +// LogStreamApiConfig stores config for log streaming api +type LogStreamApiConfig struct { + Address string `yaml:"address"` + Port int `yaml:"port"` +} + +// Validate validates LogStreamApiConfig +func (c *LogStreamApiConfig) Validate() error { + + if c.Address == "" { + return errors.New("address missing") + } + + if c.Port < 1 || c.Port > 65536 { + return errors.New("invalid port number") + } + + return nil +} + +// LogStreamApi streams log per http rest api +type LogStreamApi struct { + clientset *kubernetes.Clientset + logger *logging.Logger + config *LogStreamApiConfig +} + +// NewLogStreamApi creates new LogStreamApi initialized with clienset, logger and config +func NewLogStreamApi(clientset *kubernetes.Clientset, logger *logging.Logger, config *LogStreamApiConfig) *LogStreamApi { + return &LogStreamApi{ + clientset: clientset, + logger: logger, + config: config, + } +} + +// Handle returns HandlerFunc that handles the api +func (lsa *LogStreamApi) Handle() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + lsa.HandleHttp(w, r) + lsa.logger.Debug("Finished sending response...") + } +} + +// HandleHttp handles the api +func (lsa *LogStreamApi) HandleHttp(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + namespace := query.Get("namespace") + podName := query.Get("podName") + containerName := query.Get("containerName") + lastTimestampParam, err := strconv.ParseInt(query.Get("lastTimestamp"), 10, 64) + if err != nil { + fmt.Println("error getting last timestamp from url:", err) + } + + lastTimestamp := kmetav1.Time{Time: time.Unix(lastTimestampParam, 0)} + + ctx := r.Context() + flusher, ok := w.(http.Flusher) + if !ok { + http.NotFound(w, r) + return + } + + podLogOpts := v1.PodLogOptions{ + Container: containerName, + Timestamps: true, + Follow: true, + SinceTime: &lastTimestamp, + } + + reader, err := lsa.clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts).Stream(ctx) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + flusher.Flush() + return + } + + // Send the initial headers saying we're gonna stream the response. + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + enc := json.NewEncoder(w) + + stringReader := bufio.NewReader(reader) + + lsa.logger.Debug("Connected") + + for { + select { + case <-ctx.Done(): + lsa.logger.Debug("Connection closed") + return + default: + msg, err := stringReader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + break + } + + // Send some data + err = enc.Encode(msg) + if err != nil { + lsa.logger.Fatal(err) + } + + flusher.Flush() + } + } +} + +// Stream starts a local webserver and provides the api +func (lsa *LogStreamApi) Stream(ctx context.Context) (err error) { + + mux := http.NewServeMux() + mux.Handle(lsa.config.Address, lsa.Handle()) + + srv := &http.Server{ + Addr: ":" + strconv.Itoa(lsa.config.Port), + Handler: mux, + BaseContext: func(net.Listener) context.Context { + return ctx + }, + } + + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + lsa.logger.Fatalf("listen:%+s\n", err) + } + }() + + lsa.logger.Info("Starting stream") + + select { + case <-ctx.Done(): + lsa.logger.Info("Stopped") + // + ctxShutDown, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := srv.Shutdown(ctxShutDown) + if err != nil { + lsa.logger.Fatalf("Shutdown Failed:%+s", err) + } + + lsa.logger.Info("Exited properly") + + if errors.Is(err, http.ErrServerClosed) { + err = nil + } + + return err + } +} diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 78802d45..50328dcb 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -43,8 +43,8 @@ CREATE TABLE log id BINARY(20) NOT NULL, reference_id BINARY(20) NOT NULL, container_name VARCHAR(255) COLLATE utf8mb4_unicode_ci NOT NULL, - time LONGTEXT NOT NULL, - log LONGTEXT NOT NULL, + time LONGTEXT, + log LONGTEXT, PRIMARY KEY (id) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4