This repository was archived by the owner on Apr 2, 2024. It is now read-only.
40 changes: 27 additions & 13 deletions pkg/dataset/config.go
@@ -1,3 +1,7 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package dataset

import (
@@ -6,8 +10,11 @@ import (
"time"

"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/log"
"gopkg.in/yaml.v2"

"github.com/timescale/promscale/pkg/internal/day"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/rollup"
)

const (
@@ -38,16 +45,17 @@ type Config struct {

// Metrics contains dataset configuration options for metrics data.
type Metrics struct {
ChunkInterval DayDuration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the the value was set.
HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"`
HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"`
RetentionPeriod DayDuration `yaml:"default_retention_period"`
ChunkInterval day.Duration `yaml:"default_chunk_interval"`
Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set.
HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"`
HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
Rollup *rollup.Config `yaml:"rollup,omitempty"`
}

// Traces contains dataset configuration options for traces data.
type Traces struct {
RetentionPeriod DayDuration `yaml:"default_retention_period"`
RetentionPeriod day.Duration `yaml:"default_retention_period"`
}

// NewConfig creates a new dataset config based on the configuration YAML contents.
@@ -57,9 +65,15 @@ func NewConfig(contents string) (cfg Config, err error) {
}

// Apply applies the configuration to the database via the supplied DB connection.
func (c *Config) Apply(conn *pgx.Conn) error {
func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error {
c.applyDefaults()

if c.Metrics.Rollup != nil {
if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil {
return fmt.Errorf("error applying configuration for downsampling: %w", err)
}
}

log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval))
log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression))
log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh))
@@ -87,21 +101,21 @@ func (c *Config) Apply(conn *pgx.Conn) error {

func (c *Config) applyDefaults() {
if c.Metrics.ChunkInterval <= 0 {
c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval)
c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval)
}
if c.Metrics.Compression == nil {
c.Metrics.Compression = &defaultMetricCompressionVar
}
if c.Metrics.HALeaseRefresh <= 0 {
c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh)
c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh)
}
if c.Metrics.HALeaseTimeout <= 0 {
c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout)
c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout)
}
if c.Metrics.RetentionPeriod <= 0 {
c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod)
c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod)
}
if c.Traces.RetentionPeriod <= 0 {
c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod)
c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod)
}
}
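For reference, a minimal sketch of the updated call site: Apply now takes a context and applies any rollup section before setting the metric defaults. The wrapper function and its arguments below are assumptions for illustration, not code from this PR.

// Hedged sketch of the new Config.Apply signature in use.
// Assumes imports: context, fmt, github.com/jackc/pgx/v4, github.com/timescale/promscale/pkg/dataset.
func applyDataset(ctx context.Context, conn *pgx.Conn, yamlContents string) error {
	cfg, err := dataset.NewConfig(yamlContents)
	if err != nil {
		return fmt.Errorf("parsing dataset config: %w", err)
	}
	// Apply now needs a context so the rollup setup can be cancelled along with the caller.
	return cfg.Apply(ctx, conn)
}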
33 changes: 17 additions & 16 deletions pkg/dataset/config_test.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/internal/day"
)

var testCompressionSetting = true
@@ -45,7 +46,7 @@ func TestNewConfig(t *testing.T) {
default_retention_period: 3d2h`,
cfg: Config{
Metrics: Metrics{
RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour),
RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour),
},
},
},
@@ -61,14 +62,14 @@ traces:
default_retention_period: 15d`,
cfg: Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
},
},
@@ -97,29 +98,29 @@ func TestApplyDefaults(t *testing.T) {
t,
Config{
Metrics: Metrics{
ChunkInterval: DayDuration(defaultMetricChunkInterval),
ChunkInterval: day.Duration(defaultMetricChunkInterval),
Compression: &defaultMetricCompressionVar,
HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh),
HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout),
RetentionPeriod: DayDuration(defaultMetricRetentionPeriod),
HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh),
HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout),
RetentionPeriod: day.Duration(defaultMetricRetentionPeriod),
},
Traces: Traces{
RetentionPeriod: DayDuration(defaultTraceRetentionPeriod),
RetentionPeriod: day.Duration(defaultTraceRetentionPeriod),
},
},
c,
)

untouched := Config{
Metrics: Metrics{
ChunkInterval: DayDuration(3 * time.Hour),
ChunkInterval: day.Duration(3 * time.Hour),
Compression: &testCompressionSetting,
HALeaseRefresh: DayDuration(2 * time.Minute),
HALeaseTimeout: DayDuration(5 * time.Second),
RetentionPeriod: DayDuration(30 * 24 * time.Hour),
HALeaseRefresh: day.Duration(2 * time.Minute),
HALeaseTimeout: day.Duration(5 * time.Second),
RetentionPeriod: day.Duration(30 * 24 * time.Hour),
},
Traces: Traces{
RetentionPeriod: DayDuration(15 * 24 * time.Hour),
RetentionPeriod: day.Duration(15 * 24 * time.Hour),
},
}

16 changes: 10 additions & 6 deletions pkg/dataset/duration.go → pkg/internal/day/duration.go
@@ -1,4 +1,8 @@
package dataset
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package day

import (
"fmt"
@@ -11,13 +15,13 @@ const (
unknownUnitDErrorPrefix = `time: unknown unit "d"`
)

// DayDuration acts like a time.Duration with support for "d" unit
// Duration acts like a time.Duration with support for "d" unit
// which is used for specifying number of days in duration.
type DayDuration time.Duration
type Duration time.Duration

// UnmarshalText unmarshals strings into DayDuration values while
// handling the day unit. It leans heavily into time.ParseDuration.
func (d *DayDuration) UnmarshalText(s []byte) error {
func (d *Duration) UnmarshalText(s []byte) error {
val, err := time.ParseDuration(string(s))
if err != nil {
// Check for specific error indicating we are using days unit.
@@ -30,7 +34,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error {
return err
}
}
*d = DayDuration(val)
*d = Duration(val)
return nil
}

@@ -61,6 +65,6 @@ func handleDays(s []byte) (time.Duration, error) {
}

// String returns a string value of DayDuration.
func (d DayDuration) String() string {
func (d Duration) String() string {
return time.Duration(d).String()
}
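A minimal sketch of the relocated type in use: the "d" unit is resolved by UnmarshalText, matching the 3d2h case in the tests above. Illustrative only, not part of this PR.

// Hedged sketch: parse a duration that uses the "d" unit via day.Duration.
// Assumes imports: fmt, time, github.com/timescale/promscale/pkg/internal/day.
var retention day.Duration
if err := retention.UnmarshalText([]byte("3d2h")); err != nil {
	panic(err) // "3d2h" parses cleanly to (3*24 + 2) hours
}
fmt.Println(time.Duration(retention)) // 74h0m0s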
15 changes: 9 additions & 6 deletions pkg/pgclient/client.go
@@ -61,7 +61,7 @@ type Client struct {
}

// NewClient creates a new PostgreSQL client
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) {
Review comment (Contributor): Couldn't we add useRollups to cfg *Config? It reads a little strange that we have a config struct, but it doesn't have all the config options and we need extra parameters in the signature. I haven't checked, but maybe we can't because of how our configs, CLI flags, and their namespaces are tied together.

Review comment (Contributor): I checked, and we can't because rollup comes from the dataset config.

var (
err error
dbMaxConns int
@@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
if err != nil {
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
}
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly)
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups)
if err != nil {
return client, err
}
@@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string {
}

// NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) {
sigClose := make(chan struct{})
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
@@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)

labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer())
dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups)
if err != nil {
return nil, fmt.Errorf("error starting querier: %w", err)
}

queryable := query.NewQueryable(dbQuerier, labelsReader)

dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{})
@@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
writerConn = pgxconn.NewPgxConn(writerPool)
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
if err != nil {
log.Error("msg", "err starting the ingestor", "err", err)
return nil, err
return nil, fmt.Errorf("error starting ingestor: %w", err)
}
}
if maintPool != nil {
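A hedged sketch of wiring the new useRollups argument, not this PR's actual call site: the flag can be derived from whether the dataset config carries a rollup section. Variable names outside this diff (datasetCfg, pgCfg, authorizer, schemaLocker) are assumptions.

// Enable rollup-aware querying only when downsampling is configured.
useRollups := datasetCfg.Metrics.Rollup != nil
client, err := pgclient.NewClient(prometheus.DefaultRegisterer, &pgCfg, authorizer, schemaLocker, false /* readOnly */, useRollups)
if err != nil {
	return fmt.Errorf("error creating pgclient: %w", err)
}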
3 changes: 3 additions & 0 deletions pkg/pgclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/ingestor/trace"
"github.com/timescale/promscale/pkg/rollup"
"github.com/timescale/promscale/pkg/version"
)

@@ -34,6 +35,7 @@ type Config struct {
DbConnectionTimeout time.Duration
IgnoreCompressedChunks bool
MetricsAsyncAcks bool
MetricsScrapeInterval time.Duration
TracesAsyncAcks bool
WriteConnections int
WriterPoolSize int
@@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB")
fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created")
fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.")
fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval, "Default scrape interval in Prometheus. This is used to estimate samples while choosing rollup for querying.")
return cfg
}

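As a usage note, the new flag reads like any other Promscale duration flag; a hedged example invocation, with every other flag elided:

promscale -metrics.rollup.scrape-interval=60s [other flags...]

Per the flag's help text, setting it to the actual Prometheus scrape interval lets the querier estimate sample counts more accurately when choosing a rollup for a query.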
23 changes: 16 additions & 7 deletions pkg/pgmodel/metrics/database/database.go
@@ -19,10 +19,11 @@ type Engine interface {
}

type metricsEngineImpl struct {
conn pgxconn.PgxConn
ctx context.Context
isRunning atomic.Value
metrics []metricQueryWrap
conn pgxconn.PgxConn
ctx context.Context
isRunning atomic.Value
metrics []metricQueryWrap
metricSeries []metricsWithSeries
}

// NewEngine creates an engine that performs database metrics evaluation every evalInterval.
@@ -33,9 +34,10 @@ type metricsEngineImpl struct {
// will cause evaluation errors.
func NewEngine(ctx context.Context, conn pgxconn.PgxConn) *metricsEngineImpl {
engine := &metricsEngineImpl{
conn: conn,
ctx: ctx,
metrics: metrics,
conn: conn,
ctx: ctx,
metrics: metrics,
metricSeries: metricSeries,
}
engine.isRunning.Store(false)
return engine
@@ -136,6 +138,13 @@ func (e *metricsEngineImpl) Update() error {
return err
}
handleResults(results, batchMetrics)

for _, ms := range e.metricSeries {
if err = ms.update(e.conn); err != nil {
log.Warn("msg", "error evaluating metrics with series", "err", err.Error())
continue // We shouldn't stop everything if this fails.
}
}
return results.Close()
}

66 changes: 66 additions & 0 deletions pkg/pgmodel/metrics/database/metric_series.go
@@ -0,0 +1,66 @@
package database

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/util"
)

var (
caggsRefreshTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "sql_database",
Name: "caggs_refresh_total",
Help: "Total number of caggs policy executed.",
}, []string{"refresh_interval"})
caggsRefreshSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: util.PromNamespace,
Subsystem: "sql_database",
Name: "caggs_refresh_success",
Help: "Total number of caggs policy executed successfully.",
}, []string{"refresh_interval"})
)

func init() {
prometheus.MustRegister(caggsRefreshSuccess, caggsRefreshTotal)
}

type metricsWithSeries struct {
update func(conn pgxconn.PgxConn) error
}

var metricSeries = []metricsWithSeries{
{
update: func(conn pgxconn.PgxConn) error {
rows, err := conn.Query(context.Background(), `
SELECT
total_successes,
total_runs,
(config ->> 'refresh_interval')::INTERVAL
FROM timescaledb_information.jobs j
INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_refresh_policy')
`)
if err != nil {
return fmt.Errorf("error running instrumentation for execute_caggs_refresh_policy: %w", err)
}
defer rows.Close()
for rows.Next() {
var (
success, total int64
refreshInterval time.Duration
)
err = rows.Scan(&success, &total, &refreshInterval)
if err != nil {
return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err)
}
caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(success))
caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(total))
}
return nil
},
},
}
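For context, a hedged example of what the new gauges could look like when scraped, assuming util.PromNamespace resolves to "promscale" and a single continuous-aggregate refresh policy with a one-hour refresh_interval; the sample values are made up:

promscale_sql_database_caggs_refresh_total{refresh_interval="1h0m0s"} 42
promscale_sql_database_caggs_refresh_success{refresh_interval="1h0m0s"} 41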