diff --git a/pkg/cache/upstream/cache.go b/pkg/cache/upstream/cache.go index bf84fa65..13fe16c3 100644 --- a/pkg/cache/upstream/cache.go +++ b/pkg/cache/upstream/cache.go @@ -22,6 +22,7 @@ import ( "github.com/kalbasit/ncps/pkg/helper" "github.com/kalbasit/ncps/pkg/nar" + "github.com/kalbasit/ncps/pkg/nixcacheindex" "github.com/kalbasit/ncps/pkg/nixcacheinfo" ) @@ -70,6 +71,9 @@ type Cache struct { publicKeys []signature.PublicKey netrcAuth *NetrcCredentials + // indexClient is the client for the binary cache index. + indexClient *nixcacheindex.Client + mu sync.RWMutex isHealthy bool @@ -100,6 +104,9 @@ type Options struct { // ResponseHeaderTimeout is the timeout for waiting for the server's response headers. // If zero, defaults to defaultHTTPTimeout (3s). ResponseHeaderTimeout time.Duration + + // ExperimentalCacheIndex enables the use of the experimental binary cache index. + ExperimentalCacheIndex bool } // New creates a new upstream cache with the given URL and options. @@ -178,6 +185,10 @@ func New(ctx context.Context, u *url.URL, opts *Options) (*Cache, error) { c.priority = 40 // Default priority } + if opts.ExperimentalCacheIndex { + c.indexClient = nixcacheindex.NewClient(ctx, c) + } + return c, nil } @@ -294,6 +305,10 @@ func (c *Cache) GetNarInfo(ctx context.Context, hash string) (*narinfo.NarInfo, Info(). Msg("download the narinfo from upstream") + if c.isDefiniteMissFromIndex(ctx, hash) { + return nil, ErrNotFound + } + resp, err := c.doRequest(ctx, http.MethodGet, u) if err != nil { return nil, err @@ -364,6 +379,10 @@ func (c *Cache) HasNarInfo(ctx context.Context, hash string) (bool, error) { Info(). Msg("heading the narinfo from upstream") + if c.isDefiniteMissFromIndex(ctx, hash) { + return false, nil + } + resp, err := c.doRequest(ctx, http.MethodHead, u) if err != nil { if isTimeout(err) { @@ -535,6 +554,64 @@ func (c *Cache) validateURL(u *url.URL) error { return nil } +// isDefiniteMissFromIndex checks the cache index for a hash. +// It returns true if the index reports a definite miss, and false otherwise. +func (c *Cache) isDefiniteMissFromIndex(ctx context.Context, hash string) bool { + if c.indexClient == nil { + return false + } + + res, err := c.indexClient.Query(ctx, hash) + if err != nil { + zerolog.Ctx(ctx). + Warn(). + Err(err). + Msg("failed to query the index") + + return false // On error, fallback to normal behavior. + } + + if res == nixcacheindex.DefiniteMiss { + zerolog.Ctx(ctx). + Debug(). + Str("hash", hash). + Msg("definite miss from index") + + return true + } + + return false +} + +// Fetch implements nixcacheindex.Fetcher. +func (c *Cache) Fetch(ctx context.Context, path string) (io.ReadCloser, error) { + var u string + + // Check if path is an absolute URL + if parsed, err := url.Parse(path); err == nil && parsed.IsAbs() { + u = path + } else { + u = c.url.JoinPath(path).String() + } + + resp, err := c.doRequest(ctx, http.MethodGet, u) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, nixcacheindex.ErrShardNotFound + } + + return nil, fmt.Errorf("%w: %d", ErrUnexpectedHTTPStatusCode, resp.StatusCode) + } + + return resp.Body, nil +} + // isTimeout checks if an error is a timeout error. func isTimeout(err error) bool { var netErr net.Error diff --git a/pkg/cache/upstream/cache_index_test.go b/pkg/cache/upstream/cache_index_test.go new file mode 100644 index 00000000..97069967 --- /dev/null +++ b/pkg/cache/upstream/cache_index_test.go @@ -0,0 +1,92 @@ +package upstream_test + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kalbasit/ncps/pkg/cache/upstream" + "github.com/kalbasit/ncps/pkg/nixcacheindex" + "github.com/kalbasit/ncps/testhelper" +) + +func TestExperimentalCacheIndex(t *testing.T) { + t.Parallel() + + // 1. Setup Mock Server + var requestedNarInfo bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == "/nix-cache-info": + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("StoreDir: /nix/store\nWantMassQuery: 1\nPriority: 40\n")) + + case r.URL.Path == "/nix-cache-index/manifest.json": + w.WriteHeader(http.StatusOK) + + m := nixcacheindex.NewManifest() + // Update URLs to point to this mock server + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + + baseURL := fmt.Sprintf("%s://%s/nix-cache-index/", scheme, r.Host) + m.Urls.JournalBase = baseURL + "journal/" + m.Urls.ShardsBase = baseURL + "shards/" + m.Urls.DeltasBase = baseURL + "deltas/" + + _ = json.NewEncoder(w).Encode(m) + + case strings.HasPrefix(r.URL.Path, "/nix-cache-index/journal/"): + // Simulate missing/empty journal segments + w.WriteHeader(http.StatusNotFound) + + case strings.HasPrefix(r.URL.Path, "/nix-cache-index/shards/"): + // Simulate missing shards -> implies DefiniteMiss + w.WriteHeader(http.StatusNotFound) + + case strings.HasSuffix(r.URL.Path, ".narinfo"): + requestedNarInfo = true + + w.WriteHeader(http.StatusOK) + // Should not be reached in Hit case if we were testing Hits, + // but for Miss check we want to ensure it's NOT reached + _, _ = w.Write([]byte("StorePath: /nix/store/abc-example")) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + // 2. Setup Upstream Cache with Index Enabled + opts := &upstream.Options{ + ExperimentalCacheIndex: true, + } + + c, err := upstream.New( + newContext(), + testhelper.MustParseURL(t, ts.URL), + opts, + ) + require.NoError(t, err) + + // 3. Perform Request + // The mock setup ensures that checking shards returns 404, which the client interprets as DefiniteMiss. + // Therefore, GetNarInfo should return ErrNotFound WITHOUT requesting the .narinfo file. + + _, err = c.GetNarInfo(context.Background(), "00000000000000000000000000000000") // 32 chars + + // 4. Verification + require.ErrorIs(t, err, upstream.ErrNotFound) + assert.False(t, requestedNarInfo, "Should not have requested the narinfo file from upstream") +} diff --git a/pkg/ncps/serve.go b/pkg/ncps/serve.go index 6a5fe977..65485e3c 100644 --- a/pkg/ncps/serve.go +++ b/pkg/ncps/serve.go @@ -250,6 +250,11 @@ func serveCommand( Sources: flagSources("server.addr", "SERVER_ADDR"), Value: ":8501", }, + &cli.BoolFlag{ + Name: "experimental-cache-index", + Usage: "Enable the use of the experimental binary cache index", + Sources: flagSources("experimental.cache-index", "EXPERIMENTAL_CACHE_INDEX"), + }, // Redis Configuration (optional - for distributed locking in HA deployments) &cli.StringSliceFlag{ @@ -583,6 +588,7 @@ func getUpstreamCaches(ctx context.Context, cmd *cli.Command, netrcData *netrc.N dialerTimeout := cmd.Duration("cache-upstream-dialer-timeout") deprecatedResponseHeaderTimeout := cmd.Duration("upstream-response-header-timeout") responseHeaderTimeout := cmd.Duration("cache-upstream-response-header-timeout") + experimentalCacheIndex := cmd.Bool("experimental-cache-index") // Show deprecation warning for upstream-cache if len(deprecatedUpstreamCache) > 0 { @@ -679,8 +685,9 @@ func getUpstreamCaches(ctx context.Context, cmd *cli.Command, netrcData *netrc.N // Build options for this upstream cache opts := &upstream.Options{ - DialerTimeout: dialerTimeout, - ResponseHeaderTimeout: responseHeaderTimeout, + DialerTimeout: dialerTimeout, + ResponseHeaderTimeout: responseHeaderTimeout, + ExperimentalCacheIndex: experimentalCacheIndex, } // Find public keys for this upstream diff --git a/pkg/nixcacheindex/client.go b/pkg/nixcacheindex/client.go index 249b7385..496cab70 100644 --- a/pkg/nixcacheindex/client.go +++ b/pkg/nixcacheindex/client.go @@ -35,29 +35,32 @@ func (r Result) String() string { // Fetcher abstraction for retrieving files (e.g., HTTP, local file system). type Fetcher interface { - Fetch(path string) (io.ReadCloser, error) + Fetch(ctx context.Context, path string) (io.ReadCloser, error) } // Client for querying the binary cache index. type Client struct { - fetcher Fetcher - manifest *Manifest - + fetcher Fetcher + baseCtx context.Context + manifest *Manifest + manifestOnce sync.Once + manifestErr error shardCacheMu sync.Mutex shardCache map[string]*ShardReader } // NewClient creates a new client. -func NewClient(fetcher Fetcher) *Client { +func NewClient(ctx context.Context, fetcher Fetcher) *Client { return &Client{ fetcher: fetcher, + baseCtx: ctx, shardCache: make(map[string]*ShardReader), } } // LoadManifest fetches and parses the manifest. -func (c *Client) LoadManifest() error { - r, err := c.fetcher.Fetch(ManifestPath) +func (c *Client) LoadManifest(ctx context.Context) error { + r, err := c.fetcher.Fetch(ctx, ManifestPath) if err != nil { return err } @@ -76,10 +79,12 @@ func (c *Client) LoadManifest() error { // Query checks if the cache contains the given hash. // hashStr is the 32-character base32 hash. func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) { - if c.manifest == nil { - if err := c.LoadManifest(); err != nil { - return DefiniteMiss, err - } + c.manifestOnce.Do(func() { + c.manifestErr = c.LoadManifest(c.baseCtx) + }) + + if c.manifestErr != nil { + return DefiniteMiss, c.manifestErr } // 1. Check Journal (Layer 1) @@ -151,7 +156,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) { // In reality, some segments might not exist if no writes happened? // Or if rotation logic is strict. - rc, err := c.fetcher.Fetch(path) + rc, err := c.fetcher.Fetch(ctx, path) if err != nil { // If journal segment is missing, we assume no mutations in that window. zerolog.Ctx(ctx).Debug().Err(err).Str("path", path). @@ -217,7 +222,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) { return c.queryShard(shardReader, hashStr) } - rc, err := c.fetcher.Fetch(shardPath) + rc, err := c.fetcher.Fetch(ctx, shardPath) if err == nil { defer rc.Close() @@ -241,7 +246,7 @@ func (c *Client) Query(ctx context.Context, hashStr string) (Result, error) { return c.queryShard(shardReader, hashStr) } - rc, err = c.fetcher.Fetch(shardPath) + rc, err = c.fetcher.Fetch(ctx, shardPath) if err != nil { return DefiniteMiss, err // Both missing -> Miss (or error) } diff --git a/pkg/nixcacheindex/client_test.go b/pkg/nixcacheindex/client_test.go index 5585c507..ec061edd 100644 --- a/pkg/nixcacheindex/client_test.go +++ b/pkg/nixcacheindex/client_test.go @@ -23,7 +23,7 @@ type MockFetcher struct { fetchCalls int } -func (m *MockFetcher) Fetch(path string) (io.ReadCloser, error) { +func (m *MockFetcher) Fetch(_ context.Context, path string) (io.ReadCloser, error) { m.fetchCalls++ if data, ok := m.files[path]; ok { return io.NopCloser(bytes.NewReader(data)), nil @@ -96,7 +96,7 @@ func TestClientQuery_EndToEnd(t *testing.T) { } fetcher := &MockFetcher{files: mockFiles} - client := nixcacheindex.NewClient(fetcher) + client := nixcacheindex.NewClient(context.Background(), fetcher) // Test Cases @@ -164,7 +164,7 @@ func TestClientQuery_Caching(t *testing.T) { mockFiles["https://mock/shards/1/root.idx.zst"] = compressedShardBuf.Bytes() fetcher := &MockFetcher{files: mockFiles} - client := nixcacheindex.NewClient(fetcher) + client := nixcacheindex.NewClient(context.Background(), fetcher) // First query: should fetch shard res, err := client.Query(context.Background(), hashA)