diff --git a/pkg/dataset/config.go b/pkg/dataset/config.go index 7fd0e96a61..469413491e 100644 --- a/pkg/dataset/config.go +++ b/pkg/dataset/config.go @@ -1,3 +1,7 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + package dataset import ( @@ -6,8 +10,11 @@ import ( "time" "github.com/jackc/pgx/v4" - "github.com/timescale/promscale/pkg/log" "gopkg.in/yaml.v2" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/rollup" ) const ( @@ -38,16 +45,17 @@ type Config struct { // Metrics contains dataset configuration options for metrics data. type Metrics struct { - ChunkInterval DayDuration `yaml:"default_chunk_interval"` - Compression *bool `yaml:"compress_data"` // Using pointer to check if the the value was set. - HALeaseRefresh DayDuration `yaml:"ha_lease_refresh"` - HALeaseTimeout DayDuration `yaml:"ha_lease_timeout"` - RetentionPeriod DayDuration `yaml:"default_retention_period"` + ChunkInterval day.Duration `yaml:"default_chunk_interval"` + Compression *bool `yaml:"compress_data"` // Using pointer to check if the value was set. + HALeaseRefresh day.Duration `yaml:"ha_lease_refresh"` + HALeaseTimeout day.Duration `yaml:"ha_lease_timeout"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` + Rollup *rollup.Config `yaml:"rollup,omitempty"` } // Traces contains dataset configuration options for traces data. type Traces struct { - RetentionPeriod DayDuration `yaml:"default_retention_period"` + RetentionPeriod day.Duration `yaml:"default_retention_period"` } // NewConfig creates a new dataset config based on the configuration YAML contents. @@ -57,9 +65,15 @@ func NewConfig(contents string) (cfg Config, err error) { } // Apply applies the configuration to the database via the supplied DB connection. 
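+// It fills in defaults first and, when a rollup section is present, applies the downsampling configuration before the metric and trace settings. A hypothetical dataset YAML enabling rollups could look like (flow style; "hourly" is an illustrative name, not a default): metrics: { rollup: { enabled: true, use_default_resolution: true, resolutions: { hourly: { resolution: 1h, retention: 30d } } } }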
-func (c *Config) Apply(conn *pgx.Conn) error { +func (c *Config) Apply(ctx context.Context, conn *pgx.Conn) error { c.applyDefaults() + if c.Metrics.Rollup != nil { + if err := c.Metrics.Rollup.Apply(ctx, conn); err != nil { + return fmt.Errorf("error applying configuration for downsampling: %w", err) + } + } + log.Info("msg", fmt.Sprintf("Setting metric dataset default chunk interval to %s", c.Metrics.ChunkInterval)) log.Info("msg", fmt.Sprintf("Setting metric dataset default compression to %t", *c.Metrics.Compression)) log.Info("msg", fmt.Sprintf("Setting metric dataset default high availability lease refresh to %s", c.Metrics.HALeaseRefresh)) @@ -87,21 +101,21 @@ func (c *Config) Apply(conn *pgx.Conn) error { func (c *Config) applyDefaults() { if c.Metrics.ChunkInterval <= 0 { - c.Metrics.ChunkInterval = DayDuration(defaultMetricChunkInterval) + c.Metrics.ChunkInterval = day.Duration(defaultMetricChunkInterval) } if c.Metrics.Compression == nil { c.Metrics.Compression = &defaultMetricCompressionVar } if c.Metrics.HALeaseRefresh <= 0 { - c.Metrics.HALeaseRefresh = DayDuration(defaultMetricHALeaseRefresh) + c.Metrics.HALeaseRefresh = day.Duration(defaultMetricHALeaseRefresh) } if c.Metrics.HALeaseTimeout <= 0 { - c.Metrics.HALeaseTimeout = DayDuration(defaultMetricHALeaseTimeout) + c.Metrics.HALeaseTimeout = day.Duration(defaultMetricHALeaseTimeout) } if c.Metrics.RetentionPeriod <= 0 { - c.Metrics.RetentionPeriod = DayDuration(defaultMetricRetentionPeriod) + c.Metrics.RetentionPeriod = day.Duration(defaultMetricRetentionPeriod) } if c.Traces.RetentionPeriod <= 0 { - c.Traces.RetentionPeriod = DayDuration(defaultTraceRetentionPeriod) + c.Traces.RetentionPeriod = day.Duration(defaultTraceRetentionPeriod) } } diff --git a/pkg/dataset/config_test.go b/pkg/dataset/config_test.go index e9f954abb4..42468bc2f0 100644 --- a/pkg/dataset/config_test.go +++ b/pkg/dataset/config_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/timescale/promscale/pkg/internal/day" ) var testCompressionSetting = true @@ -45,7 +46,7 @@ func TestNewConfig(t *testing.T) { default_retention_period: 3d2h`, cfg: Config{ Metrics: Metrics{ - RetentionPeriod: DayDuration(((3 * 24) + 2) * time.Hour), + RetentionPeriod: day.Duration(((3 * 24) + 2) * time.Hour), }, }, }, @@ -61,14 +62,14 @@ traces: default_retention_period: 15d`, cfg: Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, }, }, @@ -97,14 +98,14 @@ func TestApplyDefaults(t *testing.T) { t, Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(defaultMetricChunkInterval), + ChunkInterval: day.Duration(defaultMetricChunkInterval), Compression: &defaultMetricCompressionVar, - HALeaseRefresh: DayDuration(defaultMetricHALeaseRefresh), - HALeaseTimeout: DayDuration(defaultMetricHALeaseTimeout), - RetentionPeriod: DayDuration(defaultMetricRetentionPeriod), + HALeaseRefresh: day.Duration(defaultMetricHALeaseRefresh), + HALeaseTimeout: day.Duration(defaultMetricHALeaseTimeout), + RetentionPeriod: 
day.Duration(defaultMetricRetentionPeriod), }, Traces: Traces{ - RetentionPeriod: DayDuration(defaultTraceRetentionPeriod), + RetentionPeriod: day.Duration(defaultTraceRetentionPeriod), }, }, c, ) @@ -112,14 +113,14 @@ func TestApplyDefaults(t *testing.T) { untouched := Config{ Metrics: Metrics{ - ChunkInterval: DayDuration(3 * time.Hour), + ChunkInterval: day.Duration(3 * time.Hour), Compression: &testCompressionSetting, - HALeaseRefresh: DayDuration(2 * time.Minute), - HALeaseTimeout: DayDuration(5 * time.Second), - RetentionPeriod: DayDuration(30 * 24 * time.Hour), + HALeaseRefresh: day.Duration(2 * time.Minute), + HALeaseTimeout: day.Duration(5 * time.Second), + RetentionPeriod: day.Duration(30 * 24 * time.Hour), }, Traces: Traces{ - RetentionPeriod: DayDuration(15 * 24 * time.Hour), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, } diff --git a/pkg/dataset/duration.go b/pkg/internal/day/duration.go similarity index 77% rename from pkg/dataset/duration.go rename to pkg/internal/day/duration.go index 14d6b9bfaf..7d3546928b 100644 --- a/pkg/dataset/duration.go +++ b/pkg/internal/day/duration.go @@ -1,4 +1,8 @@ -package dataset +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package day import ( "fmt" @@ -11,13 +15,13 @@ const ( unknownUnitDErrorPrefix = `time: unknown unit "d"` ) -// DayDuration acts like a time.Duration with support for "d" unit +// Duration acts like a time.Duration with support for "d" unit // which is used for specifying number of days in duration. -type DayDuration time.Duration +type Duration time.Duration -// UnmarshalText unmarshals strings into DayDuration values while -// handling the day unit. It leans heavily into time.ParseDuration. -func (d *DayDuration) UnmarshalText(s []byte) error { +// UnmarshalText unmarshals strings into Duration values while +// handling the day unit. It leans heavily into time.ParseDuration. +func (d *Duration) UnmarshalText(s []byte) error { val, err := time.ParseDuration(string(s)) if err != nil { // Check for specific error indicating we are using days unit. @@ -30,7 +34,7 @@ func (d *DayDuration) UnmarshalText(s []byte) error { return err } } - *d = DayDuration(val) + *d = Duration(val) return nil } @@ -61,6 +65,6 @@ func handleDays(s []byte) (time.Duration, error) { } -// String returns a string value of DayDuration. +// String returns a string value of Duration.
-func (d DayDuration) String() string { +func (d Duration) String() string { return time.Duration(d).String() } diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e6747082c0..b53cfa4bfc 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -61,7 +61,7 @@ type Client struct { } // NewClient creates a new PostgreSQL client -func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) { +func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) { var ( err error dbMaxConns int @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche if err != nil { return nil, fmt.Errorf("err creating reader connection pool: %w", err) } - client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly) + client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups) if err != nil { return client, err } @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string { } // NewClientWithPool creates a new PostgreSQL client with an existing connection pool. -func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) { +func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) { sigClose := make(chan struct{}) metricsCache := cache.NewMetricCache(cfg.CacheConfig) labelsCache := cache.NewLabelsCache(cfg.CacheConfig) @@ -223,7 +223,11 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig) labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer()) - dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer()) + dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), cfg.MetricsScrapeInterval, useRollups) + if err != nil { + return nil, fmt.Errorf("error starting querier: %w", err) + } + queryable := query.NewQueryable(dbQuerier, labelsReader) dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{}) @@ -232,8 +236,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri writerConn = pgxconn.NewPgxConn(writerPool) dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c) if err != nil { - log.Error("msg", "err starting the ingestor", "err", err) - return nil, err + return nil, fmt.Errorf("error starting ingestor: %w", err) } } if maintPool != nil { diff --git a/pkg/pgclient/config.go b/pkg/pgclient/config.go index 8d03f5be4a..4a8be24b93 100644 --- a/pkg/pgclient/config.go +++ b/pkg/pgclient/config.go @@ -18,6 +18,7 @@ import ( "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/ingestor/trace" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/version" ) @@ -34,6 +35,7 @@ type Config struct { DbConnectionTimeout time.Duration IgnoreCompressedChunks bool MetricsAsyncAcks bool + MetricsScrapeInterval time.Duration 
TracesAsyncAcks bool WriteConnections int WriterPoolSize int @@ -107,6 +109,7 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config { fs.IntVar(&cfg.TracesMaxBatchSize, "tracing.max-batch-size", trace.DefaultBatchSize, "Maximum size of trace batch that is written to DB") fs.DurationVar(&cfg.TracesBatchTimeout, "tracing.batch-timeout", trace.DefaultBatchTimeout, "Timeout after new trace batch is created") fs.IntVar(&cfg.TracesBatchWorkers, "tracing.batch-workers", trace.DefaultBatchWorkers, "Number of workers responsible for creating trace batches. Defaults to number of CPUs.") + fs.DurationVar(&cfg.MetricsScrapeInterval, "metrics.rollup.scrape-interval", rollup.DefaultScrapeInterval, "Default scrape interval in Prometheus. This is used to estimate the number of raw samples when choosing a rollup for querying.") return cfg } diff --git a/pkg/pgmodel/metrics/database/database.go b/pkg/pgmodel/metrics/database/database.go index 56f128156c..35d5f87f7e 100644 --- a/pkg/pgmodel/metrics/database/database.go +++ b/pkg/pgmodel/metrics/database/database.go @@ -19,10 +19,11 @@ type Engine interface { } type metricsEngineImpl struct { - conn pgxconn.PgxConn - ctx context.Context - isRunning atomic.Value - metrics []metricQueryWrap + conn pgxconn.PgxConn + ctx context.Context + isRunning atomic.Value + metrics []metricQueryWrap + metricSeries []metricsWithSeries } // NewEngine creates an engine that performs database metrics evaluation every evalInterval. @@ -33,9 +34,10 @@ type metricsEngineImpl struct { // will cause evaluation errors. func NewEngine(ctx context.Context, conn pgxconn.PgxConn) *metricsEngineImpl { engine := &metricsEngineImpl{ - conn: conn, - ctx: ctx, - metrics: metrics, + conn: conn, + ctx: ctx, + metrics: metrics, + metricSeries: metricSeries, } engine.isRunning.Store(false) return engine @@ -136,6 +138,13 @@ func (e *metricsEngineImpl) Update() error { return err } handleResults(results, batchMetrics) + + for _, ms := range e.metricSeries { + if err = ms.update(e.conn); err != nil { + log.Warn("msg", "error evaluating metrics with series", "err", err.Error()) + continue // We shouldn't stop everything if this fails.
+ } + } return results.Close() } diff --git a/pkg/pgmodel/metrics/database/metric_series.go b/pkg/pgmodel/metrics/database/metric_series.go new file mode 100644 index 0000000000..fd84fc63ee --- /dev/null +++ b/pkg/pgmodel/metrics/database/metric_series.go @@ -0,0 +1,66 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package database + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/util" +) + +var ( + caggsRefreshTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_total", + Help: "Total number of caggs refresh policy executions.", + }, []string{"refresh_interval"}) + caggsRefreshSuccess = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_refresh_success", + Help: "Total number of successful caggs refresh policy executions.", + }, []string{"refresh_interval"}) +) + +func init() { + prometheus.MustRegister(caggsRefreshSuccess, caggsRefreshTotal) +} + +type metricsWithSeries struct { + update func(conn pgxconn.PgxConn) error +} + +var metricSeries = []metricsWithSeries{ + { + update: func(conn pgxconn.PgxConn) error { + rows, err := conn.Query(context.Background(), ` +SELECT + total_successes, + total_runs, + (config ->> 'refresh_interval')::INTERVAL +FROM timescaledb_information.jobs j +INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_refresh_policy') + `) + if err != nil { + return fmt.Errorf("error running instrumentation for execute_caggs_refresh_policy: %w", err) + } + defer rows.Close() + for rows.Next() { + var ( + success, total int64 + refreshInterval time.Duration + ) + err = rows.Scan(&success, &total, &refreshInterval) + if err != nil { + return fmt.Errorf("error scanning values for execute_caggs_refresh_policy: %w", err) + } + caggsRefreshSuccess.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(success)) + caggsRefreshTotal.With(prometheus.Labels{"refresh_interval": refreshInterval.String()}).Set(float64(total)) + } + return nil + }, + }, +} diff --git a/pkg/pgmodel/metrics/database/metrics.go b/pkg/pgmodel/metrics/database/metrics.go index bf63cb5b79..cccd8c4f51 100644 --- a/pkg/pgmodel/metrics/database/metrics.go +++ b/pkg/pgmodel/metrics/database/metrics.go @@ -424,6 +424,48 @@ var metrics = []metricQueryWrap{ }, ), query: `select count(*)::bigint from _prom_catalog.metric`, + }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_total", + Help: "Total number of caggs compression policy executions.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_compression_policy_success", + Help: "Total number of successful caggs compression policy executions.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_compression_policy')`, }, { + metrics: gauges( + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_total", + Help: "Total number of caggs retention policy executions.", + }, + prometheus.GaugeOpts{ + Namespace: util.PromNamespace, + Subsystem: "sql_database", + Name: "caggs_retention_policy_success", + Help:
"Total number of caggs retention policy executed successfully.", + }, + ), + query: ` +SELECT + total_runs, + total_successes +FROM timescaledb_information.jobs j + INNER JOIN timescaledb_information.job_stats js ON ( j.job_id = js.job_id AND j.proc_name = 'execute_caggs_retention_policy')`, }, } diff --git a/pkg/pgmodel/querier/querier.go b/pkg/pgmodel/querier/querier.go index d6951d08bf..12f19f62f0 100644 --- a/pkg/pgmodel/querier/querier.go +++ b/pkg/pgmodel/querier/querier.go @@ -6,17 +6,21 @@ package querier import ( "context" + "fmt" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/timescale/promscale/pkg/pgmodel/cache" "github.com/timescale/promscale/pkg/pgmodel/lreader" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/rollup" "github.com/timescale/promscale/pkg/tenancy" ) type pgxQuerier struct { - tools *queryTools + tools *queryTools + schema *rollup.Decider } var _ Querier = (*pgxQuerier)(nil) @@ -29,7 +33,9 @@ func NewQuerier( labelsReader lreader.LabelsReader, exemplarCache cache.PositionCache, rAuth tenancy.ReadAuthorizer, -) Querier { + scrapeInterval time.Duration, + useRollups bool, +) (Querier, error) { querier := &pgxQuerier{ tools: &queryTools{ conn: conn, @@ -39,7 +45,14 @@ func NewQuerier( rAuth: rAuth, }, } - return querier + if useRollups { + decider, err := rollup.NewDecider(context.Background(), conn, scrapeInterval) + if err != nil { + return nil, fmt.Errorf("error creating rollups schema decider: %w", err) + } + querier.schema = decider + } + return querier, nil } func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { @@ -47,7 +60,7 @@ func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier { } func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier { - return newQuerySamples(ctx, q) + return newQuerySamples(ctx, q, q.schema) } func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier { diff --git a/pkg/pgmodel/querier/querier_sql_test.go b/pkg/pgmodel/querier/querier_sql_test.go index ce43cddc1f..a8ee0c582f 100644 --- a/pkg/pgmodel/querier/querier_sql_test.go +++ b/pkg/pgmodel/querier/querier_sql_test.go @@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}} + querier := pgxQuerier{ + &queryTools{ + conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()), + }, + nil, + } result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query) diff --git a/pkg/pgmodel/querier/query_remote_read.go b/pkg/pgmodel/querier/query_remote_read.go index 4644dbecc5..8ac59fffe6 100644 --- a/pkg/pgmodel/querier/query_remote_read.go +++ b/pkg/pgmodel/querier/query_remote_read.go @@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro return nil, err } - qrySamples := newQuerySamples(q.ctx, q.pgxQuerier) + qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil) sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers) if err != nil { return nil, err diff --git a/pkg/pgmodel/querier/query_sample.go 
b/pkg/pgmodel/querier/query_sample.go index afd4ce0bf0..3127127b33 100644 --- a/pkg/pgmodel/querier/query_sample.go +++ b/pkg/pgmodel/querier/query_sample.go @@ -9,17 +9,20 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/common/errors" "github.com/timescale/promscale/pkg/pgmodel/common/schema" + "github.com/timescale/promscale/pkg/rollup" ) type querySamples struct { *pgxQuerier - ctx context.Context + ctx context.Context + schema *rollup.Decider } -func newQuerySamples(ctx context.Context, qr *pgxQuerier) *querySamples { - return &querySamples{qr, ctx} +func newQuerySamples(ctx context.Context, qr *pgxQuerier, schema *rollup.Decider) *querySamples { + return &querySamples{qr, ctx, schema} } // Select implements the SamplesQuerier interface. It is the entry point for our @@ -39,6 +42,24 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH return nil, nil, fmt.Errorf("get evaluation metadata: %w", err) } + var useRollups bool + if q.schema != nil { + // Querying via rollups is available. + supported := q.schema.SupportsRollup(metadata.metric) // Check that rollups for the given metric are supported. + if !supported { + // Rollups for the given metric weren't supported. Let's refresh and check again. + if err := q.schema.Refresh(); err != nil { + log.Error("msg", "error refreshing schema decider", "error", err.Error()) + } + supported = q.schema.SupportsRollup(metadata.metric) // If supported is still false, then rollups really don't exist for 'metadata.metric'. + } + if supported { + schemaName := q.schema.Decide(mint, maxt) + useRollups = schemaName != rollup.DefaultSchema + } + } + _ = useRollups // Avoid an unused-variable error. This will be used in the following PRs for querying rollups. + filter := metadata.timeFilter if metadata.isSingleMetric { // Single vector selector case. diff --git a/pkg/rollup/config.go b/pkg/rollup/config.go new file mode 100644 index 0000000000..45a054bee4 --- /dev/null +++ b/pkg/rollup/config.go @@ -0,0 +1,98 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/jackc/pgx/v4" + + "github.com/timescale/promscale/pkg/internal/day" + "github.com/timescale/promscale/pkg/log" +) + +const ( + DefaultScrapeInterval = time.Second * 30 + setDefaultDownsampleStateSQL = "SELECT prom_api.set_automatic_downsample($1)" + + // short and long represent system resolutions.
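+ // They back the default rollups (5m resolution with 90d retention, and 1h resolution with 395d retention) installed when use_default_resolution is set.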
+ short = "short" + long = "long" +) + +var ( + defaultDownsampleState = false + useDefaultResolution = false + systemResolution = map[string]Definition{ + short: { + Resolution: day.Duration(5 * time.Minute), + Retention: day.Duration(90 * 24 * time.Hour), + }, + long: { + Resolution: day.Duration(time.Hour), + Retention: day.Duration(395 * 24 * time.Hour), + }, + } +) + +type Config struct { + Enabled *bool `yaml:"enabled,omitempty"` + UseDefaultResolution *bool `yaml:"use_default_resolution"` + Resolutions `yaml:"resolutions,omitempty"` +} + +type Definition struct { + Resolution day.Duration `yaml:"resolution"` + Retention day.Duration `yaml:"retention"` + Delete bool `yaml:"delete"` +} + +type Resolutions map[string]Definition + +func (d *Config) Apply(ctx context.Context, conn *pgx.Conn) error { + d.applyDefaults() + + if containsSystemResolutions(d.Resolutions) { + return fmt.Errorf("'short' and 'long' are system resolutions. These cannot be applied as rollup labels") + } + + log.Info("msg", fmt.Sprintf("Setting automatic metric downsample to %t", *d.Enabled)) + if _, err := conn.Exec(context.Background(), setDefaultDownsampleStateSQL, d.Enabled); err != nil { + return err + } + + if *d.Enabled { + if *d.UseDefaultResolution { + d.Resolutions["short"] = systemResolution["short"] + d.Resolutions["long"] = systemResolution["long"] + } + if err := Sync(ctx, conn, d.Resolutions); err != nil { + return fmt.Errorf("syncing rollups: %w", err) + } + } + return nil +} + +func (d *Config) applyDefaults() { + if d.Enabled == nil { + d.Enabled = &defaultDownsampleState + } + if d.UseDefaultResolution == nil { + d.UseDefaultResolution = &useDefaultResolution + } +} + +func containsSystemResolutions(r Resolutions) bool { + for k := range r { + k = strings.ToLower(k) + if k == short || k == long { + return true + } + } + return false } diff --git a/pkg/rollup/decider.go b/pkg/rollup/decider.go new file mode 100644 index 0000000000..e570001a7b --- /dev/null +++ b/pkg/rollup/decider.go @@ -0,0 +1,177 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/pgxconn" +) + +const ( + DefaultSchema = "prom_data" + upperLimit = 5000 // Maximum number of samples allowed per query window. + refreshRollupInterval = time.Minute * 30 +) + +type rollupInfo struct { + schemaName string + refreshInterval time.Duration +} + +type Decider struct { + conn pgxconn.PgxConn + ctx context.Context + refreshMtx sync.RWMutex + + scrapeInterval time.Duration + downsamplingEnabled bool + + supportedMetrics map[string]struct{} + rollups []rollupInfo // {schemaName, refreshInterval} in ascending order of interval. The smaller the interval, the higher the granularity. +} + +func NewDecider(ctx context.Context, conn pgxconn.PgxConn, scrapeInterval time.Duration) (*Decider, error) { + helper := &Decider{ + ctx: ctx, + conn: conn, + scrapeInterval: scrapeInterval, + } + if err := helper.runRefreshRoutine(refreshRollupInterval); err != nil { + return nil, fmt.Errorf("refresh: %w", err) + } + return helper, nil } + +// Decide returns the schema name of the rollups that should be used for querying. +// The returned schema contains downsampled Prometheus data that should provide optimal +// granularity for querying.
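+// For example, assuming the default 30s scrape interval, a 7-day window implies roughly 20,160 raw samples, so a 5-minute rollup (about 2,016 samples) is the first one that fits under the 5,000-sample limit.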
+// +// minTs and maxTs are Unix timestamps in milliseconds. If no rollups exist or if downsampling is disabled, +// DefaultSchema (i.e., "prom_data") is returned. +func (h *Decider) Decide(minTs, maxTs int64) string { + h.refreshMtx.RLock() + defer h.refreshMtx.RUnlock() + + if !h.downsamplingEnabled || len(h.rollups) == 0 { + return DefaultSchema + } + estimateSamples := func(interval time.Duration) int64 { + return int64(float64(maxTs-minTs) / float64(interval.Milliseconds())) + } + + numRawSamples := estimateSamples(h.scrapeInterval) + if numRawSamples < upperLimit { + return DefaultSchema + } + + for _, info := range h.rollups { + samples := estimateSamples(info.refreshInterval) // info.refreshInterval is the interval between 2 samples in this rollup. + if samples < upperLimit { + // h.rollups is sorted by interval, so the first rollup below the upper limit is our answer: + // it gives the maximum granularity while staying within acceptable limits. + return info.schemaName + } + } + // All rollups are above the upper limit. Hence, return the schema with the highest interval so that granularity + // is minimal and we do not hurt the performance of the PromQL engine. + highestInterval := h.rollups[len(h.rollups)-1] + return highestInterval.schemaName +} + +func (h *Decider) SupportsRollup(metricName string) bool { + _, rollupExists := h.supportedMetrics[metricName] + return rollupExists +} + +func (h *Decider) Refresh() error { + h.refreshMtx.Lock() + defer h.refreshMtx.Unlock() + + if err := h.refreshDownsamplingState(); err != nil { + return fmt.Errorf("downsampling state: %w", err) + } + if err := h.refreshSupportedMetrics(); err != nil { + return fmt.Errorf("supported metrics: %w", err) + } + if err := h.refreshRollup(); err != nil { + return fmt.Errorf("rollup: %w", err) + } + return nil +} + +func (h *Decider) runRefreshRoutine(refreshInterval time.Duration) error { + if err := h.Refresh(); err != nil { + return fmt.Errorf("refresh: %w", err) + } + go func() { + t := time.NewTicker(refreshInterval) + defer t.Stop() + for { + select { + case <-h.ctx.Done(): + return + case <-t.C: + } + if err := h.Refresh(); err != nil { + log.Error("msg", "error refreshing rollups", "error", err.Error()) + } + } + }() + return nil +} + +func (h *Decider) refreshDownsamplingState() error { + var state bool + if err := h.conn.QueryRow(h.ctx, "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil { + return fmt.Errorf("fetching automatic downsampling state: %w", err) + } + h.downsamplingEnabled = state + return nil +} + +const supportedMetricsQuery = `SELECT m.metric_name AS supported_metrics FROM _prom_catalog.metric_rollup mr INNER JOIN _prom_catalog.metric m ON mr.metric_id = m.id GROUP BY supported_metrics;` + +func (h *Decider) refreshSupportedMetrics() error { + rows, err := h.conn.Query(h.ctx, supportedMetricsQuery) + if err != nil { + return fmt.Errorf("fetching supported metrics for rollups: %w", err) + } + defer rows.Close() + + h.supportedMetrics = make(map[string]struct{}) // Set of metric names that have rollups. + for rows.Next() { + var supportedMetric string + err = rows.Scan(&supportedMetric) + if err != nil { + return fmt.Errorf("error scanning the fetched supported metric: %w", err) + } + h.supportedMetrics[supportedMetric] = struct{}{} + } + return nil +} + +func (h *Decider) refreshRollup() error { + rows, err := h.conn.Query(h.ctx, "SELECT schema_name, resolution FROM _prom_catalog.rollup ORDER BY resolution ASC") + if err != nil { + return fmt.Errorf("fetching rollup: %w", err) + } + defer rows.Close() + h.rollups = []rollupInfo{} + for rows.Next() { + var ( + schemaName string +
refreshInterval time.Duration + ) + if err = rows.Scan(&schemaName, &refreshInterval); err != nil { + return fmt.Errorf("error scanning rows: %w", err) + } + h.rollups = append(h.rollups, rollupInfo{schemaName: schemaName, refreshInterval: refreshInterval}) + } + return nil +} diff --git a/pkg/rollup/decider_test.go b/pkg/rollup/decider_test.go new file mode 100644 index 0000000000..7e6b88b7be --- /dev/null +++ b/pkg/rollup/decider_test.go @@ -0,0 +1,113 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDecider(t *testing.T) { + r := &Decider{ + scrapeInterval: DefaultScrapeInterval, + downsamplingEnabled: true, + rollups: []rollupInfo{ + {"5_minute", 5 * time.Minute}, + {"15_minute", 15 * time.Minute}, + {"1_hour", time.Hour}, + {"1_week", 7 * 24 * time.Hour}, + }, + } + tcs := []struct { + name string + min time.Duration + max time.Duration + expectedSchemaName string + }{ + { + name: "1 sec", + min: 0, + max: time.Second, + expectedSchemaName: DefaultSchema, + }, { + name: "5 min", + min: 0, + max: 5 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "30 mins", + min: 0, + max: 30 * time.Minute, + expectedSchemaName: DefaultSchema, + }, { + name: "1 hour", + min: 0, + max: time.Hour, + expectedSchemaName: DefaultSchema, + }, { + // DRY RUN + // ------- + // + // Assuming the default scrape interval of 30 secs + // raw -> 2,880 <-- Falls in the acceptable range. + name: "1 day", + min: 0, + max: 24 * time.Hour, + expectedSchemaName: DefaultSchema, + }, + { + // DRY RUN on the 5,000-sample upper-limit logic + // -------- + // + // Assuming the default scrape interval of 30 secs + // raw -> 20,160 + // + // And, when using following rollup intervals, num samples: + // 5 mins -> 2,016 <-- Falls in the acceptable range. + // 15 mins -> 672 + // 1 hour -> 168 + // 1 week -> 1 + name: "7 days", + min: 0, + max: 7 * 24 * time.Hour, + expectedSchemaName: "5_minute", + }, + { + name: "30 days", + min: 0, + max: 30 * 24 * time.Hour, + expectedSchemaName: "15_minute", + }, { + // DRY RUN on the 5,000-sample upper-limit logic + // -------- + // + // Assuming the default scrape interval of 30 secs + // raw -> 1,036,800 + // + // And, when using following rollup intervals, num samples: + // 5 mins -> 103,680 + // 15 mins -> 34,560 + // 1 hour -> 8,640 + // 1 week -> ~51 <-- Falls in the acceptable range. + name: "1 year", + min: 0, + max: 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, { + name: "100 years", + min: 0, + max: 100 * 12 * 30 * 24 * time.Hour, + expectedSchemaName: "1_week", + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + schemaName := r.Decide(tc.min.Milliseconds(), tc.max.Milliseconds()) + require.Equal(t, tc.expectedSchemaName, schemaName, tc.name) + }) + } +} diff --git a/pkg/rollup/rollup.go b/pkg/rollup/rollup.go new file mode 100644 index 0000000000..fe6a9134aa --- /dev/null +++ b/pkg/rollup/rollup.go @@ -0,0 +1,122 @@ +// This file and its contents are licensed under the Apache License 2.0. +// Please see the included NOTICE for copyright information and +// LICENSE for a copy of the license. + +package rollup + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v4" + + "github.com/timescale/promscale/pkg/internal/day" +) + +// Sync updates the rollups in the DB in accordance with the given resolutions.
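+// Each entry in Resolutions maps a rollup label name to a Definition holding its resolution, retention, and delete flag.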
It handles: +// 1. Creation of new rollups +// 2. Deletion of rollups that have `delete: true` +// 3. Updating the retention duration of rollups that have the same label name but a different retention duration. If the resolution of +// an existing rollup is changed, an error is returned +func Sync(ctx context.Context, conn *pgx.Conn, r Resolutions) error { + rows, err := conn.Query(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup") + if err != nil { + return fmt.Errorf("querying existing resolutions: %w", err) + } + defer rows.Close() + + existingResolutions := make(Resolutions) + for rows.Next() { + var lName string + var resolution, retention time.Duration + if err := rows.Scan(&lName, &resolution, &retention); err != nil { + return fmt.Errorf("error scanning output rows for existing resolutions: %w", err) + } + existingResolutions[lName] = Definition{Resolution: day.Duration(resolution), Retention: day.Duration(retention)} + } + + if err := errOnResolutionMismatch(existingResolutions, r); err != nil { + return fmt.Errorf("error on existing resolution mismatch: %w", err) + } + + if err := updateExistingRollups(ctx, conn, existingResolutions, r); err != nil { + return fmt.Errorf("update existing rollups: %w", err) + } + + // Delete rollups that are no longer required. + if err = deleteRollups(ctx, conn, existingResolutions, r); err != nil { + return fmt.Errorf("delete rollups: %w", err) + } + + // Create new rollups. + if err = createRollups(ctx, conn, existingResolutions, r); err != nil { + return fmt.Errorf("create rollups: %w", err) + } + return nil +} + +// errOnResolutionMismatch returns an error if a given resolution exists in the DB with a different resolution duration. +func errOnResolutionMismatch(existing, r Resolutions) error { + for labelName, res := range r { + if oldRes, exists := existing[labelName]; exists { + if oldRes.Resolution != res.Resolution { + return fmt.Errorf("existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels the same or remove them") + } + } + } + return nil +} + +// updateExistingRollups updates an existing rollup's retention when a resolution with the same name specifies a +// different retention duration.
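+// Retention-only changes are applied with a plain UPDATE on _prom_catalog.rollup; resolution changes were already rejected by errOnResolutionMismatch.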
+func updateExistingRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for labelName, res := range r { + if oldRes, exists := existingRes[labelName]; exists && oldRes.Retention != res.Retention { + batch.Queue("UPDATE _prom_catalog.rollup SET retention = $1 WHERE name = $2", time.Duration(res.Retention), labelName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error closing batch: %w", err) + } + } + return nil +} + +func createRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if !exists && !res.Delete { + batch.Queue("CALL _prom_catalog.create_rollup($1, $2, $3)", lName, time.Duration(res.Resolution), time.Duration(res.Retention)) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error creating new rollups: %w", err) + } + } + return nil +} + +func deleteRollups(ctx context.Context, conn *pgx.Conn, existingRes, r Resolutions) error { + var batch pgx.Batch + for lName, res := range r { + _, exists := existingRes[lName] + if exists && res.Delete { + // Delete the rollup only if it exists in the DB. + batch.Queue("CALL _prom_catalog.delete_rollup($1)", lName) + } + } + if batch.Len() > 0 { + results := conn.SendBatch(ctx, &batch) + if err := results.Close(); err != nil { + return fmt.Errorf("error deleting rollups: %w", err) + } + } + return nil +} diff --git a/pkg/runner/client.go b/pkg/runner/client.go index 63dbd49f4c..af8164c21e 100644 --- a/pkg/runner/client.go +++ b/pkg/runner/client.go @@ -165,16 +165,18 @@ func CreateClient(r prometheus.Registerer, cfg *Config) (*pgclient.Client, error cfg.APICfg.MultiTenancy = multiTenancy } + var useRollups bool if cfg.DatasetConfig != "" { - err = ApplyDatasetConfig(conn, cfg.DatasetConfig) + cfg, err := ApplyDatasetConfig(context.Background(), conn, cfg.DatasetConfig) if err != nil { return nil, fmt.Errorf("error applying dataset configuration: %w", err) } + if cfg.Metrics.Rollup != nil { // The rollup section may be absent from the dataset YAML. + useRollups = *cfg.Metrics.Rollup.Enabled + } } // client has to be initiated after migrate since migrate // can change database GUC settings - client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly) + client, err := pgclient.NewClient(r, &cfg.PgmodelCfg, multiTenancy, leasingFunction, cfg.APICfg.ReadOnly, useRollups) if err != nil { return nil, fmt.Errorf("client creation error: %w", err) } @@ -226,13 +228,15 @@ func isBGWLessThanDBs(conn *pgx.Conn) (bool, error) { return false, nil } -func ApplyDatasetConfig(conn *pgx.Conn, cfgFilename string) error { +func ApplyDatasetConfig(ctx context.Context, conn *pgx.Conn, cfgFilename string) (*dataset.Config, error) { cfg, err := dataset.NewConfig(cfgFilename) if err != nil { - return err + return nil, err } - - return cfg.Apply(conn) + if err = cfg.Apply(ctx, conn); err != nil { + return nil, fmt.Errorf("error applying dataset config: %w", err) + } + return &cfg, nil } func compileAnchoredRegexString(s string) (*regexp.Regexp, error) { diff --git a/pkg/tests/constants.go b/pkg/tests/constants.go index e88e8aa959..463f757289 100644 --- a/pkg/tests/constants.go +++ b/pkg/tests/constants.go @@ -10,6 +10,8 @@ var ( PromscaleExtensionContainer string ) +const rollupsDBImage = "ghcr.io/timescale/dev_promscale_extension:rollups-development-ts2.8-pg14" + func init() {
content, err := os.ReadFile("../../../EXTENSION_VERSION") if err != nil { @@ -17,5 +19,6 @@ func init() { } PromscaleExtensionVersion = strings.TrimSpace(string(content)) - PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + //PromscaleExtensionContainer = "ghcr.io/timescale/dev_promscale_extension:" + PromscaleExtensionVersion + "-ts2-pg14" + PromscaleExtensionContainer = rollupsDBImage // This override will be reverted before merging to master. } diff --git a/pkg/tests/end_to_end_tests/alerts_test.go b/pkg/tests/end_to_end_tests/alerts_test.go index 9f7902c670..4fbecab42c 100644 --- a/pkg/tests/end_to_end_tests/alerts_test.go +++ b/pkg/tests/end_to_end_tests/alerts_test.go @@ -47,7 +47,7 @@ func TestAlerts(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{ diff --git a/pkg/tests/end_to_end_tests/config_dataset_test.go b/pkg/tests/end_to_end_tests/config_dataset_test.go index 2468062add..12b186b5ca 100644 --- a/pkg/tests/end_to_end_tests/config_dataset_test.go +++ b/pkg/tests/end_to_end_tests/config_dataset_test.go @@ -9,6 +9,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/dataset" + "github.com/timescale/promscale/pkg/internal/day" ) func TestDatasetConfigApply(t *testing.T) { @@ -28,18 +29,18 @@ func TestDatasetConfigApply(t *testing.T) { cfg := dataset.Config{ Metrics: dataset.Metrics{ - ChunkInterval: dataset.DayDuration(4 * time.Hour), + ChunkInterval: day.Duration(4 * time.Hour), Compression: &disableCompression, - HALeaseRefresh: dataset.DayDuration(15 * time.Second), - HALeaseTimeout: dataset.DayDuration(2 * time.Minute), - RetentionPeriod: dataset.DayDuration(15 * 24 * time.Hour), + HALeaseRefresh: day.Duration(15 * time.Second), + HALeaseTimeout: day.Duration(2 * time.Minute), + RetentionPeriod: day.Duration(15 * 24 * time.Hour), }, Traces: dataset.Traces{ - RetentionPeriod: dataset.DayDuration(10 * 24 * time.Hour), + RetentionPeriod: day.Duration(10 * 24 * time.Hour), }, } - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 4*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) @@ -53,7 +54,7 @@ func TestDatasetConfigApply(t *testing.T) { cfg, err = dataset.NewConfig("") require.NoError(t, err) - err = cfg.Apply(pgxConn) + err = cfg.Apply(context.Background(), pgxConn) require.NoError(t, err) require.Equal(t, 8*time.Hour, getMetricsDefaultChunkInterval(t, pgxConn)) diff --git a/pkg/tests/end_to_end_tests/continuous_agg_test.go b/pkg/tests/end_to_end_tests/continuous_agg_test.go index 850140e590..3dbec38a17 100644 --- a/pkg/tests/end_to_end_tests/continuous_agg_test.go +++ b/pkg/tests/end_to_end_tests/continuous_agg_test.go @@ -190,7 +190,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)"); err != nil { t.Fatalf("unexpected error while registering
metric view: %s", err) } @@ -209,7 +209,8 @@ WITH (timescaledb.continuous) AS lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(readOnly) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil) if err != nil { @@ -372,7 +373,7 @@ WITH (timescaledb.continuous) AS FROM prom_data.test GROUP BY public.time_bucket('1hour', time), series_id`) require.NoError(t, err) - _, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg')") + _, err = db.Exec(context.Background(), "SELECT prom_api.register_metric_view('cagg_schema', 'cagg', NULL)") require.NoError(t, err) _, err = db.Exec(context.Background(), "SELECT prom_api.set_metric_retention_period('cagg_schema', 'cagg', INTERVAL '180 days')") @@ -450,7 +451,7 @@ WITH (timescaledb.continuous) AS t.Fatalf("unexpected error while creating metric view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('public', 'tw_1hour', NULL)"); err != nil { t.Fatalf("unexpected error while registering metric view: %s", err) } diff --git a/pkg/tests/end_to_end_tests/create_test.go b/pkg/tests/end_to_end_tests/create_test.go index 1d3d9a9edd..bb21aacf39 100644 --- a/pkg/tests/end_to_end_tests/create_test.go +++ b/pkg/tests/end_to_end_tests/create_test.go @@ -1597,7 +1597,7 @@ func TestRegisterMetricView(t *testing.T) { withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) { // Cannot register non-existant schema. - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('nonexistant', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant schema") } @@ -1606,7 +1606,7 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register non-existant view. 
- if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'missing', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view from a non-existant metric view") } @@ -1640,7 +1640,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_in_data_schema AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view in data schema: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_data', 'metric_view_in_data_schema', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view in data schema") } @@ -1648,7 +1648,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_columns AS SELECT time, series_id, true as bad_column FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_columns')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_columns', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with different columns than raw metric") } @@ -1656,7 +1656,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_bad_column_types AS SELECT time, series_id, true as value FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_bad_column_types', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1664,7 +1664,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view_not_based AS SELECT time, series_id, 1.0 as value FROM prom_view."metric_view_bad_columns"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view_not_based', NULL)"); err == nil { t.Fatal("Should not be able to register a metric view with column types different than raw metric") } @@ -1672,7 +1672,7 @@ func TestRegisterMetricView(t *testing.T) { if _, err = db.Exec(context.Background(), `CREATE VIEW prom_view.metric_view AS SELECT * FROM prom_data."rawMetric"`); err != nil { t.Fatalf("unexpected error while creating view: %s", err) } - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 
'metric_view', NULL)"); err != nil { t.Fatalf("Error creating valid metric view: %v", err) } @@ -1694,12 +1694,12 @@ func TestRegisterMetricView(t *testing.T) { } // Cannot register the same view twice. - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view')"); err == nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL)"); err == nil { t.Fatal("Should not be able to register the same view twice") } // Should succeed if we register same view twice but also use `if_not_exists` - if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', true)"); err != nil { + if _, err := db.Exec(context.Background(), "SELECT prom_api.register_metric_view('prom_view', 'metric_view', NULL, true)"); err != nil { t.Fatalf("Should be able to register the same view twice when using `if_not_exists`: %v", err) } diff --git a/pkg/tests/end_to_end_tests/exemplar_test.go b/pkg/tests/end_to_end_tests/exemplar_test.go index 2cedc067d8..3e9382a51d 100644 --- a/pkg/tests/end_to_end_tests/exemplar_test.go +++ b/pkg/tests/end_to_end_tests/exemplar_test.go @@ -205,11 +205,12 @@ func TestExemplarQueryingAPI(t *testing.T) { // since the return will be 0, as they have already been ingested by TestExemplarIngestion. labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig), tenancy.NewNoopAuthorizer().ReadAuthorizer()) - r := querier.NewQuerier( + r, err := querier.NewQuerier( pgxconn.NewPgxConn(db), cache.NewMetricCache(cache.DefaultConfig), labelsReader, - cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil) + cache.NewExemplarLabelsPosCache(cache.DefaultConfig), nil, 0, false) + require.NoError(t, err) queryable := query.NewQueryable(r, labelsReader) // Query all exemplars corresponding to metric_2 histogram. diff --git a/pkg/tests/end_to_end_tests/multi_tenancy_test.go b/pkg/tests/end_to_end_tests/multi_tenancy_test.go index 956bcb497e..999e359385 100644 --- a/pkg/tests/end_to_end_tests/multi_tenancy_test.go +++ b/pkg/tests/end_to_end_tests/multi_tenancy_test.go @@ -36,7 +36,7 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -56,7 +56,8 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -224,7 +225,7 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) // Ingestion. 
- client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -256,7 +257,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a valid tenant (tenant-a) ----- expectedResult := []prompb.TimeSeries{ @@ -382,7 +384,8 @@ func TestMultiTenancyWithValidTenants(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{} @@ -413,7 +416,7 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) // Ingestion. - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -461,7 +464,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a non-tenant ----- expectedResult := []prompb.TimeSeries{ @@ -550,7 +554,8 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) { require.NoError(t, err) labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) expectedResult = []prompb.TimeSeries{ { @@ -627,7 +632,7 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { require.NoError(t, err) // Ingestion. 
- client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, mt, false, false) require.NoError(t, err) defer client.Close() @@ -659,7 +664,8 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer()) - qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer()) + qr, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer(), 0, false) + require.NoError(t, err) // ----- query-test: querying a single tenant (tenant-b) ----- expectedResult := []prompb.TimeSeries{ @@ -759,7 +765,7 @@ func TestMultiTenancyLabelNamesValues(t *testing.T) { ts, _ := generateSmallMultiTenantTimeseries() withDB(t, *testDatabase, func(db *pgxpool.Pool, tb testing.TB) { getClient := func(auth tenancy.Authorizer) *pgclient.Client { - client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false) + client, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), &testConfig, 1, db, db, nil, auth, false, false) require.NoError(t, err) return client } diff --git a/pkg/tests/end_to_end_tests/nan_test.go b/pkg/tests/end_to_end_tests/nan_test.go index 578923bf4e..95b0d2b9a3 100644 --- a/pkg/tests/end_to_end_tests/nan_test.go +++ b/pkg/tests/end_to_end_tests/nan_test.go @@ -14,6 +14,7 @@ import ( "github.com/jackc/pgx/v4/pgxpool" _ "github.com/jackc/pgx/v4/stdlib" "github.com/prometheus/prometheus/model/value" + "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/clockcache" "github.com/timescale/promscale/pkg/internal/testhelpers" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -129,7 +130,8 @@ func TestSQLStaleNaN(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(c.query) if err != nil { t.Fatalf("unexpected error while ingesting test dataset: %s", err) diff --git a/pkg/tests/end_to_end_tests/null_chars_test.go b/pkg/tests/end_to_end_tests/null_chars_test.go index 073a7f9a7f..acf6b50e2f 100644 --- a/pkg/tests/end_to_end_tests/null_chars_test.go +++ b/pkg/tests/end_to_end_tests/null_chars_test.go @@ -67,7 +67,8 @@ func TestOperationWithNullChars(t *testing.T) { lCache := clockcache.WithMax(100) dbConn := pgxconn.NewPgxConn(db) labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer) - r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil) + r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false) + require.NoError(t, err) resp, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{ Matchers: []*prompb.LabelMatcher{ { diff --git a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go index 363db64b0b..a57dab1031 100644 --- a/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go +++ b/pkg/tests/end_to_end_tests/promql_endpoint_integration_test.go @@ -311,7 +311,7 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config, authWrapper m MaxConnections: -1, } 
-	pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly)
+	pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, nil, tenancy.NewNoopAuthorizer(), cfg.ReadOnly, false)
 	if err != nil {
 		return nil, pgClient, fmt.Errorf("cannot run test, cannot instantiate pgClient: %w", err)
 	}
diff --git a/pkg/tests/end_to_end_tests/query_integration_test.go b/pkg/tests/end_to_end_tests/query_integration_test.go
index 2b25312305..9fda0862b0 100644
--- a/pkg/tests/end_to_end_tests/query_integration_test.go
+++ b/pkg/tests/end_to_end_tests/query_integration_test.go
@@ -114,8 +114,9 @@ func TestDroppedViewQuery(t *testing.T) {
 		lCache := clockcache.WithMax(100)
 		dbConn := pgxconn.NewPgxConn(readOnly)
 		labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
-		r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
-		_, err := r.RemoteReadQuerier(ctx).Query(&prompb.Query{
+		r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false)
+		require.NoError(t, err)
+		_, err = r.RemoteReadQuerier(ctx).Query(&prompb.Query{
 			Matchers: []*prompb.LabelMatcher{
 				{
 					Type: prompb.LabelMatcher_EQ,
@@ -692,7 +693,8 @@ func TestSQLQuery(t *testing.T) {
 	lCache := clockcache.WithMax(100)
 	dbConn := pgxconn.NewPgxConn(readOnly)
 	labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
-	r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
+	r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false)
+	require.NoError(t, err)
 	for _, c := range testCases {
 		tester.Run(c.name, func(t *testing.T) {
 			resp, err := r.RemoteReadQuerier(context.Background()).Query(c.query)
@@ -766,7 +768,7 @@ func createMetricView(db *pgxpool.Pool, t testing.TB, schemaName, viewName, metr
 	if _, err := db.Exec(context.Background(), fmt.Sprintf(`CREATE VIEW "%s"."%s" AS SELECT * FROM prom_data."%s"`, schemaName, viewName, metricName)); err != nil {
 		t.Fatalf("unexpected error while creating metric view: %s", err)
 	}
-	if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s')", schemaName, viewName)); err != nil {
+	if _, err := db.Exec(context.Background(), fmt.Sprintf("SELECT prom_api.register_metric_view('%s', '%s', NULL)", schemaName, viewName)); err != nil {
 		t.Fatalf("unexpected error while registering metric view: %s", err)
 	}
 }
@@ -1099,7 +1101,8 @@ func TestPromQL(t *testing.T) {
 	lCache := clockcache.WithMax(100)
 	dbConn := pgxconn.NewPgxConn(readOnly)
 	labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
-	r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
+	r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false)
+	require.NoError(t, err)
 	for _, c := range testCases {
 		tester.Run(c.name, func(t *testing.T) {
 			connResp, connErr := r.RemoteReadQuerier(context.Background()).Query(c.query)
@@ -1300,7 +1303,8 @@ func TestPushdownDelta(t *testing.T) {
 	lCache := clockcache.WithMax(100)
 	dbConn := pgxconn.NewPgxConn(readOnly)
 	labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
-	r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
+	r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false)
+	require.NoError(t, err)
 	queryable := query.NewQueryable(r, labelsReader)
 	queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil)
 	if err != nil {
@@ -1375,7 +1379,8 @@ func TestPushdownVecSel(t *testing.T) {
 	lCache := clockcache.WithMax(100)
 	dbConn := pgxconn.NewPgxConn(readOnly)
 	labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
-	r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
+	r, err := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil, 0, false)
+	require.NoError(t, err)
 	queryable := query.NewQueryable(r, labelsReader)
 	queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil)
 	if err != nil {
diff --git a/pkg/tests/end_to_end_tests/rollup_query_helper_test.go b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go
new file mode 100644
index 0000000000..828a6655bf
--- /dev/null
+++ b/pkg/tests/end_to_end_tests/rollup_query_helper_test.go
@@ -0,0 +1,102 @@
+// This file and its contents are licensed under the Apache License 2.0.
+// Please see the included NOTICE for copyright information and
+// LICENSE for a copy of the license.
+
+package end_to_end_tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/stretchr/testify/require"
+	"github.com/timescale/promscale/pkg/pgxconn"
+	"github.com/timescale/promscale/pkg/rollup"
+)
+
+func TestRollupQueryHelper(t *testing.T) {
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		_, err := db.Exec(context.Background(), "SELECT prom_api.set_automatic_downsample($1)", true)
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('short', interval '5 minutes', interval '30 days')")
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('medium', interval '15 minutes', interval '30 days')")
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('long', interval '1 hour', interval '30 days')")
+		require.NoError(t, err)
+		_, err = db.Exec(context.Background(), "CALL _prom_catalog.create_rollup('very_long', interval '1 week', interval '30 days')")
+		require.NoError(t, err)
+
+		var numRollups int
+		err = db.QueryRow(context.Background(), "SELECT count(*) FROM _prom_catalog.rollup").Scan(&numRollups)
+		require.NoError(t, err)
+		require.Equal(t, 4, numRollups)
+
+		helper, err := rollup.NewDecider(context.Background(), pgxconn.NewPgxConn(db), rollup.DefaultScrapeInterval)
+		require.NoError(t, err)
+		require.NotNil(t, helper)
+
+		const originalSchema = "prom_data"
+
+		tcs := []struct {
+			name               string
+			min                time.Duration
+			max                time.Duration
+			expectedSchemaName string
+		}{
+			{
+				name:               "1 sec",
+				min:                0,
+				max:                time.Second,
+				expectedSchemaName: originalSchema,
+			}, {
+				name:               "5 min",
+				min:                0,
+				max:                5 * time.Minute,
+				expectedSchemaName: originalSchema,
+			}, {
+				name:               "30 mins",
+				min:                0,
+				max:                30 * time.Minute,
+				expectedSchemaName: originalSchema,
+			}, {
+				name:               "1 hour",
+				min:                0,
+				max:                time.Hour,
+				expectedSchemaName: originalSchema,
+			}, {
+				name:               "1 day",
+				min:                0,
+				max:                24 * time.Hour,
+				expectedSchemaName: originalSchema,
+			},
+			{
+				name:               "7 days",
+				min:                0,
+				max:                7 * 24 * time.Hour,
+				expectedSchemaName: "ps_short",
+			},
+			{
+				name:               "30 days",
+				min:                0,
+				max:                30 * 24 * time.Hour,
+				expectedSchemaName: "ps_medium",
+			}, {
+				name:               "1 year",
+				min:                0,
+				max:                12 * 30 * 24 * time.Hour,
+				expectedSchemaName: "ps_very_long",
+			}, {
+				name:               "100 years",
+				min:                0,
+				max:                100 * 12 * 30 * 24 * time.Hour,
+				expectedSchemaName: "ps_very_long",
+			},
+		}
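+		// Expectation encoded in the cases above: query windows of up to a
+		// day stay on the raw prom_data schema, while progressively longer
+		// windows resolve to progressively coarser rollup schemas.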
+		for _, tc := range tcs {
+			recommendedSchema := helper.Decide(int64(tc.min.Seconds()), int64(tc.max.Seconds()))
+			require.Equal(t, tc.expectedSchemaName, recommendedSchema, tc.name)
+		}
+	})
+}
diff --git a/pkg/tests/end_to_end_tests/rollup_test.go b/pkg/tests/end_to_end_tests/rollup_test.go
new file mode 100644
index 0000000000..7f38a9f32c
--- /dev/null
+++ b/pkg/tests/end_to_end_tests/rollup_test.go
@@ -0,0 +1,111 @@
+// This file and its contents are licensed under the Apache License 2.0.
+// Please see the included NOTICE for copyright information and
+// LICENSE for a copy of the license.
+
+package end_to_end_tests
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/stretchr/testify/require"
+
+	"github.com/timescale/promscale/pkg/internal/day"
+	"github.com/timescale/promscale/pkg/rollup"
+)
+
+func TestRollupSync(t *testing.T) {
+	withDB(t, *testDatabase, func(db *pgxpool.Pool, t testing.TB) {
+		rollupResolutions := rollup.Resolutions{
+			"short": {
+				Resolution: day.Duration(5 * time.Minute),
+				Retention:  day.Duration(30 * 24 * time.Hour),
+			},
+		}
+
+		pgCon, err := db.Acquire(context.Background())
+		require.NoError(t, err)
+		defer pgCon.Release()
+
+		// Test 1: Check that the 'short' rollup is created.
+		err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions)
+		require.NoError(t, err)
+
+		verifyRollupExistence(t, pgCon.Conn(), "short",
+			time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), false)
+
+		rollupResolutions["long"] = rollup.Definition{
+			Resolution: day.Duration(time.Hour),
+			Retention:  day.Duration(395 * 24 * time.Hour),
+		}
+
+		// Test 2: Check that the 'long' rollup is created.
+		err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions)
+		require.NoError(t, err)
+
+		verifyRollupExistence(t, pgCon.Conn(), "long",
+			time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false)
+
+		// Test 3: Update the resolution and check that an error is returned.
+		rollupResolutions["short"] = rollup.Definition{
+			Resolution: day.Duration(4 * time.Minute),
+			Retention:  day.Duration(30 * 24 * time.Hour),
+		}
+		err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions)
+		require.Equal(t,
+			"error on existing resolution mismatch: existing rollup resolutions cannot be updated. Either keep the resolution of existing rollup labels same or remove them",
+			err.Error())
+		// Reset back to the original resolution.
+		rollupResolutions["short"] = rollup.Definition{
+			Resolution: day.Duration(5 * time.Minute),
+			Retention:  day.Duration(30 * 24 * time.Hour),
+		}
+
+		// Test 4: Mark the 'short' entry for deletion and check that it is removed.
+		rollupResolutions["short"] = rollup.Definition{
+			Resolution: day.Duration(5 * time.Minute),
+			Retention:  day.Duration(30 * 24 * time.Hour),
+			Delete:     true,
+		}
+		err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions)
+		require.NoError(t, err)
+		// Check that 'long' still exists.
+		verifyRollupExistence(t, pgCon.Conn(), "long",
+			time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false)
+		// Check that 'short' no longer exists.
+		verifyRollupExistence(t, pgCon.Conn(), "short",
+			time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true)
+
+		// Test 5: Update the retention of 'long' and check that the change is reflected in the DB.
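+		// Unlike resolution changes (rejected in Test 3), retention changes
+		// are applied in place.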
+ rollupResolutions["long"] = rollup.Definition{ + Resolution: day.Duration(time.Hour), + Retention: day.Duration(500 * 24 * time.Hour), // Updated retention duration. + } + err = rollup.Sync(context.Background(), pgCon.Conn(), rollupResolutions) + require.NoError(t, err) + verifyRollupExistence(t, pgCon.Conn(), "long", + time.Duration(rollupResolutions["long"].Resolution), time.Duration(rollupResolutions["long"].Retention), false) + // Short should still not exists. + verifyRollupExistence(t, pgCon.Conn(), "short", + time.Duration(rollupResolutions["short"].Resolution), time.Duration(rollupResolutions["short"].Retention), true) + }) +} + +func verifyRollupExistence(t testing.TB, pgCon *pgx.Conn, name string, resolution, retention time.Duration, shouldError bool) { + var ( + rName string + rResolution time.Duration + rRetention time.Duration + ) + err := pgCon.QueryRow(context.Background(), "SELECT name, resolution, retention FROM _prom_catalog.rollup WHERE name = $1", name).Scan(&rName, &rResolution, &rRetention) + if shouldError { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, resolution, rResolution) + require.Equal(t, retention, rRetention) +} diff --git a/pkg/tests/end_to_end_tests/rules_test.go b/pkg/tests/end_to_end_tests/rules_test.go index 56b2be39df..5b558bfbe0 100644 --- a/pkg/tests/end_to_end_tests/rules_test.go +++ b/pkg/tests/end_to_end_tests/rules_test.go @@ -35,7 +35,7 @@ func TestRecordingRulesEval(t *testing.T) { MaxConnections: -1, } - pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false) + pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, db, db, nil, tenancy.NewNoopAuthorizer(), false, false) require.NoError(t, err) defer pgClient.Close() err = pgClient.InitPromQLEngine(&query.Config{