Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions pkg/cache/upstream/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/kalbasit/ncps/pkg/helper"
"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/nixcacheindex"
"github.com/kalbasit/ncps/pkg/nixcacheinfo"
)

Expand Down Expand Up @@ -70,6 +71,9 @@ type Cache struct {
publicKeys []signature.PublicKey
netrcAuth *NetrcCredentials

// indexClient is the client for the binary cache index.
indexClient *nixcacheindex.Client

mu sync.RWMutex
isHealthy bool

Expand Down Expand Up @@ -100,6 +104,9 @@ type Options struct {
// ResponseHeaderTimeout is the timeout for waiting for the server's response headers.
// If zero, defaults to defaultHTTPTimeout (3s).
ResponseHeaderTimeout time.Duration

// ExperimentalCacheIndex enables the use of the experimental binary cache index.
ExperimentalCacheIndex bool
}

// New creates a new upstream cache with the given URL and options.
Expand Down Expand Up @@ -178,6 +185,10 @@ func New(ctx context.Context, u *url.URL, opts *Options) (*Cache, error) {
c.priority = 40 // Default priority
}

if opts.ExperimentalCacheIndex {
c.indexClient = nixcacheindex.NewClient(ctx, c)
}

return c, nil
}

Expand Down Expand Up @@ -294,6 +305,10 @@ func (c *Cache) GetNarInfo(ctx context.Context, hash string) (*narinfo.NarInfo,
Info().
Msg("download the narinfo from upstream")

if c.isDefiniteMissFromIndex(ctx, hash) {
return nil, ErrNotFound
}

resp, err := c.doRequest(ctx, http.MethodGet, u)
if err != nil {
return nil, err
Expand Down Expand Up @@ -364,6 +379,10 @@ func (c *Cache) HasNarInfo(ctx context.Context, hash string) (bool, error) {
Info().
Msg("heading the narinfo from upstream")

if c.isDefiniteMissFromIndex(ctx, hash) {
return false, nil
}

resp, err := c.doRequest(ctx, http.MethodHead, u)
if err != nil {
if isTimeout(err) {
Expand Down Expand Up @@ -535,6 +554,64 @@ func (c *Cache) validateURL(u *url.URL) error {
return nil
}

// isDefiniteMissFromIndex checks the cache index for a hash.
// It returns true if the index reports a definite miss, and false otherwise.
func (c *Cache) isDefiniteMissFromIndex(ctx context.Context, hash string) bool {
if c.indexClient == nil {
return false
}

res, err := c.indexClient.Query(ctx, hash)
if err != nil {
zerolog.Ctx(ctx).
Warn().
Err(err).
Msg("failed to query the index")

return false // On error, fallback to normal behavior.
}

if res == nixcacheindex.DefiniteMiss {
zerolog.Ctx(ctx).
Debug().
Str("hash", hash).
Msg("definite miss from index")

return true
}

return false
}

// Fetch implements nixcacheindex.Fetcher.
func (c *Cache) Fetch(ctx context.Context, path string) (io.ReadCloser, error) {
var u string

// Check if path is an absolute URL
if parsed, err := url.Parse(path); err == nil && parsed.IsAbs() {
u = path
} else {
u = c.url.JoinPath(path).String()
}

resp, err := c.doRequest(ctx, http.MethodGet, u)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
resp.Body.Close()

if resp.StatusCode == http.StatusNotFound {
return nil, nixcacheindex.ErrShardNotFound
}

return nil, fmt.Errorf("%w: %d", ErrUnexpectedHTTPStatusCode, resp.StatusCode)
}

return resp.Body, nil
}

// isTimeout checks if an error is a timeout error.
func isTimeout(err error) bool {
var netErr net.Error
Expand Down
92 changes: 92 additions & 0 deletions pkg/cache/upstream/cache_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package upstream_test

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kalbasit/ncps/pkg/cache/upstream"
"github.com/kalbasit/ncps/pkg/nixcacheindex"
"github.com/kalbasit/ncps/testhelper"
)

func TestExperimentalCacheIndex(t *testing.T) {
t.Parallel()

// 1. Setup Mock Server
var requestedNarInfo bool

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.URL.Path == "/nix-cache-info":
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("StoreDir: /nix/store\nWantMassQuery: 1\nPriority: 40\n"))

case r.URL.Path == "/nix-cache-index/manifest.json":
w.WriteHeader(http.StatusOK)

m := nixcacheindex.NewManifest()
// Update URLs to point to this mock server
scheme := "http"
if r.TLS != nil {
scheme = "https"
}

baseURL := fmt.Sprintf("%s://%s/nix-cache-index/", scheme, r.Host)
m.Urls.JournalBase = baseURL + "journal/"
m.Urls.ShardsBase = baseURL + "shards/"
m.Urls.DeltasBase = baseURL + "deltas/"

_ = json.NewEncoder(w).Encode(m)

case strings.HasPrefix(r.URL.Path, "/nix-cache-index/journal/"):
// Simulate missing/empty journal segments
w.WriteHeader(http.StatusNotFound)

case strings.HasPrefix(r.URL.Path, "/nix-cache-index/shards/"):
// Simulate missing shards -> implies DefiniteMiss
w.WriteHeader(http.StatusNotFound)

case strings.HasSuffix(r.URL.Path, ".narinfo"):
requestedNarInfo = true

w.WriteHeader(http.StatusOK)
// Should not be reached in Hit case if we were testing Hits,
// but for Miss check we want to ensure it's NOT reached
_, _ = w.Write([]byte("StorePath: /nix/store/abc-example"))

default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer ts.Close()

// 2. Setup Upstream Cache with Index Enabled
opts := &upstream.Options{
ExperimentalCacheIndex: true,
}

c, err := upstream.New(
newContext(),
testhelper.MustParseURL(t, ts.URL),
opts,
)
require.NoError(t, err)

// 3. Perform Request
// The mock setup ensures that checking shards returns 404, which the client interprets as DefiniteMiss.
// Therefore, GetNarInfo should return ErrNotFound WITHOUT requesting the .narinfo file.

_, err = c.GetNarInfo(context.Background(), "00000000000000000000000000000000") // 32 chars

// 4. Verification
require.ErrorIs(t, err, upstream.ErrNotFound)
assert.False(t, requestedNarInfo, "Should not have requested the narinfo file from upstream")
}
11 changes: 9 additions & 2 deletions pkg/ncps/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ func serveCommand(
Sources: flagSources("server.addr", "SERVER_ADDR"),
Value: ":8501",
},
&cli.BoolFlag{
Name: "experimental-cache-index",
Usage: "Enable the use of the experimental binary cache index",
Sources: flagSources("experimental.cache-index", "EXPERIMENTAL_CACHE_INDEX"),
},

// Redis Configuration (optional - for distributed locking in HA deployments)
&cli.StringSliceFlag{
Expand Down Expand Up @@ -583,6 +588,7 @@ func getUpstreamCaches(ctx context.Context, cmd *cli.Command, netrcData *netrc.N
dialerTimeout := cmd.Duration("cache-upstream-dialer-timeout")
deprecatedResponseHeaderTimeout := cmd.Duration("upstream-response-header-timeout")
responseHeaderTimeout := cmd.Duration("cache-upstream-response-header-timeout")
experimentalCacheIndex := cmd.Bool("experimental-cache-index")

// Show deprecation warning for upstream-cache
if len(deprecatedUpstreamCache) > 0 {
Expand Down Expand Up @@ -679,8 +685,9 @@ func getUpstreamCaches(ctx context.Context, cmd *cli.Command, netrcData *netrc.N

// Build options for this upstream cache
opts := &upstream.Options{
DialerTimeout: dialerTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
DialerTimeout: dialerTimeout,
ResponseHeaderTimeout: responseHeaderTimeout,
ExperimentalCacheIndex: experimentalCacheIndex,
}

// Find public keys for this upstream
Expand Down
33 changes: 19 additions & 14 deletions pkg/nixcacheindex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,32 @@ func (r Result) String() string {

// Fetcher abstraction for retrieving files (e.g., HTTP, local file system).
type Fetcher interface {
Fetch(path string) (io.ReadCloser, error)
Fetch(ctx context.Context, path string) (io.ReadCloser, error)
}

// Client for querying the binary cache index.
type Client struct {
fetcher Fetcher
manifest *Manifest

fetcher Fetcher
baseCtx context.Context
manifest *Manifest
manifestOnce sync.Once
manifestErr error
shardCacheMu sync.Mutex
shardCache map[string]*ShardReader
}

// NewClient creates a new client.
func NewClient(fetcher Fetcher) *Client {
func NewClient(ctx context.Context, fetcher Fetcher) *Client {
return &Client{
fetcher: fetcher,
baseCtx: ctx,
shardCache: make(map[string]*ShardReader),
}
}

// LoadManifest fetches and parses the manifest.
func (c *Client) LoadManifest() error {
r, err := c.fetcher.Fetch(ManifestPath)
func (c *Client) LoadManifest(ctx context.Context) error {
r, err := c.fetcher.Fetch(ctx, ManifestPath)
if err != nil {
return err
}
Expand All @@ -76,10 +79,12 @@ func (c *Client) LoadManifest() error {
// Query checks if the cache contains the given hash.
// hashStr is the 32-character base32 hash.
func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) {
if c.manifest == nil {
if err := c.LoadManifest(); err != nil {
return DefiniteMiss, err
}
c.manifestOnce.Do(func() {
c.manifestErr = c.LoadManifest(c.baseCtx)
})

if c.manifestErr != nil {
return DefiniteMiss, c.manifestErr
}

// 1. Check Journal (Layer 1)
Expand Down Expand Up @@ -151,7 +156,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) {
// In reality, some segments might not exist if no writes happened?
// Or if rotation logic is strict.

rc, err := c.fetcher.Fetch(path)
rc, err := c.fetcher.Fetch(ctx, path)
if err != nil {
// If journal segment is missing, we assume no mutations in that window.
zerolog.Ctx(ctx).Debug().Err(err).Str("path", path).
Expand Down Expand Up @@ -217,7 +222,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) {
return c.queryShard(shardReader, hashStr)
}

rc, err := c.fetcher.Fetch(shardPath)
rc, err := c.fetcher.Fetch(ctx, shardPath)
if err == nil {
defer rc.Close()

Expand All @@ -241,7 +246,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) {
return c.queryShard(shardReader, hashStr)
}

rc, err = c.fetcher.Fetch(shardPath)
rc, err = c.fetcher.Fetch(ctx, shardPath)
if err != nil {
return DefiniteMiss, err // Both missing -> Miss (or error)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/nixcacheindex/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type MockFetcher struct {
fetchCalls int
}

func (m *MockFetcher) Fetch(path string) (io.ReadCloser, error) {
func (m *MockFetcher) Fetch(_ context.Context, path string) (io.ReadCloser, error) {
m.fetchCalls++
if data, ok := m.files[path]; ok {
return io.NopCloser(bytes.NewReader(data)), nil
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestClientQuery_EndToEnd(t *testing.T) {
}

fetcher := &MockFetcher{files: mockFiles}
client := nixcacheindex.NewClient(fetcher)
client := nixcacheindex.NewClient(context.Background(), fetcher)

// Test Cases

Expand Down Expand Up @@ -164,7 +164,7 @@ func TestClientQuery_Caching(t *testing.T) {
mockFiles["https://mock/shards/1/root.idx.zst"] = compressedShardBuf.Bytes()

fetcher := &MockFetcher{files: mockFiles}
client := nixcacheindex.NewClient(fetcher)
client := nixcacheindex.NewClient(context.Background(), fetcher)

// First query: should fetch shard
res, err := client.Query(context.Background(), hashA)
Expand Down