Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions db/query.mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,7 @@ SELECT nf.*
FROM nar_files nf
LEFT JOIN narinfo_nar_files ninf ON nf.id = ninf.nar_file_id
WHERE ninf.narinfo_id IS NULL;

-- name: GetAllNarInfos :many
SELECT hash
FROM narinfos;
4 changes: 4 additions & 0 deletions db/query.postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,7 @@ SELECT nf.*
FROM nar_files nf
LEFT JOIN narinfo_nar_files ninf ON nf.id = ninf.nar_file_id
WHERE ninf.narinfo_id IS NULL;

-- name: GetAllNarInfos :many
SELECT hash
FROM narinfos;
4 changes: 4 additions & 0 deletions db/query.sqlite.sql
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,7 @@ SELECT nf.*
FROM nar_files nf
LEFT JOIN narinfo_nar_files ninf ON nf.id = ninf.nar_file_id
WHERE ninf.narinfo_id IS NULL;

-- name: GetAllNarInfos :many
SELECT hash
FROM narinfos;
67 changes: 67 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kalbasit/ncps/pkg/database"
"github.com/kalbasit/ncps/pkg/lock"
"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/nixcacheindex"
"github.com/kalbasit/ncps/pkg/storage"
)

Expand Down Expand Up @@ -137,10 +138,17 @@ type Cache struct {
configStore storage.ConfigStore
narInfoStore storage.NarInfoStore
narStore storage.NarStore
fileStore storage.FileStore

// Should the cache sign the narinfos?
shouldSignNarinfo bool

// Should the cache generate the experimental cache index?
experimentalCacheIndex bool
experimentalCacheIndexHTTPS bool
indexClient *nixcacheindex.Client
indexGenerationJobID cron.EntryID

// recordAgeIgnoreTouch represents the duration at which a record is
// considered up to date and a touch is not invoked. This helps avoid
// repetitive touching of records in the database which are causing `database
Expand Down Expand Up @@ -216,6 +224,13 @@ func (ds *downloadState) getError() error {
return ds.downloadError
}

// Fetch implements the nixcacheindex.Fetcher interface.
func (c *Cache) Fetch(ctx context.Context, path string) (io.ReadCloser, error) {
_, rc, err := c.fileStore.GetFile(ctx, path)

return rc, err
}

// New returns a new Cache.
func New(
ctx context.Context,
Expand All @@ -225,6 +240,7 @@ func New(
configStore storage.ConfigStore,
narInfoStore storage.NarInfoStore,
narStore storage.NarStore,
fileStore storage.FileStore,
secretKeyPath string,
downloadLocker lock.Locker,
cacheLocker lock.RWLocker,
Expand All @@ -238,6 +254,7 @@ func New(
configStore: configStore,
narInfoStore: narInfoStore,
narStore: narStore,
fileStore: fileStore,
shouldSignNarinfo: true,
downloadLocker: downloadLocker,
cacheLocker: cacheLocker,
Expand Down Expand Up @@ -276,6 +293,9 @@ func New(
c.processHealthChanges(ctx, healthChangeCh)
})

c.cron = cron.New()
c.cron.Start()

return c, nil
}

Expand Down Expand Up @@ -376,6 +396,40 @@ func (c *Cache) GetHealthChecker() *healthcheck.HealthChecker { return c.healthC
// SetCacheSignNarinfo configure ncps to sign or not sign narinfos.
func (c *Cache) SetCacheSignNarinfo(shouldSignNarinfo bool) { c.shouldSignNarinfo = shouldSignNarinfo }

// SetExperimentalCacheIndex configure ncps to generate the cache index or not.
func (c *Cache) SetExperimentalCacheIndex(experimentalCacheIndex bool) {
c.experimentalCacheIndex = experimentalCacheIndex

if !c.experimentalCacheIndex {
c.indexClient = nil

if c.indexGenerationJobID != 0 {
c.cron.Remove(c.indexGenerationJobID)
c.indexGenerationJobID = 0
}

return
}

c.indexClient = nixcacheindex.NewClient(c.baseContext, c)

if c.indexGenerationJobID != 0 {
return
}

jobID, err := c.cron.AddFunc("@hourly", c.generateIndex)
if err != nil {
zerolog.Ctx(c.baseContext).Error().Err(err).Msg("failed to schedule cache index generation")

return
}

c.indexGenerationJobID = jobID
}

// SetExperimentalCacheIndexHTTPS configure ncps to use HTTPS for the cache index.
func (c *Cache) SetExperimentalCacheIndexHTTPS(https bool) { c.experimentalCacheIndexHTTPS = https }

// SetMaxSize sets the maxsize of the cache. This will be used by the LRU
// cronjob to automatically clean-up the store.
func (c *Cache) SetMaxSize(maxSize uint64) { c.maxSize = maxSize }
Expand Down Expand Up @@ -1087,6 +1141,19 @@ func (c *Cache) GetNarInfo(ctx context.Context, hash string) (*narinfo.NarInfo,
}
}

if c.indexClient != nil {
status, err := c.indexClient.Query(ctx, hash)
if err != nil {
// Don't fail the request if the index lookup fails, just log it and proceed
zerolog.Ctx(ctx).Warn().Err(err).Str("hash", hash).Msg("cache index query failed")
} else if status == nixcacheindex.DefiniteMiss {
// Avoid checking upstream if we know it's a definite miss
zerolog.Ctx(ctx).Debug().Str("hash", hash).Msg("cache index says definite miss")

return storage.ErrNotFound
}
}

metricAttrs = append(metricAttrs,
attribute.String("result", "miss"),
attribute.String("status", "success"),
Expand Down
3 changes: 3 additions & 0 deletions pkg/cache/cache_distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func TestDistributedDownloadDeduplication(t *testing.T) {
sharedStore,
sharedStore,
sharedStore,
sharedStore,
"",
downloadLocker,
cacheLocker,
Expand Down Expand Up @@ -247,6 +248,7 @@ func TestDistributedConcurrentReads(t *testing.T) {
sharedStore,
sharedStore,
sharedStore,
sharedStore,
"",
downloadLocker,
cacheLocker,
Expand Down Expand Up @@ -288,6 +290,7 @@ func TestDistributedConcurrentReads(t *testing.T) {
sharedStore,
sharedStore,
sharedStore,
sharedStore,
"",
downloadLocker,
cacheLocker,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/cache_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func setupTestCache(t *testing.T) (*Cache, func()) {
downloadLocker := locklocal.NewLocker()
cacheLocker := locklocal.NewRWLocker()

c, err := New(newContext(), cacheName, db, localStore, localStore, localStore, "",
c, err := New(newContext(), cacheName, db, localStore, localStore, localStore, localStore, "",
downloadLocker, cacheLocker, downloadLockTTL, cacheLockTTL)
if err != nil {
cleanup()
Expand Down
11 changes: 6 additions & 5 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ func newTestCache(
configStore storage.ConfigStore,
narInfoStore storage.NarInfoStore,
narStore storage.NarStore,
fileStore storage.FileStore,
secretKeyPath string,
) (*cache.Cache, error) {
downloadLocker := locklocal.NewLocker()
cacheLocker := locklocal.NewRWLocker()

return cache.New(ctx, hostName, db, configStore, narInfoStore, narStore, secretKeyPath,
return cache.New(ctx, hostName, db, configStore, narInfoStore, narStore, fileStore, secretKeyPath,
downloadLocker, cacheLocker, 5*time.Minute, 30*time.Minute)
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func setupTestCache(t *testing.T) (*cache.Cache, database.Querier, *local.Store,

db, localStore, dir, cleanup := setupTestComponents(t)

c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, "")
c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, localStore, "")
require.NoError(t, err)

return c, db, localStore, dir, cleanup
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestNew(t *testing.T) {
db, localStore, _, cleanup := setupTestComponents(t)
defer cleanup()

_, err := newTestCache(newContext(), tt.hostname, db, localStore, localStore, localStore, "")
_, err := newTestCache(newContext(), tt.hostname, db, localStore, localStore, localStore, localStore, "")
if tt.wantErr != nil {
assert.ErrorIs(t, err, tt.wantErr)
} else {
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestNew(t *testing.T) {

require.NoError(t, skFile.Close())

c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, skFile.Name())
c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, localStore, skFile.Name())
require.NoError(t, err)

// Verify key is NOT in local store
Expand All @@ -207,7 +208,7 @@ func TestNew(t *testing.T) {
err = localStore.PutSecretKey(newContext(), sk)
require.NoError(t, err)

c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, "")
c, err := newTestCache(newContext(), cacheName, db, localStore, localStore, localStore, localStore, "")
require.NoError(t, err)

// Verify key is NOT in local store anymore
Expand Down
7 changes: 7 additions & 0 deletions pkg/cache/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cache

import "context"

func (c *Cache) GenerateIndexForTest(ctx context.Context) error {
return c.doGenerateIndex(ctx)
}
2 changes: 1 addition & 1 deletion pkg/cache/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestHealthCheck(t *testing.T) {
downloadLocker := locklocal.NewLocker()
cacheLocker := locklocal.NewRWLocker()

c, err := cache.New(newContext(), cacheName, db, localStore, localStore, localStore, "",
c, err := cache.New(newContext(), cacheName, db, localStore, localStore, localStore, localStore, "",
downloadLocker, cacheLocker, 5*time.Minute, 30*time.Minute)
require.NoError(t, err)

Expand Down
Loading