diff --git a/cache/remotecache/import.go b/cache/remotecache/import.go index 99b9695f8..22244ea45 100644 --- a/cache/remotecache/import.go +++ b/cache/remotecache/import.go @@ -302,8 +302,9 @@ type image struct { Rootfs struct { DiffIDs []digest.Digest `json:"diff_ids"` } `json:"rootfs"` - Cache []byte `json:"moby.buildkit.cache.v0"` - History []struct { + Cache []byte `json:"moby.buildkit.cache.v0"` + EarthlyInlineCache []byte `json:"earthly.inlinecache.v0"` + History []struct { Created *time.Time `json:"created,omitempty"` CreatedBy string `json:"created_by,omitempty"` EmptyLayer bool `json:"empty_layer,omitempty"` diff --git a/cache/remotecache/inline.go b/cache/remotecache/inline.go new file mode 100644 index 000000000..fafeafa74 --- /dev/null +++ b/cache/remotecache/inline.go @@ -0,0 +1,253 @@ +package remotecache + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/labels" + v1 "github.com/moby/buildkit/cache/remotecache/v1" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" + "github.com/moby/buildkit/util/imageutil" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + ocispecs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +// earthlyInlineCacheItem stores a relation between a simple solver cache key & +// a remote image descriptor. Used for inline caching. +type earthlyInlineCacheItem struct { + Key digest.Digest `json:"cacheKey"` + Descriptor digest.Digest `json:"descriptor"` +} + +// EarthlyInlineCacheRemotes produces a map of cache keys to remote sources by +// parsing inline-cache metadata from a remote image's config data. 
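+// The metadata is read from the "earthly.inlinecache.v0" field of the image
+// config and decoded into a list of earthlyInlineCacheItem entries, each of
+// which maps a cache-key digest to the digest of a layer descriptor.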
+func EarthlyInlineCacheRemotes(ctx context.Context, provider content.Provider, desc ocispecs.Descriptor, w worker.Worker) (map[digest.Digest]*solver.Remote, error) { + dt, err := readBlob(ctx, provider, desc) + if err != nil { + return nil, err + } + + manifestType, err := imageutil.DetectManifestBlobMediaType(dt) + if err != nil { + return nil, err + } + + layerDone := progress.OneOff(ctx, fmt.Sprintf("inferred cache manifest type: %s", manifestType)) + layerDone(nil) + + configDesc, err := configDescriptor(dt, manifestType) + if err != nil { + return nil, err + } + + if configDesc.Digest != "" { + return nil, errors.New("expected empty digest value") + } + + m := map[digest.Digest][]byte{} + + if err := allDistributionManifests(ctx, provider, dt, m); err != nil { + return nil, err + } + + remotes := map[digest.Digest]*solver.Remote{} + + for _, dt := range m { + var m ocispecs.Manifest + + if err := json.Unmarshal(dt, &m); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal manifest") + } + + if m.Config.Digest == "" || len(m.Layers) == 0 { + continue + } + + p, err := content.ReadBlob(ctx, provider, m.Config) + if err != nil { + return nil, errors.Wrap(err, "failed to read blob") + } + + var img image + + if err := json.Unmarshal(p, &img); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal image") + } + + if len(img.Rootfs.DiffIDs) != len(m.Layers) { + bklog.G(ctx).Warnf("invalid image with mismatching manifest and config") + continue + } + + if img.EarthlyInlineCache == nil { + continue + } + + cacheItems := []earthlyInlineCacheItem{} + if err := json.Unmarshal(img.EarthlyInlineCache, &cacheItems); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal cache items") + } + + layers, err := preprocessLayers(img, m) + if err != nil { + return nil, err + } + + found := extractRemotes(provider, cacheItems, layers) + for key, remote := range found { + remotes[key] = remote + } + } + + return remotes, nil +} + +// extractRemotes constructs a list of descriptors--which represent the layer +// chain for the given digest--for each of the items discovered in the inline +// metadata. +func extractRemotes(provider content.Provider, cacheItems []earthlyInlineCacheItem, layers []ocispecs.Descriptor) map[digest.Digest]*solver.Remote { + + remotes := map[digest.Digest]*solver.Remote{} + + for _, cacheItem := range cacheItems { + descs := []ocispecs.Descriptor{} + + found := false + for _, layer := range layers { + descs = append(descs, layer) + if layer.Digest == cacheItem.Descriptor { + found = true + break + } + } + + if found { + remote := &solver.Remote{ + Descriptors: descs, + Provider: provider, + } + + remotes[cacheItem.Key] = remote + } + } + + return remotes +} + +// preprocessLayers adds custom annotations which are used later when +// reconstructing the ref. 
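+// Each layer descriptor is annotated with "buildkit/createdat",
+// "buildkit/description" and the uncompressed-digest label taken from the
+// image config's history entries and rootfs diff IDs.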
+func preprocessLayers(img image, m ocispecs.Manifest) ([]ocispecs.Descriptor, error) { + createdDates, createdMsg, err := parseCreatedLayerInfo(img) + if err != nil { + return nil, err + } + + n := len(m.Layers) + + if len(createdDates) != n { + return nil, errors.New("unexpected creation dates length") + } + + if len(createdMsg) != n { + return nil, errors.New("unexpected creation messages length") + } + + if len(img.Rootfs.DiffIDs) != n { + return nil, errors.New("unexpected rootfs diff IDs") + } + + ret := []ocispecs.Descriptor{} + + for i, layer := range m.Layers { + if layer.Annotations == nil { + layer.Annotations = map[string]string{} + } + + if createdAt := createdDates[i]; createdAt != "" { + layer.Annotations["buildkit/createdat"] = createdAt + } + + if createdBy := createdMsg[i]; createdBy != "" { + layer.Annotations["buildkit/description"] = createdBy + } + + layer.Annotations[labels.LabelUncompressed] = img.Rootfs.DiffIDs[i].String() + + ret = append(ret, layer) + } + + return ret, nil +} + +// configDescriptor parses and returns the correct manifest for the given manifest type. +func configDescriptor(dt []byte, manifestType string) (ocispecs.Descriptor, error) { + var configDesc ocispecs.Descriptor + + switch manifestType { + case images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex: + var mfst ocispecs.Index + if err := json.Unmarshal(dt, &mfst); err != nil { + return ocispecs.Descriptor{}, err + } + + for _, m := range mfst.Manifests { + if m.MediaType == v1.CacheConfigMediaTypeV0 { + configDesc = m + continue + } + } + case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest: + var mfst ocispecs.Manifest + if err := json.Unmarshal(dt, &mfst); err != nil { + return ocispecs.Descriptor{}, err + } + + if mfst.Config.MediaType == v1.CacheConfigMediaTypeV0 { + configDesc = mfst.Config + } + default: + return ocispecs.Descriptor{}, errors.Errorf("unsupported or uninferrable manifest type %s", manifestType) + } + + return configDesc, nil +} + +// allDistributionManifests pulls all manifest data & linked manifests using the provider. 
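+// Index manifests are followed recursively; each discovered manifest blob is
+// stored in m keyed by its digest.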
+func allDistributionManifests(ctx context.Context, provider content.Provider, dt []byte, m map[digest.Digest][]byte) error { + mt, err := imageutil.DetectManifestBlobMediaType(dt) + if err != nil { + return err + } + + switch mt { + case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest: + m[digest.FromBytes(dt)] = dt + case images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex: + var index ocispecs.Index + if err := json.Unmarshal(dt, &index); err != nil { + return errors.WithStack(err) + } + + for _, d := range index.Manifests { + if _, ok := m[d.Digest]; ok { + continue + } + p, err := content.ReadBlob(ctx, provider, d) + if err != nil { + return errors.WithStack(err) + } + if err := allDistributionManifests(ctx, provider, p, m); err != nil { + return err + } + } + } + + return nil +} diff --git a/cache/remotecache/registry/inline.go b/cache/remotecache/registry/inline.go new file mode 100644 index 000000000..267e1dca4 --- /dev/null +++ b/cache/remotecache/registry/inline.go @@ -0,0 +1,58 @@ +package registry + +import ( + "context" + "strconv" + + "github.com/containerd/containerd/remotes/docker" + "github.com/moby/buildkit/cache/remotecache" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/contentutil" + "github.com/moby/buildkit/util/resolver" + "github.com/moby/buildkit/util/resolver/limited" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +// EarthlyInlineCacheRemotes fetches a group of remote sources based on values +// discovered in a remote image's inline-cache metadata field. +func EarthlyInlineCacheRemotes(ctx context.Context, sm *session.Manager, w worker.Worker, hosts docker.RegistryHosts, g session.Group, attrs map[string]string) (map[digest.Digest]*solver.Remote, error) { + ref, err := canonicalizeRef(attrs[attrRef]) + if err != nil { + return nil, err + } + + refString := ref.String() + + insecure := false + if v, ok := attrs[attrInsecure]; ok { + val, err := strconv.ParseBool(v) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse %s", attrInsecure) + } + insecure = val + } + + scope, hosts := registryConfig(hosts, ref, "pull", insecure) + remote := resolver.DefaultPool.GetResolver(hosts, refString, scope, sm, g) + + xref, desc, err := remote.Resolve(ctx, refString) + if err != nil { + return nil, err + } + + fetcher, err := remote.Fetcher(ctx, xref) + if err != nil { + return nil, err + } + + src := &withDistributionSourceLabel{ + Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, refString)), + ref: refString, + source: w.ContentStore(), + } + + return remotecache.EarthlyInlineCacheRemotes(ctx, src, desc, w) +} diff --git a/client/client.go b/client/client.go index c590ba8ab..49514fdd1 100644 --- a/client/client.go +++ b/client/client.go @@ -52,6 +52,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error grpc.WithDefaultCallOptions(grpc_retry.WithBackoff(grpc_retry.BackoffExponentialWithJitter(10*time.Millisecond, 0.1))), //earthly } needDialer := true + useDefaultDialer := false // earthly-specific var unary []grpc.UnaryClientInterceptor var stream []grpc.StreamClientInterceptor @@ -94,6 +95,11 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error headersKV = h.kv } + // earthly-specific + if _, ok := o.(*withDefaultGRPCDialer); ok { + useDefaultDialer = true + } + if opt, ok := o.(*withGRPCDialOption); ok { 
customDialOptions = append(customDialOptions, opt.opt) } @@ -121,7 +127,7 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error stream = append(stream, otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagators))) } - if needDialer { + if needDialer && !useDefaultDialer { dialFn, err := resolveDialer(address) if err != nil { return nil, err @@ -164,6 +170,14 @@ func New(ctx context.Context, address string, opts ...ClientOpt) (*Client, error gopts = append(gopts, grpc.WithChainStreamInterceptor(stream...)) gopts = append(gopts, customDialOptions...) + // earthly-specific + if useDefaultDialer { + split := strings.Split(address, "://") + if len(split) > 0 { + address = split[1] + } + } + conn, err := grpc.DialContext(ctx, address, gopts...) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q . make sure buildkitd is running", address) diff --git a/client/client_earthly.go b/client/client_earthly.go index d45b8a312..4c2f788af 100644 --- a/client/client_earthly.go +++ b/client/client_earthly.go @@ -32,3 +32,14 @@ func headersStreamInterceptor(kv ...string) grpc.StreamClientInterceptor { return streamer(ctx, desc, cc, method, opts...) } } + +// WithDefaultGRPCDialer triggers the internal gRPC dialer to be used instead of the buildkit default. +// This can be important when buildkit server is behind an HTTP connect proxy, +// since the default dialer in gRPC already knows how to use those. +func WithDefaultGRPCDialer() ClientOpt { + return &withDefaultGRPCDialer{} +} + +type withDefaultGRPCDialer struct{} + +func (*withDefaultGRPCDialer) isClientOpt() {} diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index fd316b9fc..c59c2cc12 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -843,6 +843,8 @@ func newController(c *cli.Context, cfg *config.Config, shutdownCh chan struct{}) LeaseManager: w.LeaseManager(), ContentStore: w.ContentStore(), HistoryConfig: cfg.History, + RootDir: cfg.Root, + RegistryHosts: resolverFn, }) } diff --git a/control/control.go b/control/control.go index a9816692d..d3422f0aa 100644 --- a/control/control.go +++ b/control/control.go @@ -11,6 +11,7 @@ import ( contentapi "github.com/containerd/containerd/api/services/content/v1" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/services/content/contentserver" "github.com/distribution/reference" "github.com/hashicorp/go-multierror" @@ -68,6 +69,8 @@ type Opt struct { LeaseManager *leaseutil.Manager ContentStore *containerdsnapshot.Store HistoryConfig *config.HistoryConfig + RootDir string + RegistryHosts docker.RegistryHosts } type Controller struct { // TODO: ControlService @@ -105,6 +108,8 @@ func NewController(opt Opt) (*Controller, error) { SessionManager: opt.SessionManager, Entitlements: opt.Entitlements, HistoryQueue: hq, + RootDir: opt.RootDir, + RegistryHosts: opt.RegistryHosts, }) if err != nil { return nil, errors.Wrap(err, "failed to create solver") diff --git a/executor/runcexecutor/executor.go b/executor/runcexecutor/executor.go index 40e0aa658..4c43f8646 100644 --- a/executor/runcexecutor/executor.go +++ b/executor/runcexecutor/executor.go @@ -323,6 +323,8 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, } } + statsCtx, statsCancel := context.WithCancel(ctx) + trace.SpanFromContext(ctx).AddEvent("Container created") err = w.run(ctx, id, bundle, process, func() { 
startedOnce.Do(func() { @@ -331,7 +333,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, close(started) } if process.StatsStream != nil { - go w.monitorContainerStats(ctx, id, w.sampleFrequency, process.StatsStream) // earthly-specific + go w.monitorContainerStats(statsCtx, id, w.sampleFrequency, process.StatsStream) // earthly-specific } if rec != nil { rec.Start() @@ -340,6 +342,7 @@ func (w *runcExecutor) Run(ctx context.Context, id string, root executor.Mount, }, true) releaseContainer := func(ctx context.Context) error { + statsCancel() err := w.runc.Delete(ctx, id, &runc.DeleteOpts{}) err1 := namespace.Close() if err == nil { diff --git a/executor/runcexecutor/monitor_stats.go b/executor/runcexecutor/monitor_stats.go index 79b63d872..e1a03e5c1 100644 --- a/executor/runcexecutor/monitor_stats.go +++ b/executor/runcexecutor/monitor_stats.go @@ -46,33 +46,42 @@ func writeStatsToStream(w io.Writer, stats *runc.Stats) error { func (w *runcExecutor) monitorContainerStats(ctx context.Context, id string, sampleFrequency time.Duration, statsWriter io.WriteCloser) { numFailuresAllowed := 10 - for { - // sleep at the top of the loop to give it time to start - time.Sleep(sampleFrequency) - stats, err := w.runc.Stats(ctx, id) - if err != nil { - if errors.Is(err, context.Canceled) { + timer := time.NewTimer(sampleFrequency) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + bklog.G(ctx).Debugf("stats collection context done: %v", ctx.Err()) + return + case <-timer.C: // Initial sleep will give container the chance to start. + stats, err := w.runc.Stats(ctx, id) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + if numFailuresAllowed > 0 { + // Allow the initial calls to runc.Stats to fail, for cases + // where the program didn't start within the initial + // sampleFrequency; this should only occur under heavy + // workloads. + bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err) + numFailuresAllowed-- + continue + } + bklog.G(ctx).Errorf("runc stats collection error: %s", err) return } - if numFailuresAllowed > 0 { - // allow the initial calls to runc.Stats to fail, for cases where the program didn't start within the initial - // sampleFrequency; this should only occur under heavy workloads - bklog.G(ctx).Warnf("ignoring runc stats collection error: %s", err) - numFailuresAllowed-- - continue - } - bklog.G(ctx).Errorf("runc stats collection error: %s", err) - return - } - // once runc.Stats has succeeded, don't ignore future errors - numFailuresAllowed = 0 + // Once runc.Stats has succeeded, don't ignore future errors. 
+ numFailuresAllowed = 0 - err = writeStatsToStream(statsWriter, stats) - if err != nil { - bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err) - return + err = writeStatsToStream(statsWriter, stats) + if err != nil { + bklog.G(ctx).Errorf("failed to send runc stats to client-stream: %s", err) + return + } } } } diff --git a/exporter/containerimage/exptypes/types.go b/exporter/containerimage/exptypes/types.go index c4d5721ea..83a85c379 100644 --- a/exporter/containerimage/exptypes/types.go +++ b/exporter/containerimage/exptypes/types.go @@ -11,6 +11,7 @@ const ( ExporterImageConfigDigestKey = "containerimage.config.digest" ExporterImageDescriptorKey = "containerimage.descriptor" ExporterInlineCache = "containerimage.inlinecache" + EarthlyInlineCache = "earthly.inlinecache" ExporterPlatformsKey = "refs.platforms" ) diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index 0c1e91bf1..22525b579 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -138,6 +138,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session config := exptypes.ParseKey(inp.Metadata, exptypes.ExporterImageConfigKey, p) inlineCache := exptypes.ParseKey(inp.Metadata, exptypes.ExporterInlineCache, p) + earthlyInlineCache := exptypes.ParseKey(inp.Metadata, exptypes.EarthlyInlineCache, p) remote := &remotes[0] if opts.RewriteTimestamp { remote, err = ic.rewriteRemoteWithEpoch(ctx, opts, remote) @@ -145,7 +146,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session return nil, err } } - mfstDesc, configDesc, err := ic.commitDistributionManifest(ctx, opts, ref, config, remote, annotations, inlineCache, opts.Epoch, session.NewGroup(sessionID)) + mfstDesc, configDesc, err := ic.commitDistributionManifest(ctx, opts, ref, config, remote, annotations, inlineCache, earthlyInlineCache, opts.Epoch, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -203,6 +204,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session } config := exptypes.ParseKey(inp.Metadata, exptypes.ExporterImageConfigKey, p) inlineCache := exptypes.ParseKey(inp.Metadata, exptypes.ExporterInlineCache, p) + earthlyInlineCache := exptypes.ParseKey(inp.Metadata, exptypes.EarthlyInlineCache, p) remote := &remotes[remotesMap[p.ID]] if remote == nil { @@ -218,7 +220,7 @@ func (ic *ImageWriter) Commit(ctx context.Context, inp *exporter.Source, session } } - desc, _, err := ic.commitDistributionManifest(ctx, opts, r, config, remote, opts.Annotations.Platform(&p.Platform), inlineCache, opts.Epoch, session.NewGroup(sessionID)) + desc, _, err := ic.commitDistributionManifest(ctx, opts, r, config, remote, opts.Annotations.Platform(&p.Platform), inlineCache, earthlyInlineCache, opts.Epoch, session.NewGroup(sessionID)) if err != nil { return nil, err } @@ -388,7 +390,7 @@ func (ic *ImageWriter) rewriteRemoteWithEpoch(ctx context.Context, opts *ImageCo }, nil } -func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *ImageCommitOpts, ref cache.ImmutableRef, config []byte, remote *solver.Remote, annotations *Annotations, inlineCache []byte, epoch *time.Time, sg session.Group) (*ocispecs.Descriptor, *ocispecs.Descriptor, error) { +func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *ImageCommitOpts, ref cache.ImmutableRef, config []byte, remote *solver.Remote, annotations *Annotations, inlineCache, earthlyInlineCache []byte, epoch *time.Time, sg 
session.Group) (*ocispecs.Descriptor, *ocispecs.Descriptor, error) { if len(config) == 0 { var err error config, err = defaultImageConfig() @@ -407,7 +409,7 @@ func (ic *ImageWriter) commitDistributionManifest(ctx context.Context, opts *Ima return nil, nil, err } - config, err = patchImageConfig(config, remote.Descriptors, history, inlineCache, epoch) + config, err = patchImageConfig(config, remote.Descriptors, history, inlineCache, earthlyInlineCache, epoch) if err != nil { return nil, nil, err } @@ -633,7 +635,7 @@ func parseHistoryFromConfig(dt []byte) ([]ocispecs.History, error) { return config.History, nil } -func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs.History, cache []byte, epoch *time.Time) ([]byte, error) { +func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs.History, cache, earthlyInlineCache []byte, epoch *time.Time) ([]byte, error) { m := map[string]json.RawMessage{} if err := json.Unmarshal(dt, &m); err != nil { return nil, errors.Wrap(err, "failed to parse image config for patch") @@ -701,6 +703,14 @@ func patchImageConfig(dt []byte, descs []ocispecs.Descriptor, history []ocispecs m["moby.buildkit.cache.v0"] = dt } + if earthlyInlineCache != nil { + dt, err := json.Marshal(earthlyInlineCache) + if err != nil { + return nil, err + } + m["earthly.inlinecache.v0"] = dt + } + dt, err = json.Marshal(m) return dt, errors.Wrap(err, "failed to marshal config after patch") } diff --git a/exporter/earthlyoutputs/export.go b/exporter/earthlyoutputs/export.go index 06c74cb8a..c4c02df7c 100644 --- a/exporter/earthlyoutputs/export.go +++ b/exporter/earthlyoutputs/export.go @@ -268,6 +268,13 @@ func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source simpleMd[exptypes.ExporterInlineCache] = inlineCache } + // TODO: Remove the above (legacy) option. 
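+	// Same lookup as above, but for the Earthly inline-cache metadata key.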
+ earthlyInlineCacheK := fmt.Sprintf("%s/%s", exptypes.EarthlyInlineCache, k) + earthlyInlineCache, ok := src.Metadata[earthlyInlineCacheK] + if ok { + simpleMd[exptypes.EarthlyInlineCache] = earthlyInlineCache + } + opts := e.opts as, _, err := containerimage.ParseAnnotations(simpleMd) if err != nil { diff --git a/session/auth/auth.pb.go b/session/auth/auth.pb.go index e23a07fc8..d85b06a46 100644 --- a/session/auth/auth.pb.go +++ b/session/auth/auth.pb.go @@ -202,6 +202,7 @@ type FetchTokenResponse struct { Token string `protobuf:"bytes,1,opt,name=Token,proto3" json:"Token,omitempty"` ExpiresIn int64 `protobuf:"varint,2,opt,name=ExpiresIn,proto3" json:"ExpiresIn,omitempty"` IssuedAt int64 `protobuf:"varint,3,opt,name=IssuedAt,proto3" json:"IssuedAt,omitempty"` + Anonymous bool `protobuf:"varint,99,opt,name=Anonymous,proto3" json:"Anonymous,omitempty"` } func (m *FetchTokenResponse) Reset() { *m = FetchTokenResponse{} } @@ -257,6 +258,13 @@ func (m *FetchTokenResponse) GetIssuedAt() int64 { return 0 } +func (m *FetchTokenResponse) GetAnonymous() bool { + if m != nil { + return m.Anonymous + } + return false +} + type GetTokenAuthorityRequest struct { Host string `protobuf:"bytes,1,opt,name=Host,proto3" json:"Host,omitempty"` Salt []byte `protobuf:"bytes,2,opt,name=Salt,proto3" json:"Salt,omitempty"` @@ -467,40 +475,41 @@ func init() { func init() { proto.RegisterFile("auth.proto", fileDescriptor_8bbd6f3875b0e874) } var fileDescriptor_8bbd6f3875b0e874 = []byte{ - // 513 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xc7, 0xbd, 0x75, 0xd2, 0x36, 0x43, 0x0f, 0x74, 0x89, 0x90, 0x31, 0xd1, 0xaa, 0x32, 0x45, - 0xaa, 0x40, 0x58, 0x02, 0x24, 0x24, 0xb8, 0xb5, 0xe5, 0x2b, 0xe2, 0x52, 0x39, 0x7c, 0x48, 0xbd, - 0x20, 0xc7, 0x9e, 0x12, 0x0b, 0xc7, 0x0e, 0xde, 0x75, 0x85, 0x6f, 0xdc, 0xb9, 0xf0, 0x08, 0x1c, - 0x79, 0x14, 0x8e, 0x39, 0xf6, 0x48, 0x9c, 0x0b, 0xc7, 0x3c, 0x02, 0xf2, 0x66, 0x9d, 0x04, 0x1c, - 0xd2, 0xdc, 0xfc, 0x1f, 0xff, 0x77, 0xe6, 0xb7, 0x33, 0xa3, 0x05, 0x70, 0x53, 0xd1, 0xb3, 0x07, - 0x49, 0x2c, 0x62, 0x7a, 0xb5, 0x1f, 0x77, 0x33, 0xfb, 0x2c, 0x08, 0x91, 0x67, 0x91, 0x67, 0x9f, - 0xdf, 0xb7, 0x0e, 0x80, 0x1e, 0x27, 0xe8, 0x63, 0x24, 0x02, 0x37, 0xe4, 0x0e, 0x7e, 0x4a, 0x91, - 0x0b, 0x4a, 0xa1, 0xf6, 0x32, 0xe6, 0xc2, 0x20, 0x7b, 0xe4, 0xa0, 0xe1, 0xc8, 0x6f, 0xab, 0x0d, - 0xd7, 0xfe, 0x72, 0xf2, 0x41, 0x1c, 0x71, 0xa4, 0x26, 0x6c, 0xbf, 0xe1, 0x98, 0x44, 0x6e, 0x1f, - 0x95, 0x7d, 0xa6, 0xe9, 0x75, 0xd8, 0xec, 0xa0, 0x97, 0xa0, 0x30, 0x36, 0xe4, 0x1f, 0xa5, 0xac, - 0xaf, 0x04, 0x76, 0x9f, 0xa3, 0xf0, 0x7a, 0xaf, 0xe3, 0x8f, 0x18, 0x95, 0x45, 0x4d, 0xd8, 0x3e, - 0x0e, 0x03, 0x8c, 0x44, 0xfb, 0x69, 0x99, 0xa9, 0xd4, 0x33, 0xa0, 0x8d, 0x39, 0x10, 0x6d, 0x42, - 0xdd, 0x41, 0x37, 0xec, 0x1b, 0xba, 0x0c, 0x4e, 0x05, 0x35, 0x60, 0xab, 0x83, 0xc9, 0x79, 0xe0, - 0xa1, 0x51, 0x93, 0xf1, 0x52, 0x4a, 0x1a, 0x2f, 0x1e, 0x20, 0x37, 0xea, 0x7b, 0xba, 0xa4, 0x91, - 0xca, 0xf2, 0x81, 0x2e, 0xc2, 0xa8, 0x7b, 0x35, 0xa1, 0x2e, 0x03, 0x0a, 0x65, 0x2a, 0x68, 0x0b, - 0x1a, 0xcf, 0x3e, 0x0f, 0x82, 0x04, 0x79, 0x3b, 0x92, 0x30, 0xba, 0x33, 0x0f, 0x14, 0x37, 0x68, - 0x73, 0x9e, 0xa2, 0x7f, 0x28, 0x24, 0x94, 0xee, 0xcc, 0xb4, 0x75, 0x04, 0xc6, 0x0b, 0x14, 0x32, - 0xcb, 0x61, 0x2a, 0x7a, 0x71, 0x12, 0x88, 0x6c, 0x45, 0xbb, 0x8b, 0x58, 0xc7, 0x0d, 0xa7, 0x37, - 0xde, 0x71, 0xe4, 0xb7, 0xf5, 0x18, 0x6e, 0x2c, 0xc9, 0xa1, 0x80, 0x5b, 0xd0, 0x38, 0x49, 0xbb, - 0x61, 0xe0, 0xbd, 0xc2, 0x4c, 0x66, 0xda, 0x71, 0xe6, 0x01, 
0xeb, 0x3d, 0xdc, 0x7c, 0x8b, 0x49, - 0x70, 0x96, 0xad, 0x4f, 0x60, 0xc0, 0xd6, 0x89, 0x9b, 0x85, 0xb1, 0xeb, 0x2b, 0x88, 0x52, 0xce, - 0xd8, 0xf4, 0x05, 0xb6, 0x47, 0xd0, 0x5a, 0x5e, 0x40, 0xe1, 0x15, 0xdd, 0x0f, 0x3e, 0x44, 0xe8, - 0x2b, 0x36, 0xa5, 0x1e, 0x7c, 0xd7, 0xa1, 0x56, 0xb8, 0xe9, 0x29, 0x5c, 0x59, 0xd8, 0x2f, 0xba, - 0x6f, 0xff, 0xbb, 0xab, 0x76, 0x75, 0x51, 0xcd, 0xdb, 0x97, 0xb8, 0x54, 0xf1, 0x77, 0x00, 0xf3, - 0x11, 0xd3, 0x5b, 0xd5, 0x43, 0x95, 0x6d, 0x34, 0xf7, 0x57, 0x9b, 0x54, 0xe2, 0x10, 0x76, 0x2b, - 0x13, 0xa1, 0x77, 0xaa, 0x47, 0xff, 0x37, 0x7a, 0xf3, 0xee, 0x5a, 0x5e, 0x55, 0x2d, 0x85, 0xe6, - 0xb2, 0x1e, 0xd3, 0x7b, 0xd5, 0x24, 0x2b, 0x86, 0x6d, 0xda, 0xeb, 0xda, 0xa7, 0x65, 0x8f, 0x9e, - 0x0c, 0x47, 0x4c, 0xbb, 0x18, 0x31, 0x6d, 0x32, 0x62, 0xe4, 0x4b, 0xce, 0xc8, 0x8f, 0x9c, 0x91, - 0x9f, 0x39, 0x23, 0xc3, 0x9c, 0x91, 0x5f, 0x39, 0x23, 0xbf, 0x73, 0xa6, 0x4d, 0x72, 0x46, 0xbe, - 0x8d, 0x99, 0x36, 0x1c, 0x33, 0xed, 0x62, 0xcc, 0xb4, 0xd3, 0x5a, 0xf1, 0xee, 0x74, 0x37, 0xe5, - 0xc3, 0xf3, 0xf0, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xba, 0xb3, 0x18, 0x70, 0x86, 0x04, 0x00, - 0x00, + // 530 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xbb, 0x8e, 0xd3, 0x40, + 0x14, 0x86, 0x3d, 0xeb, 0xec, 0xed, 0xb0, 0x05, 0x3b, 0x44, 0xc8, 0x98, 0x68, 0x14, 0x99, 0x45, + 0x8a, 0x40, 0x58, 0x02, 0x24, 0x24, 0xe8, 0xb2, 0xcb, 0x2d, 0xa2, 0x59, 0x39, 0x5c, 0xa4, 0x6d, + 0x90, 0xe3, 0x9c, 0x25, 0x16, 0x8e, 0x27, 0x78, 0xc6, 0x2b, 0xdc, 0x21, 0x5a, 0x1a, 0x1e, 0x81, + 0x92, 0x47, 0xa1, 0x4c, 0xb9, 0x25, 0x71, 0x1a, 0xca, 0x7d, 0x04, 0xe4, 0xc9, 0xe4, 0x02, 0x0e, + 0x21, 0x9d, 0xff, 0xdf, 0xff, 0x9c, 0xf3, 0x8d, 0xcf, 0x91, 0x01, 0xfc, 0x54, 0xf6, 0xdc, 0x41, + 0xc2, 0x25, 0xa7, 0x97, 0xfb, 0xbc, 0x93, 0xb9, 0xa7, 0x61, 0x84, 0x22, 0x8b, 0x03, 0xf7, 0xec, + 0xae, 0xd3, 0x00, 0x7a, 0x94, 0x60, 0x17, 0x63, 0x19, 0xfa, 0x91, 0xf0, 0xf0, 0x43, 0x8a, 0x42, + 0x52, 0x0a, 0x95, 0xe7, 0x5c, 0x48, 0x8b, 0xd4, 0x49, 0x63, 0xd7, 0x53, 0xcf, 0x4e, 0x0b, 0xae, + 0xfc, 0x91, 0x14, 0x03, 0x1e, 0x0b, 0xa4, 0x36, 0xec, 0xbc, 0x12, 0x98, 0xc4, 0x7e, 0x1f, 0x75, + 0x7c, 0xa6, 0xe9, 0x55, 0xd8, 0x6a, 0x63, 0x90, 0xa0, 0xb4, 0x36, 0xd4, 0x1b, 0xad, 0x9c, 0x2f, + 0x04, 0xf6, 0x9f, 0xa2, 0x0c, 0x7a, 0x2f, 0xf9, 0x7b, 0x8c, 0xa7, 0x4d, 0x6d, 0xd8, 0x39, 0x8a, + 0x42, 0x8c, 0x65, 0xeb, 0xf1, 0xb4, 0xd2, 0x54, 0xcf, 0x80, 0x36, 0xe6, 0x40, 0xb4, 0x0a, 0x9b, + 0x1e, 0xfa, 0x51, 0xdf, 0x32, 0x95, 0x39, 0x11, 0xd4, 0x82, 0xed, 0x36, 0x26, 0x67, 0x61, 0x80, + 0x56, 0x45, 0xf9, 0x53, 0xa9, 0x68, 0x02, 0x3e, 0x40, 0x61, 0x6d, 0xd6, 0x4d, 0x45, 0xa3, 0x94, + 0xf3, 0x99, 0x00, 0x5d, 0xa4, 0xd1, 0x17, 0xab, 0xc2, 0xa6, 0x32, 0x34, 0xcb, 0x44, 0xd0, 0x1a, + 0xec, 0x3e, 0xf9, 0x38, 0x08, 0x13, 0x14, 0xad, 0x58, 0xd1, 0x98, 0xde, 0xdc, 0x28, 0xae, 0xd0, + 0x12, 0x22, 0xc5, 0x6e, 0x53, 0x2a, 0x2a, 0xd3, 0x9b, 0xe9, 0xe2, 0x64, 0x33, 0xe6, 0x71, 0xd6, + 0xe7, 0xa9, 0xb0, 0x82, 0x3a, 0x69, 0xec, 0x78, 0x73, 0xc3, 0x39, 0x04, 0xeb, 0x19, 0x4a, 0xd5, + 0xa3, 0x99, 0xca, 0x1e, 0x4f, 0x42, 0x99, 0xad, 0x98, 0x46, 0xe1, 0xb5, 0xfd, 0x68, 0xf2, 0x41, + 0xf6, 0x3c, 0xf5, 0xec, 0x3c, 0x84, 0x6b, 0x4b, 0x6a, 0xe8, 0xeb, 0xd4, 0x60, 0xf7, 0x38, 0xed, + 0x44, 0x61, 0xf0, 0x02, 0x33, 0x55, 0x69, 0xcf, 0x9b, 0x1b, 0xce, 0x5b, 0xb8, 0xfe, 0x1a, 0x93, + 0xf0, 0x34, 0x5b, 0x9f, 0xc0, 0x82, 0xed, 0x63, 0x3f, 0x8b, 0xb8, 0xdf, 0xd5, 0x10, 0x53, 0x39, + 0x63, 0x33, 0x17, 0xd8, 0x1e, 0x40, 0x6d, 0x79, 0x03, 0x8d, 0x57, 0x0c, 0x27, 0x7c, 0x17, 0x63, + 0x57, 0xb3, 0x69, 0x75, 0xef, 
0x9b, 0x09, 0x95, 0x22, 0x4d, 0x4f, 0xe0, 0xd2, 0xc2, 0xfa, 0xd1, + 0x03, 0xf7, 0xef, 0x55, 0x76, 0xcb, 0x7b, 0x6c, 0xdf, 0xfc, 0x4f, 0x4a, 0x37, 0x7f, 0x03, 0x30, + 0x5f, 0x00, 0x7a, 0xa3, 0x7c, 0xa8, 0xb4, 0xac, 0xf6, 0xc1, 0xea, 0x90, 0x2e, 0x1c, 0xc1, 0x7e, + 0x69, 0x22, 0xf4, 0x56, 0xf9, 0xe8, 0xbf, 0x46, 0x6f, 0xdf, 0x5e, 0x2b, 0xab, 0xbb, 0xa5, 0x50, + 0x5d, 0xf6, 0x8d, 0xe9, 0x9d, 0x72, 0x91, 0x15, 0xc3, 0xb6, 0xdd, 0x75, 0xe3, 0x93, 0xb6, 0x87, + 0x8f, 0x86, 0x23, 0x66, 0x9c, 0x8f, 0x98, 0x71, 0x31, 0x62, 0xe4, 0x53, 0xce, 0xc8, 0xf7, 0x9c, + 0x91, 0x1f, 0x39, 0x23, 0xc3, 0x9c, 0x91, 0x9f, 0x39, 0x23, 0xbf, 0x72, 0x66, 0x5c, 0xe4, 0x8c, + 0x7c, 0x1d, 0x33, 0x63, 0x38, 0x66, 0xc6, 0xf9, 0x98, 0x19, 0x27, 0x95, 0xe2, 0xb7, 0xd4, 0xd9, + 0x52, 0xff, 0xa5, 0xfb, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xda, 0x65, 0xd1, 0xa5, 0x04, + 0x00, 0x00, } func (this *CredentialsRequest) Equal(that interface{}) bool { @@ -623,6 +632,9 @@ func (this *FetchTokenResponse) Equal(that interface{}) bool { if this.IssuedAt != that1.IssuedAt { return false } + if this.Anonymous != that1.Anonymous { + return false + } return true } func (this *GetTokenAuthorityRequest) Equal(that interface{}) bool { @@ -769,11 +781,12 @@ func (this *FetchTokenResponse) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&auth.FetchTokenResponse{") s = append(s, "Token: "+fmt.Sprintf("%#v", this.Token)+",\n") s = append(s, "ExpiresIn: "+fmt.Sprintf("%#v", this.ExpiresIn)+",\n") s = append(s, "IssuedAt: "+fmt.Sprintf("%#v", this.IssuedAt)+",\n") + s = append(s, "Anonymous: "+fmt.Sprintf("%#v", this.Anonymous)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1164,6 +1177,18 @@ func (m *FetchTokenResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Anonymous { + i-- + if m.Anonymous { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x6 + i-- + dAtA[i] = 0x98 + } if m.IssuedAt != 0 { i = encodeVarintAuth(dAtA, i, uint64(m.IssuedAt)) i-- @@ -1413,6 +1438,9 @@ func (m *FetchTokenResponse) Size() (n int) { if m.IssuedAt != 0 { n += 1 + sovAuth(uint64(m.IssuedAt)) } + if m.Anonymous { + n += 3 + } return n } @@ -1529,6 +1557,7 @@ func (this *FetchTokenResponse) String() string { `Token:` + fmt.Sprintf("%v", this.Token) + `,`, `ExpiresIn:` + fmt.Sprintf("%v", this.ExpiresIn) + `,`, `IssuedAt:` + fmt.Sprintf("%v", this.IssuedAt) + `,`, + `Anonymous:` + fmt.Sprintf("%v", this.Anonymous) + `,`, `}`, }, "") return s @@ -2089,6 +2118,26 @@ func (m *FetchTokenResponse) Unmarshal(dAtA []byte) error { break } } + case 99: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Anonymous", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAuth + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Anonymous = bool(v != 0) default: iNdEx = preIndex skippy, err := skipAuth(dAtA[iNdEx:]) diff --git a/session/auth/auth.proto b/session/auth/auth.proto index 1b4667b91..f40bbc59c 100644 --- a/session/auth/auth.proto +++ b/session/auth/auth.proto @@ -4,51 +4,49 @@ package moby.filesync.v1; option go_package = "auth"; -service Auth{ - rpc Credentials(CredentialsRequest) returns (CredentialsResponse); - rpc FetchToken(FetchTokenRequest) returns (FetchTokenResponse); - rpc GetTokenAuthority(GetTokenAuthorityRequest) returns 
(GetTokenAuthorityResponse); - rpc VerifyTokenAuthority(VerifyTokenAuthorityRequest) returns (VerifyTokenAuthorityResponse); +service Auth { + rpc Credentials(CredentialsRequest) returns (CredentialsResponse); + rpc FetchToken(FetchTokenRequest) returns (FetchTokenResponse); + rpc GetTokenAuthority(GetTokenAuthorityRequest) + returns (GetTokenAuthorityResponse); + rpc VerifyTokenAuthority(VerifyTokenAuthorityRequest) + returns (VerifyTokenAuthorityResponse); } -message CredentialsRequest { - string Host = 1; -} +message CredentialsRequest { string Host = 1; } message CredentialsResponse { - string Username = 1; - string Secret = 2; + string Username = 1; + string Secret = 2; } message FetchTokenRequest { - string ClientID = 1; - string Host = 2; - string Realm = 3; - string Service = 4; - repeated string Scopes = 5; + string ClientID = 1; + string Host = 2; + string Realm = 3; + string Service = 4; + repeated string Scopes = 5; } message FetchTokenResponse { - string Token = 1; - int64 ExpiresIn = 2; // seconds - int64 IssuedAt = 3; // timestamp + string Token = 1; + int64 ExpiresIn = 2; // seconds + int64 IssuedAt = 3; // timestamp + + bool Anonymous = 99; // earthly-specific } message GetTokenAuthorityRequest { - string Host = 1; - bytes Salt = 2; + string Host = 1; + bytes Salt = 2; } -message GetTokenAuthorityResponse { - bytes PublicKey = 1; -} +message GetTokenAuthorityResponse { bytes PublicKey = 1; } message VerifyTokenAuthorityRequest { - string Host = 1; - bytes Payload = 2; - bytes Salt = 3; + string Host = 1; + bytes Payload = 2; + bytes Salt = 3; } -message VerifyTokenAuthorityResponse { - bytes Signed = 1; -} +message VerifyTokenAuthorityResponse { bytes Signed = 1; } diff --git a/session/auth/authprovider/authprovider.go b/session/auth/authprovider/authprovider.go index 87618caa3..1964f2755 100644 --- a/session/auth/authprovider/authprovider.go +++ b/session/auth/authprovider/authprovider.go @@ -78,7 +78,7 @@ func (ap *authProvider) FetchToken(ctx context.Context, req *auth.FetchTokenRequ // check for statically configured bearer token if ac.RegistryToken != "" { - return toTokenResponse(ac.RegistryToken, time.Time{}, 0), nil + return toTokenResponse(ac.RegistryToken, time.Time{}, 0, false), nil } creds, err := ap.credentials(req.Host) @@ -127,19 +127,20 @@ func (ap *authProvider) FetchToken(ctx context.Context, req *auth.FetchTokenRequ if err != nil { return nil, err } - return toTokenResponse(resp.Token, resp.IssuedAt, resp.ExpiresIn), nil + return toTokenResponse(resp.Token, resp.IssuedAt, resp.ExpiresIn, false), nil } } return nil, err } - return toTokenResponse(resp.AccessToken, resp.IssuedAt, resp.ExpiresIn), nil + return toTokenResponse(resp.AccessToken, resp.IssuedAt, resp.ExpiresIn, false), nil } // do request anonymously resp, err := authutil.FetchToken(ctx, httpClient, nil, to) if err != nil { return nil, errors.Wrap(err, "failed to fetch anonymous token") } - return toTokenResponse(resp.Token, resp.IssuedAt, resp.ExpiresIn), nil + + return toTokenResponse(resp.Token, resp.IssuedAt, resp.ExpiresIn, true), nil } func (ap *authProvider) tlsConfig(host string) (*tls.Config, error) { @@ -276,13 +277,14 @@ func (ap *authProvider) getAuthorityKey(host string, salt []byte) (ed25519.Priva return ed25519.NewKeyFromSeed(sum[:ed25519.SeedSize]), nil } -func toTokenResponse(token string, issuedAt time.Time, expires int) *auth.FetchTokenResponse { +func toTokenResponse(token string, issuedAt time.Time, expires int, anonymous bool) *auth.FetchTokenResponse { if expires == 0 { 
expires = defaultExpiration } resp := &auth.FetchTokenResponse{ Token: token, ExpiresIn: int64(expires), + Anonymous: anonymous, // earthly-specific } if !issuedAt.IsZero() { resp.IssuedAt = issuedAt.Unix() diff --git a/solver/jobs.go b/solver/jobs.go index d927352a2..f6bf52c70 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/otel/trace" ) -// ResolveOpFunc finds an Op implementation for a Vertex +// ResolveOpFunc finds an Op implementation for a Vertex. type ResolveOpFunc func(Vertex, Builder) (Op, error) type Builder interface { @@ -42,6 +42,8 @@ type Solver struct { updateCond *sync.Cond s *scheduler index *edgeIndex + + simple *simpleSolver } type state struct { @@ -256,21 +258,34 @@ type Job struct { type SolverOpt struct { ResolveOpFunc ResolveOpFunc DefaultCache CacheManager + ResultSource ResultSource + CommitRefFunc CommitRefFunc + IsRunOnceFunc IsRunOnceFunc } func NewSolver(opts SolverOpt) *Solver { if opts.DefaultCache == nil { opts.DefaultCache = NewInMemoryCacheManager() } - jl := &Solver{ + solver := &Solver{ jobs: make(map[string]*Job), actives: make(map[digest.Digest]*state), opts: opts, index: newEdgeIndex(), } - jl.s = newScheduler(jl) - jl.updateCond = sync.NewCond(jl.mu.RLocker()) - return jl + + simple := newSimpleSolver( + opts.ResolveOpFunc, + opts.CommitRefFunc, + solver, + opts.ResultSource, + opts.IsRunOnceFunc, + ) + solver.simple = simple + + solver.s = newScheduler(solver) + solver.updateCond = sync.NewCond(solver.mu.RLocker()) + return solver } func (jl *Solver) setEdge(e Edge, newEdge *edge) { @@ -310,6 +325,7 @@ func (jl *Solver) getEdge(e Edge) *edge { } func (jl *Solver) subBuild(ctx context.Context, e Edge, parent Vertex) (CachedResult, error) { + // MH: Does not appear to be called in my tests v, err := jl.load(e.Vertex, parent, nil) if err != nil { return nil, err @@ -533,19 +549,28 @@ func (j *Job) Build(ctx context.Context, e Edge) (CachedResultWithProvenance, er j.span = span } - v, err := j.list.load(e.Vertex, nil, j) + // TODO: Separate the new solver code from the original Solver & Job code. + res, err := j.list.simple.build(ctx, j, e) if err != nil { return nil, err } - e.Vertex = v - res, err := j.list.s.build(ctx, e) - if err != nil { - return nil, err - } + // MH: Previous solver code is disabled below. 
+ + // v, err := j.list.load(e.Vertex, nil, j) + // if err != nil { + // return nil, err + // } + // e.Vertex = v + + // res, err := j.list.s.build(ctx, e) + // if err != nil { + // return nil, err + // } + + // j.list.mu.Lock() + // defer j.list.mu.Unlock() - j.list.mu.Lock() - defer j.list.mu.Unlock() return &withProvenance{CachedResult: res, j: j, e: e}, nil } diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index 8bfd96e46..e445c9539 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -8,9 +8,11 @@ import ( "time" "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes/docker" "github.com/mitchellh/hashstructure/v2" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/remotecache" + "github.com/moby/buildkit/cache/remotecache/registry" "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/exporter" @@ -31,7 +33,6 @@ import ( "github.com/moby/buildkit/worker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" ) type llbBridge struct { @@ -43,6 +44,10 @@ type llbBridge struct { cms map[string]solver.CacheManager cmsMu sync.Mutex sm *session.Manager + registryHosts docker.RegistryHosts + workerRemoteSource *worker.WorkerRemoteSource + importDone map[string]chan struct{} + importMu sync.Mutex } func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error { @@ -78,11 +83,6 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp return nil, err } - // TODO FIXME earthly-specific wait group is required to ensure the remotecache/registry's ResolveCacheImporterFunc can run - // which requires the session to remain open in order to get dockerhub (or any other registry) credentials. 
- // It seems like the cleaner approach is to bake this in somewhere into the edge or Load - eg, _ := errgroup.WithContext(ctx) - srcPol, err := loadSourcePolicy(b.builder) if err != nil { return nil, err @@ -94,62 +94,13 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp } polEngine = sourcepolicy.NewEngine(pol) - if err != nil { - return nil, err - } - } - var cms []solver.CacheManager - for _, im := range cacheImports { - cmID, err := cmKey(im) - if err != nil { - return nil, err - } - b.cmsMu.Lock() - var cm solver.CacheManager - if prevCm, ok := b.cms[cmID]; !ok { - func(cmID string, im gw.CacheOptionsEntry) { - cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) { - var cmNew solver.CacheManager - if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error { - resolveCI, ok := b.resolveCacheImporterFuncs[im.Type] - if !ok { - return errors.Errorf("unknown cache importer: %s", im.Type) - } - ci, desc, err := resolveCI(ctx, g, im.Attrs) - if err != nil { - return errors.Wrapf(err, "failed to configure %v cache importer", im.Type) - } - cmNew, err = ci.Resolve(ctx, desc, cmID, w) - return err - }); err != nil { - bklog.G(ctx).Debugf("error while importing cache manifest from cmId=%s: %v", cmID, err) - return nil, err - } - return cmNew, nil - }) - - cmInst := cm - eg.Go(func() error { - if lcm, ok := cmInst.(*lazyCacheManager); ok { - lcm.wait() - } - return nil - }) - }(cmID, im) - b.cms[cmID] = cm - } else { - cm = prevCm - } - cms = append(cms, cm) - b.cmsMu.Unlock() - } - err = eg.Wait() - if err != nil { - return nil, err } + + b.processImports(ctx, cacheImports, w) + dpc := &detectPrunedCacheID{} - edge, err := Load(ctx, def, polEngine, dpc.Load, ValidateEntitlements(ent), WithCacheSources(cms), NormalizeRuntimePlatforms(), WithValidateCaps()) + edge, err := Load(ctx, def, polEngine, dpc.Load, ValidateEntitlements(ent), NormalizeRuntimePlatforms(), WithValidateCaps()) if err != nil { return nil, errors.Wrap(err, "failed to load LLB") } @@ -173,6 +124,57 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp return res, nil } +func (b *llbBridge) processImports(ctx context.Context, cacheImports []gw.CacheOptionsEntry, w worker.Worker) { + var importRefs []string + + // Earthly custom inline cache handling. Other cache import types are ignored. 
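+	// Each ref is imported at most once per bridge: the first caller records a
+	// channel in importDone and closes it when done; duplicate refs skip the
+	// import and simply wait on that channel in the loop below.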
+ for _, cacheImport := range cacheImports { + if cacheImport.Type != "registry" { + continue + } + + importRef := cacheImport.Attrs["ref"] + importRefs = append(importRefs, importRef) + + b.importMu.Lock() + _, ok := b.importDone[importRef] + if ok { + b.importMu.Unlock() + continue + } + done := make(chan struct{}) + b.importDone[importRef] = done + b.importMu.Unlock() + + remotes := map[digest.Digest]*solver.Remote{} + name := fmt.Sprintf("importing cache manifest from %s", importRef) + + err := inBuilderContext(ctx, b.builder, name, "", func(ctx context.Context, g session.Group) error { + var err error + remotes, err = registry.EarthlyInlineCacheRemotes(ctx, b.sm, w, b.registryHosts, g, cacheImport.Attrs) + return err + }) + if err != nil { + bklog.G(ctx).Warnf("failed to import cache manifest from %s", importRef) + } + + if len(remotes) > 0 { + for cacheKey, remote := range remotes { + b.workerRemoteSource.AddResult(ctx, cacheKey, remote) + } + } + + close(done) + } + + for _, importRef := range importRefs { + b.importMu.Lock() + done := b.importDone[importRef] + b.importMu.Unlock() + <-done + } +} + // getExporter is earthly specific code which extracts the configured exporter // from the job's metadata func (b *llbBridge) getExporter(ctx context.Context) (*ExporterRequest, error) { diff --git a/solver/llbsolver/inline.go b/solver/llbsolver/inline.go new file mode 100644 index 000000000..133aa13e3 --- /dev/null +++ b/solver/llbsolver/inline.go @@ -0,0 +1,94 @@ +package llbsolver + +import ( + "context" + "encoding/json" + "fmt" + + cacheconfig "github.com/moby/buildkit/cache/config" + "github.com/moby/buildkit/exporter" + "github.com/moby/buildkit/exporter/containerimage/exptypes" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver/result" + "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +type earthlyInlineCacheItem struct { + Key digest.Digest `json:"cacheKey"` + Descriptor digest.Digest `json:"descriptor"` +} + +// earthlyInlineCache attaches custom "inline cache" metadata which can be used +// by a new build to load image layer blobs and use them as cache results. +func earthlyInlineCache(ctx context.Context, job *solver.Job, exp exporter.ExporterInstance, cached *result.Result[solver.CachedResult]) (map[string][]byte, error) { + if cached.Ref != nil { + return nil, errors.New("unexpected ref") + } + + meta := map[string][]byte{} + + err := inBuilderContext(ctx, job, "preparing layers for inline cache", job.SessionID+"-cache-inline", func(ctx context.Context, _ session.Group) error { + for k, res := range cached.Refs { + val, err := earthlyInlineCacheDigests(ctx, job, exp, res) + if err != nil { + return err + } + meta[fmt.Sprintf("%s/%s", exptypes.EarthlyInlineCache, k)] = val + } + return nil + }) + + if err != nil { + return nil, err + } + + return meta, nil +} + +// earthlyInlineCacheDigests creates a map of computed cache keys to manifest +// layer hashes which will be used to load inline cache blobs. 
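+// The returned value is a JSON-encoded []earthlyInlineCacheItem pairing each
+// of the result's cache keys with the digest of the corresponding layer
+// descriptor from the first available remote.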
+func earthlyInlineCacheDigests(ctx context.Context, job *solver.Job, exp exporter.ExporterInstance, res solver.CachedResult) ([]byte, error) { + workerRef, ok := res.Sys().(*worker.WorkerRef) + if !ok { + return nil, errors.Errorf("invalid reference: %T", res.Sys()) + } + + sess := session.NewGroup(job.SessionID) + + remotes, err := workerRef.GetRemotes(ctx, true, cacheconfig.RefConfig{Compression: exp.Config().Compression()}, false, sess) + if err != nil || len(remotes) == 0 { + return nil, nil + } + + var ( + remote = remotes[0] + cacheItems = []earthlyInlineCacheItem{} + cacheKeys = res.CacheKeys() + ) + + for i := 0; i < len(cacheKeys) && i < len(remote.Descriptors); i++ { + cacheItems = append(cacheItems, earthlyInlineCacheItem{ + Key: cacheKeys[i].Digest(), + Descriptor: remote.Descriptors[i].Digest, + }) + } + + val, err := json.Marshal(cacheItems) + if err != nil { + return nil, err + } + + return val, nil +} + +func hasInlineCacheExporter(exporters []RemoteCacheExporter) bool { + for _, exp := range exporters { + if _, ok := asInlineCache(exp.Exporter); ok { + return true + } + } + return false +} diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index eca2ac14d..ae794ee02 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -269,6 +269,9 @@ func captureProvenance(ctx context.Context, res solver.CachedResultWithProvenanc switch op := pp.(type) { case *ops.SourceOp: id, pin := op.Pin() + if pin == "" { // Hack: latest cache opt changes led to an empty value here. Investigate. + return nil + } err := id.Capture(c, pin) if err != nil { return err diff --git a/solver/llbsolver/simple.go b/solver/llbsolver/simple.go new file mode 100644 index 000000000..3646c5677 --- /dev/null +++ b/solver/llbsolver/simple.go @@ -0,0 +1,29 @@ +package llbsolver + +import ( + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/solver/llbsolver/ops" +) + +// isRunOnce returns a function that can be called to determine if a Vertex +// contains an operation that must be run at least once per build. 
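+// Currently only source operations (*ops.SourceOp) are treated as run-once.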
+func (s *Solver) isRunOnceOp() solver.IsRunOnceFunc { + return func(v solver.Vertex, b solver.Builder) (bool, error) { + w, err := s.resolveWorker() + if err != nil { + return false, err + } + + op, err := w.ResolveOp(v, s.Bridge(b), s.sm) + if err != nil { + return false, err + } + + switch op.(type) { + case *ops.SourceOp: + return true, nil + default: + return false, nil + } + } +} diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index ccc3504c0..285f69c26 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/containerd/containerd/remotes/docker" intoto "github.com/in-toto/in-toto-golang/in_toto" slsa02 "github.com/in-toto/in-toto-golang/in_toto/slsa_provenance/v0.2" controlapi "github.com/moby/buildkit/api/services/control" @@ -79,6 +80,8 @@ type Opt struct { WorkerController *worker.Controller HistoryQueue *HistoryQueue ResourceMonitor *resources.Monitor + RootDir string + RegistryHosts docker.RegistryHosts } type Solver struct { @@ -93,6 +96,8 @@ type Solver struct { entitlements []string history *HistoryQueue sysSampler *resources.Sampler[*resourcetypes.SysSample] + registryHosts docker.RegistryHosts + workerRemoteSource *worker.WorkerRemoteSource } // Processor defines a processing function to be applied after solving, but @@ -100,6 +105,13 @@ type Solver struct { type Processor func(ctx context.Context, result *Result, s *Solver, j *solver.Job, usage *resources.SysSampler) (*Result, error) func New(opt Opt) (*Solver, error) { + defaultWorker, err := opt.WorkerController.GetDefault() + if err != nil { + return nil, err + } + + remoteSource := worker.NewWorkerRemoteSource(defaultWorker) + s := &Solver{ workerController: opt.WorkerController, resolveWorker: defaultResolver(opt.WorkerController), @@ -110,6 +122,8 @@ func New(opt Opt) (*Solver, error) { sm: opt.SessionManager, entitlements: opt.Entitlements, history: opt.HistoryQueue, + registryHosts: opt.RegistryHosts, + workerRemoteSource: remoteSource, } sampler, err := resources.NewSysSampler() @@ -118,9 +132,22 @@ func New(opt Opt) (*Solver, error) { } s.sysSampler = sampler + workerSource, err := worker.NewWorkerResultSource(opt.WorkerController, opt.RootDir) + if err != nil { + return nil, err + } + + sources := worker.NewCombinedResultSource( + workerSource, + remoteSource, + ) + s.solver = solver.NewSolver(solver.SolverOpt{ ResolveOpFunc: s.resolver(), + IsRunOnceFunc: s.isRunOnceOp(), DefaultCache: opt.CacheManager, + ResultSource: sources, + CommitRefFunc: worker.FinalizeRef, }) return s, nil } @@ -144,6 +171,10 @@ func (s *Solver) bridge(b solver.Builder) *provenanceBridge { resolveCacheImporterFuncs: s.resolveCacheImporterFuncs, cms: map[string]solver.CacheManager{}, sm: s.sm, + registryHosts: s.registryHosts, + workerRemoteSource: s.workerRemoteSource, + importDone: map[string]chan struct{}{}, + importMu: sync.Mutex{}, }} } @@ -553,16 +584,17 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro return nil, err } - cacheExporters, inlineCacheExporter := splitCacheExporters(exp.CacheExporters) - + cacheExporters, _ := splitCacheExporters(exp.CacheExporters) var exporterResponse map[string]string if e := exp.Exporter; e != nil { - meta, err := runInlineCacheExporter(ctx, e, inlineCacheExporter, j, cached) - if err != nil { - return nil, err - } - for k, v := range meta { - inp.AddMeta(k, v) + if hasInlineCacheExporter(exp.CacheExporters) { + meta, err := earthlyInlineCache(ctx, j, e, cached) + if err != 
nil { + return nil, errors.Wrap(err, "failed prepare inline cache") + } + for k, v := range meta { + inp.AddMeta(k, v) + } } if err := inBuilderContext(ctx, j, e.Name(), j.SessionID+"-export", func(ctx context.Context, _ session.Group) error { @@ -573,6 +605,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro } } + // Deprecated. Can be removed later. cacheExporterResponse, err := runCacheExporters(ctx, cacheExporters, j, cached, inp) if err != nil { return nil, err @@ -598,6 +631,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro }, nil } +// Deprecated. Can be removed later. func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult], inp *result.Result[cache.ImmutableRef]) (map[string]string, error) { eg, ctx := errgroup.WithContext(ctx) g := session.NewGroup(j.SessionID) @@ -650,6 +684,7 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j * return cacheExporterResponse, nil } +// Deprecated. Can be removed later. func runInlineCacheExporter(ctx context.Context, e exporter.ExporterInstance, inlineExporter *RemoteCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult]) (map[string][]byte, error) { meta := map[string][]byte{} if inlineExporter == nil { @@ -831,6 +866,7 @@ func asInlineCache(e remotecache.Exporter) (inlineCacheExporter, bool) { return ie, ok } +// Deprecated. Can be removed later. func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedResult, compressionopt compression.Config, g session.Group) ([]byte, error) { ie, ok := asInlineCache(e) if !ok { diff --git a/solver/scheduler.go b/solver/scheduler.go index d36815615..e462ef288 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -34,7 +34,7 @@ func newScheduler(ef edgeFactory) *scheduler { } s.cond = cond.NewStatefulCond(&s.mu) - go s.loop() + //go s.loop() return s } diff --git a/solver/simple.go b/solver/simple.go new file mode 100644 index 000000000..d8ec1c940 --- /dev/null +++ b/solver/simple.go @@ -0,0 +1,582 @@ +package solver + +import ( + "context" + "crypto/sha256" + "fmt" + "hash" + "io" + "sync" + "time" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/moby/buildkit/util/bklog" + "github.com/moby/buildkit/util/progress" + "github.com/moby/buildkit/util/tracing" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +const ( + runOnceLRUSize = 2000 + slowCacheLRUSize = 5000 + parallelGuardWait = time.Millisecond * 100 +) + +// CommitRefFunc can be used to finalize a Result's ImmutableRef. +type CommitRefFunc func(ctx context.Context, result Result) error + +// IsRunOnceFunc determines if the vertex represents an operation that needs to +// be run at least once. +type IsRunOnceFunc func(Vertex, Builder) (bool, error) + +// ResultSource can be any source (local or remote) that allows one to load a +// Result using a cache key digest. +type ResultSource interface { + Load(ctx context.Context, cacheKey digest.Digest) (Result, bool, error) + Link(ctx context.Context, cacheKey digest.Digest, refID string) error +} + +// runOnceCtrl is a simple wrapper around an LRU cache. It's used to ensure that +// an operation is only run once per job. However, this is not guaranteed, as +// the struct uses a reasonable small LRU size to preview excessive memory use. 
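+// Entries are keyed by the session ID plus the operation's cache key, so
+// "run once" effectively means once per session for a given operation.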
+type runOnceCtrl struct { + lru *simplelru.LRU + mu sync.Mutex +} + +func newRunOnceCtrl() *runOnceCtrl { + lru, _ := simplelru.NewLRU(runOnceLRUSize, nil) // Error impossible on positive first argument. + return &runOnceCtrl{lru: lru} +} + +// hasRun: Here, we use an LRU cache to whether we need to execute the source +// operation for this job. The jobs may be re-run if the LRU size is exceeded, +// but this shouldn't have a big impact on the build. The trade-off is +// worthwhile given the memory-friendliness of LRUs. +func (s *runOnceCtrl) hasRun(d digest.Digest, sessionID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + key := fmt.Sprintf("%s:%s", sessionID, d) + ret := s.lru.Contains(key) + + s.lru.Add(key, struct{}{}) + + return ret +} + +type slowCacheStore struct { + lru *simplelru.LRU + mu sync.Mutex +} + +func newSlowCacheStore() *slowCacheStore { + lru, _ := simplelru.NewLRU(slowCacheLRUSize, nil) // Error impossible on positive first argument. + return &slowCacheStore{lru: lru} +} + +func (s *slowCacheStore) get(cacheKey digest.Digest, refID string) (digest.Digest, bool) { + s.mu.Lock() + defer s.mu.Unlock() + + key := fmt.Sprintf("%s:%s", cacheKey, refID) + v, ok := s.lru.Get(key) + if !ok { + return "", false + } + + return v.(digest.Digest), true +} + +func (s *slowCacheStore) set(cacheKey digest.Digest, refID string, slow digest.Digest) { + s.mu.Lock() + defer s.mu.Unlock() + + key := fmt.Sprintf("%s:%s", cacheKey, refID) + s.lru.Add(key, slow) +} + +type simpleSolver struct { + resolveOpFunc ResolveOpFunc + isRunOnceFunc IsRunOnceFunc + commitRefFunc CommitRefFunc + solver *Solver + parallelGuard *parallelGuard + resultSource ResultSource + runOnceCtrl *runOnceCtrl + slowCacheStore *slowCacheStore +} + +func newSimpleSolver( + resolveOpFunc ResolveOpFunc, + commitRefFunc CommitRefFunc, + solver *Solver, + resultSource ResultSource, + isRunOnceFunc IsRunOnceFunc, +) *simpleSolver { + return &simpleSolver{ + parallelGuard: newParallelGuard(parallelGuardWait), + resolveOpFunc: resolveOpFunc, + commitRefFunc: commitRefFunc, + solver: solver, + resultSource: resultSource, + isRunOnceFunc: isRunOnceFunc, + runOnceCtrl: newRunOnceCtrl(), + slowCacheStore: newSlowCacheStore(), + } +} + +func (s *simpleSolver) build(ctx context.Context, job *Job, e Edge) (CachedResult, error) { + + // Ordered list of vertices to build. + digests, vertices := s.exploreVertices(e) + + var ret Result + var expKeys []ExportableCacheKey + + runCacheMan := newCacheKeyManager() + + closers := []func(context.Context) error{} + + for _, d := range digests { + vertex, ok := vertices[d] + if !ok { + return nil, errors.Errorf("digest %s not found", d) + } + + res, cacheKey, err := s.buildOne(ctx, runCacheMan, d, vertex, job, e) + if err != nil { + return nil, err + } + + closers = append(closers, func(ctx context.Context) error { + return res.Release(ctx) + }) + + ret = res + + // Hijack the CacheKey type in order to export a reference from the new + // cache key to the ref ID. + expKeys = append(expKeys, ExportableCacheKey{ + CacheKey: &CacheKey{ + ID: res.ID(), + digest: cacheKey, + }, + Exporter: nil, // We're not using an exporter here. + }) + } + + // Defer releasing of results until this build has finished to limit + // performance impact. 
+ go func() { + ctx := context.Background() + for i := len(closers) - 1; i >= 0; i-- { + if err := closers[i](ctx); err != nil { + bklog.G(ctx).Warnf("failed to release: %v", err) + } + } + }() + + return NewCachedResult(ret, expKeys), nil +} + +func (s *simpleSolver) buildOne(ctx context.Context, runCacheMan *cacheKeyManager, d digest.Digest, vertex Vertex, job *Job, e Edge) (Result, digest.Digest, error) { + + st := s.state(vertex, job) + + // Add cache opts to context as they will be accessed by cache retrieval. + ctx = withAncestorCacheOpts(ctx, st) + + // CacheMap populates required fields in SourceOp. + cm, err := st.op.CacheMap(ctx, int(e.Index)) + if err != nil { + return nil, "", err + } + + inputs, err := s.preprocessInputs(ctx, runCacheMan, st, vertex, cm.CacheMap, job) + if err != nil { + notifyError(ctx, st, false, err) + return nil, "", err + } + + cacheKey, err := runCacheMan.cacheKey(ctx, d) + if err != nil { + return nil, "", err + } + + // Ensure we don't have multiple threads working on the same operation. The + // computed cache key needs to be used here instead of the vertex + // digest. This is because the vertex can sometimes differ for the same + // operation depending on its ancestors. + wait, done := s.parallelGuard.acquire(ctx, cacheKey) + defer done() + <-wait + + isRunOnce, err := s.isRunOnceFunc(vertex, job) + if err != nil { + return nil, "", err + } + + // Special case for source operations. They need to be run once per build or + // content changes will not be reliably detected. + mayLoadCache := !isRunOnce || isRunOnce && s.runOnceCtrl.hasRun(cacheKey, job.SessionID) + + if mayLoadCache { + v, ok, err := s.resultSource.Load(ctx, cacheKey) + if err != nil { + return nil, "", err + } + + if ok && v != nil { + notifyError(ctx, st, true, nil) + return v, cacheKey, nil + } + } + + results, _, err := st.op.Exec(ctx, inputs) + if err != nil { + return nil, "", err + } + + res := results[int(e.Index)] + + for i := range results { + if i != int(e.Index) { + err = results[i].Release(ctx) + if err != nil { + return nil, "", err + } + } + } + + // Some operations need to be left in a mutable state. All others need to be + // committed in order to be cached and loaded correctly. + if !isRunOnce { + err = s.commitRefFunc(ctx, res) + if err != nil { + return nil, "", err + } + } + + err = s.resultSource.Link(ctx, cacheKey, res.ID()) + if err != nil { + return nil, "", err + } + + return res, cacheKey, nil +} + +func notifyError(ctx context.Context, st *state, cached bool, err error) { + ctx = progress.WithProgress(ctx, st.mpw) + notifyCompleted := notifyStarted(ctx, &st.clientVertex, cached) + notifyCompleted(err, cached) +} + +func (s *simpleSolver) state(vertex Vertex, job *Job) *state { + s.solver.mu.Lock() + defer s.solver.mu.Unlock() + if st, ok := s.solver.actives[vertex.Digest()]; ok { + st.jobs[job] = struct{}{} + return st + } + return s.createState(vertex, job) +} + +// createState creates a new state struct with required and placeholder values. 
+func (s *simpleSolver) createState(vertex Vertex, job *Job) *state { + defaultCache := NewInMemoryCacheManager() + + st := &state{ + opts: SolverOpt{DefaultCache: defaultCache, ResolveOpFunc: s.resolveOpFunc}, + parents: map[digest.Digest]struct{}{}, + childVtx: map[digest.Digest]struct{}{}, + allPw: map[progress.Writer]struct{}{}, + mpw: progress.NewMultiWriter(progress.WithMetadata("vertex", vertex.Digest())), + mspan: tracing.NewMultiSpan(), + vtx: vertex, + clientVertex: initClientVertex(vertex), + edges: map[Index]*edge{}, + index: s.solver.index, + mainCache: defaultCache, + cache: map[string]CacheManager{}, + solver: s.solver, + origDigest: vertex.Digest(), + } + + st.jobs = map[*Job]struct{}{ + job: {}, + } + + st.mpw.Add(job.pw) + + // Hack: this is used in combination with withAncestorCacheOpts to pass + // necessary dependency information to a few caching components. We'll need + // to expire these keys somehow. We should also move away from using the + // actives map, but it's still being used by withAncestorCacheOpts for now. + s.solver.actives[vertex.Digest()] = st + + op := newSharedOp(st.opts.ResolveOpFunc, st.opts.DefaultCache, st) + + // Required to access cache map results on state. + st.op = op + + return st +} + +func (s *simpleSolver) exploreVertices(e Edge) ([]digest.Digest, map[digest.Digest]Vertex) { + + digests := []digest.Digest{e.Vertex.Digest()} + vertices := map[digest.Digest]Vertex{ + e.Vertex.Digest(): e.Vertex, + } + + for _, edge := range e.Vertex.Inputs() { + d, v := s.exploreVertices(edge) + digests = append(d, digests...) + for key, value := range v { + vertices[key] = value + } + } + + ret := []digest.Digest{} + m := map[digest.Digest]struct{}{} + for _, d := range digests { + if _, ok := m[d]; !ok { + ret = append(ret, d) + m[d] = struct{}{} + } + } + + return ret, vertices +} + +func (s *simpleSolver) preprocessInputs(ctx context.Context, runCacheMan *cacheKeyManager, st *state, vertex Vertex, cm *CacheMap, job *Job) ([]Result, error) { + // This struct is used to reconstruct a cache key from an LLB digest & all + // parents using consistent digests that depend on the full dependency chain. + scm := simpleCacheMap{ + digest: cm.Digest, + deps: make([]cacheMapDep, len(cm.Deps)), + inputs: make([]digest.Digest, len(cm.Deps)), + } + + // By default we generate a cache key that's not salted as the keys need to + // persist across builds. However, when cache is disabled, we scope the keys + // to the current session. This is because some jobs will be duplicated in a + // given build & will need to be cached in a limited way. + if vertex.Options().IgnoreCache { + scm.salt = job.SessionID + } + + var inputs []Result + + for i, in := range vertex.Inputs() { + + d := in.Vertex.Digest() + + // Compute a cache key given the LLB digest value. + cacheKey, err := runCacheMan.cacheKey(ctx, d) + if err != nil { + return nil, err + } + + // Lookup the result for that cache key. + res, ok, err := s.resultSource.Load(ctx, cacheKey) + if err != nil { + return nil, err + } + + if !ok { + return nil, errors.Errorf("result not found for digest: %s", d) + } + + dep := cm.Deps[i] + + // Unlazy the result. + if dep.PreprocessFunc != nil { + err = dep.PreprocessFunc(ctx, res, st) + if err != nil { + return nil, err + } + } + + // Add selectors (usually file references) to the struct. + scm.deps[i] = cacheMapDep{ + selector: dep.Selector, + } + + // ComputeDigestFunc will usually checksum files. 
This is then used as + // part of the cache key to ensure it's consistent & distinct for this + // operation. The key is then cached based on the key calculated from + // all ancestors & the result ID. + if dep.ComputeDigestFunc != nil { + cachedSlowKey, ok := s.slowCacheStore.get(cacheKey, res.ID()) + if ok { + scm.deps[i].computed = cachedSlowKey + } else { + slowKey, err := dep.ComputeDigestFunc(ctx, res, st) + if err != nil { + bklog.G(ctx).Warnf("failed to compute digest: %v", err) + return nil, err + } + scm.deps[i].computed = slowKey + s.slowCacheStore.set(cacheKey, res.ID(), slowKey) + } + } + + // The result can be released now that the preprocess & slow cache + // digest functions have been run. This is crucial as failing to do so + // will lead to full file copying from previously executed source + // operations. Releasing can be slow, so we run these concurrently. + go func() { + if err := res.Release(ctx); err != nil { + bklog.G(ctx).Warnf("failed to release result: %v", err) + } + }() + + // Add input references to the struct to link dependencies. + scm.inputs[i] = in.Vertex.Digest() + + // Add the cached result to the input set. These inputs are used to + // reconstruct dependencies (mounts, etc.) for a new container run. + inputs = append(inputs, res) + } + + runCacheMan.add(vertex.Digest(), &scm) + + return inputs, nil +} + +type cacheKeyManager struct { + cacheMaps map[digest.Digest]*simpleCacheMap + mu sync.Mutex +} + +type cacheMapDep struct { + selector digest.Digest + computed digest.Digest +} + +type simpleCacheMap struct { + digest digest.Digest + inputs []digest.Digest + deps []cacheMapDep + salt string +} + +func newCacheKeyManager() *cacheKeyManager { + return &cacheKeyManager{ + cacheMaps: map[digest.Digest]*simpleCacheMap{}, + } +} + +func (m *cacheKeyManager) add(d digest.Digest, s *simpleCacheMap) { + m.mu.Lock() + m.cacheMaps[d] = s + m.mu.Unlock() +} + +// cacheKey recursively generates a cache key based on a sequence of ancestor +// operations & their cacheable values. +func (m *cacheKeyManager) cacheKey(ctx context.Context, d digest.Digest) (digest.Digest, error) { + h := sha256.New() + + err := m.cacheKeyRecurse(ctx, d, h) + if err != nil { + return "", err + } + + return newDigest(fmt.Sprintf("%x", h.Sum(nil))), nil +} + +func (m *cacheKeyManager) cacheKeyRecurse(ctx context.Context, d digest.Digest, h hash.Hash) error { + m.mu.Lock() + c, ok := m.cacheMaps[d] + m.mu.Unlock() + if !ok { + return errors.New("missing cache map key") + } + + if c.salt != "" { + io.WriteString(h, c.salt) + } + + io.WriteString(h, c.digest.String()) + + for _, dep := range c.deps { + if dep.selector != "" { + io.WriteString(h, dep.selector.String()) + } + if dep.computed != "" { + io.WriteString(h, dep.computed.String()) + } + } + + for _, in := range c.inputs { + err := m.cacheKeyRecurse(ctx, in, h) + if err != nil { + return err + } + } + + return nil +}
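An illustrative usage sketch, not part of the patch: it shows how cacheKeyManager derives a stable key for a small two-vertex chain. It assumes it sits in the same solver package as the code above, and the digests and selector value are made up for the example.

// exampleCacheKey derives a key for a child vertex whose single dependency is
// a base vertex; the key hashes the child's own digest, its dep selector and
// computed digest (if any), and, recursively, everything the base contributes.
func exampleCacheKey(ctx context.Context) (digest.Digest, error) {
	m := newCacheKeyManager()

	base := digest.FromString("llb:base")   // hypothetical vertex digest
	child := digest.FromString("llb:child") // hypothetical vertex digest

	m.add(base, &simpleCacheMap{digest: base})
	m.add(child, &simpleCacheMap{
		digest: child,
		inputs: []digest.Digest{base}, // child depends on base
		deps:   []cacheMapDep{{selector: digest.FromString("/src")}},
	})

	return m.cacheKey(ctx, child)
}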
+ +type parallelGuard struct { + wait time.Duration + active map[digest.Digest]struct{} + mu sync.Mutex +} + +func newParallelGuard(wait time.Duration) *parallelGuard { + return &parallelGuard{wait: wait, active: map[digest.Digest]struct{}{}} +} + +func (f *parallelGuard) acquire(ctx context.Context, d digest.Digest) (<-chan struct{}, func()) { + + ch := make(chan struct{}) + + closer := func() { + f.mu.Lock() + delete(f.active, d) + f.mu.Unlock() + } + + go func() { + tick := time.NewTicker(f.wait) + defer tick.Stop() + // A function is used here as the above ticker does not execute + // immediately. + check := func() bool { + f.mu.Lock() + if _, ok := f.active[d]; !ok { + f.active[d] = struct{}{} + close(ch) + f.mu.Unlock() + return true + } + f.mu.Unlock() + return false + } + if check() { + return + } + for { + select { + case <-tick.C: + if check() { + return + } + case <-ctx.Done(): + return + } + } + }() + + return ch, closer +} + +func newDigest(s string) digest.Digest { + return digest.NewDigestFromEncoded(digest.SHA256, s) +} diff --git a/source/local/source.go b/source/local/source.go index ae480be2e..67665a00b 100644 --- a/source/local/source.go +++ b/source/local/source.go @@ -123,6 +123,13 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, ind } sessionID = id } + + // Hack: The encoded session ID here is breaking the simplified caching + // approach in "simple.go" as it differs for each request. Use the + // SharedKeyHint property which is provided by Earthly and is based on + // the path & inode names. + sessionID = ls.src.SharedKeyHint + dt, err := json.Marshal(struct { SessionID string IncludePatterns []string diff --git a/worker/simple.go b/worker/simple.go new file mode 100644 index 000000000..17a2c8165 --- /dev/null +++ b/worker/simple.go @@ -0,0 +1,364 @@ +package worker + +import ( + "context" + "path/filepath" + "sync" + "time" + + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/solver" + "github.com/moby/buildkit/util/bklog" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" +) + +const refIDPrunePeriod = 30 * time.Minute + +// WorkerResultSource abstracts the work involved in loading a Result from a +// worker using a ref ID. +type WorkerResultSource struct { + wc *Controller + ids *refIDStore + prunePeriod time.Duration +} + +// NewWorkerResultSource creates and returns a new *WorkerResultSource. +func NewWorkerResultSource(wc *Controller, rootDir string) (*WorkerResultSource, error) { + ids, err := newRefIDStore(rootDir) + if err != nil { + return nil, err + } + w := &WorkerResultSource{ + wc: wc, + ids: ids, + prunePeriod: refIDPrunePeriod, + } + go w.pruneLoop(context.Background()) + return w, nil +} + +// Load a cached result from a worker. +func (w *WorkerResultSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + fullID, ok, err := w.ids.get(cacheKey) + if err != nil { + return nil, false, err + } + + if !ok { + return nil, false, nil + } + + workerID, refID, err := parseWorkerRef(fullID) + if err != nil { + return nil, false, err + } + + worker, err := w.wc.Get(workerID) + if err != nil { + return nil, false, err + } + + ref, err := worker.LoadRef(ctx, refID, false) + if err != nil { + if cache.IsNotFound(err) { + bklog.G(ctx).Warnf("worker ref not found: %v", err) + return nil, false, nil + } + return nil, false, err + } + + return NewWorkerRefResult(ref, worker), true, nil +} + +// Link a simple solver cache key to the worker ref. +func (w *WorkerResultSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + return w.ids.set(cacheKey, refID) +}
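The following sketch is illustrative and not part of the patch. It shows the lookup/record round trip a caller (such as the simple solver's buildOne) performs against a ResultSource like the one above; loadOrRun and its run callback are hypothetical names, and error handling is kept minimal.

package example

import (
	"context"

	"github.com/moby/buildkit/solver"
	digest "github.com/opencontainers/go-digest"
)

// loadOrRun returns a previously linked result for key when one can be loaded,
// and otherwise executes the operation and links the new ref for future builds.
func loadOrRun(ctx context.Context, src solver.ResultSource, key digest.Digest, run func(context.Context) (solver.Result, error)) (solver.Result, error) {
	res, ok, err := src.Load(ctx, key)
	if err != nil {
		return nil, err
	}
	if ok {
		return res, nil // cache hit: an existing worker ref was resolved for this key
	}
	res, err = run(ctx) // cache miss: execute the operation
	if err != nil {
		return nil, err
	}
	// Record the cache key -> ref ID link so later builds can reuse the result.
	return res, src.Link(ctx, key, res.ID())
}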
+ +// pruneLoop attempts to prune stale ref IDs from the BoltDB database every prunePeriod. +func (w *WorkerResultSource) pruneLoop(ctx context.Context) { + tick := time.NewTicker(w.prunePeriod) + for range tick.C { + start := time.Now() + examined, pruned, err := w.prune(ctx) + if err != nil { + bklog.G(ctx).Warnf("failed to prune ref IDs: %v", err) + } else { + bklog.G(ctx).Warnf("examined %d, pruned %d stale ref IDs in %s", examined, pruned, time.Since(start)) + } + } +} + +// exists determines if the worker ref ID exists. It's important that the ref is +// released after being loaded. +func (w *WorkerResultSource) exists(ctx context.Context, fullID string) (bool, error) { + workerID, refID, err := parseWorkerRef(fullID) + if err != nil { + return false, err + } + + worker, err := w.wc.Get(workerID) + if err != nil { + return false, err + } + + ref, err := worker.LoadRef(ctx, refID, false) + if err != nil { + if cache.IsNotFound(err) { + return false, nil + } + return false, err + } + + ref.Release(ctx) + + return true, nil +} + +// prune ref IDs by identifying and purging all stale IDs. Ref IDs are unique +// and will not be rewritten by a fresh build. Hence, it's safe to delete these +// items once a worker ref has been pruned by BuildKit. +func (w *WorkerResultSource) prune(ctx context.Context) (int, int, error) { + var deleteIDs []digest.Digest + var examined, pruned int + + err := w.ids.walk(func(d digest.Digest, id string) error { + exists, err := w.exists(ctx, id) + if err != nil { + return err + } + examined++ + if !exists { + deleteIDs = append(deleteIDs, d) + } + return nil + }) + if err != nil { + return examined, 0, err + } + + for _, deleteID := range deleteIDs { + err = w.ids.del(deleteID) + if err != nil { + return examined, pruned, err + } + pruned++ + } + + return examined, pruned, nil +} + +var _ solver.ResultSource = &WorkerResultSource{} + +// FinalizeRef is a convenience function that calls Finalize on a Result's +// ImmutableRef. The 'worker' package cannot be imported by 'solver' due to an +// import cycle, so this function is passed in with solver.SolverOpt. +func FinalizeRef(ctx context.Context, res solver.Result) error { + sys := res.Sys() + if w, ok := sys.(*WorkerRef); ok { + err := w.ImmutableRef.Finalize(ctx) + if err != nil { + return err + } + } + return nil +} + +// WorkerRemoteSource can be used to load results from remote sources via a worker. +type WorkerRemoteSource struct { + worker Worker + remotes map[digest.Digest]*solver.Remote + mu sync.Mutex +} + +// NewWorkerRemoteSource creates and returns a remote result source. +func NewWorkerRemoteSource(worker Worker) *WorkerRemoteSource { + return &WorkerRemoteSource{ + worker: worker, + remotes: map[digest.Digest]*solver.Remote{}, + } +} + +// Load a Result from the worker. +func (w *WorkerRemoteSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + w.mu.Lock() + remote, ok := w.remotes[cacheKey] + w.mu.Unlock() + + if !ok { + return nil, false, nil + } + + ref, err := w.worker.FromRemote(ctx, remote) + if err != nil { + return nil, false, err + } + + return NewWorkerRefResult(ref, w.worker), true, nil +} + +func (w *WorkerRemoteSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + return nil // noop +}
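An illustrative sketch, not part of the patch: it shows how remotes parsed from an image's inline-cache metadata might be registered with a WorkerRemoteSource and then materialized on demand. The remotes map is an assumed input, and AddResult is the method defined immediately below.

package example

import (
	"context"

	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/worker"
	digest "github.com/opencontainers/go-digest"
)

// registerAndLoad seeds a WorkerRemoteSource with pre-parsed remotes and then
// loads one entry; on a hit, Load pulls the layer chain via worker.FromRemote.
func registerAndLoad(ctx context.Context, w worker.Worker, remotes map[digest.Digest]*solver.Remote, key digest.Digest) (solver.Result, bool, error) {
	src := worker.NewWorkerRemoteSource(w)
	for k, r := range remotes {
		src.AddResult(ctx, k, r)
	}
	return src.Load(ctx, key)
}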
+ +// AddResult adds a solver.Remote source for the given cache key. +func (w *WorkerRemoteSource) AddResult(ctx context.Context, cacheKey digest.Digest, remote *solver.Remote) { + w.mu.Lock() + defer w.mu.Unlock() + w.remotes[cacheKey] = remote +} + +var _ solver.ResultSource = &WorkerRemoteSource{} + +// CombinedResultSource implements solver.ResultSource over a list of sources. +type CombinedResultSource struct { + sources []solver.ResultSource +} + +// NewCombinedResultSource creates and returns a new source from a list of sources. +func NewCombinedResultSource(sources ...solver.ResultSource) *CombinedResultSource { + return &CombinedResultSource{sources: sources} +} + +// Load attempts to load a Result from all underlying sources. +func (c *CombinedResultSource) Load(ctx context.Context, cacheKey digest.Digest) (solver.Result, bool, error) { + for _, source := range c.sources { + res, ok, err := source.Load(ctx, cacheKey) + if err != nil { + return nil, false, err + } + if ok { + return res, true, nil + } + } + return nil, false, nil +} + +// Link a cache key to a ref ID. Only used by the worker result source. +func (c *CombinedResultSource) Link(ctx context.Context, cacheKey digest.Digest, refID string) error { + for _, source := range c.sources { + err := source.Link(ctx, cacheKey, refID) + if err != nil { + return err + } + } + return nil +} + +var _ solver.ResultSource = &CombinedResultSource{} + +// refIDStore uses a BoltDB database to store links from computed cache keys to +// worker ref IDs. +type refIDStore struct { + db *bolt.DB + rootDir string + bucket string + prunePeriod time.Duration +} + +// newRefIDStore creates and returns a new store and initializes a BoltDB +// instance in the specified root directory. +func newRefIDStore(rootDir string) (*refIDStore, error) { + r := &refIDStore{ + bucket: "ids", + rootDir: rootDir, + prunePeriod: refIDPrunePeriod, + } + err := r.init() + if err != nil { + return nil, err + } + return r, nil +} + +func (r *refIDStore) init() error { + db, err := bolt.Open(filepath.Join(r.rootDir, "ids.db"), 0755, nil) + if err != nil { + return err + } + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(r.bucket)) + return err + }) + if err != nil { + return err + } + r.db = db + return nil +} + +// set stores the worker ref ID under the given cache key digest. +func (r *refIDStore) set(cacheKey digest.Digest, id string) error { + err := r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + return b.Put([]byte(cacheKey), []byte(id)) + }) + if err != nil { + return errors.Wrap(err, "failed to set ref ID") + } + return nil +} + +// get returns the worker ref ID stored for the given cache key digest. +func (r *refIDStore) get(cacheKey digest.Digest) (string, bool, error) { + var id string + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + id = string(b.Get([]byte(cacheKey))) + return nil + }) + if err != nil { + return "", false, errors.Wrap(err, "failed to load ref ID") + } + if id == "" { + return "", false, nil + } + return id, true, nil +} + +// del removes a single cache key from the ref ID database. +func (r *refIDStore) del(cacheKey digest.Digest) error { + err := r.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + return b.Delete([]byte(cacheKey)) + }) + if err != nil { + return errors.Wrap(err, "failed to delete key") + } + return nil +} + +// walk all items in the database using a callback function.
+func (r *refIDStore) walk(fn func(digest.Digest, string) error) error { + + all := map[digest.Digest]string{} + + err := r.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(r.bucket)) + err := b.ForEach(func(k, v []byte) error { + d := digest.Digest(string(k)) + all[d] = string(v) + return nil + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to iterate ref IDs") + } + + // The callback needs to be invoked outside of the BoltDB View + // transaction. Update is an option, but it's much slower. ForEach cannot + // modify results inside the loop. + for d, id := range all { + err = fn(d, id) + if err != nil { + return err + } + } + + return nil +}
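A final illustrative sketch, not part of the patch, showing how the pieces above might be wired together: a BoltDB-backed WorkerResultSource for locally cached worker refs combined with a WorkerRemoteSource fallback. The wiring location and the function name newResultSource are assumptions.

package example

import (
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/worker"
)

// newResultSource prefers locally cached worker refs and falls back to remotes
// registered from inline-cache metadata.
func newResultSource(wc *worker.Controller, rootDir string, remoteSrc *worker.WorkerRemoteSource) (solver.ResultSource, error) {
	localSrc, err := worker.NewWorkerResultSource(wc, rootDir) // links cache keys to worker ref IDs in rootDir/ids.db
	if err != nil {
		return nil, err
	}
	return worker.NewCombinedResultSource(localSrc, remoteSrc), nil
}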