From 1c604dca459db088ff55a8f207549af68addadd5 Mon Sep 17 00:00:00 2001 From: tfrench Date: Tue, 12 Aug 2025 21:12:54 +0000 Subject: [PATCH] feat: create grpcZstd blob access --- .../configuration/ac_blob_access_creator.go | 2 +- .../configuration/cas_blob_access_creator.go | 8 +- .../configuration/fsac_blob_access_creator.go | 2 +- .../configuration/icas_blob_access_creator.go | 2 +- .../configuration/iscc_blob_access_creator.go | 2 +- pkg/blobstore/grpcclients/BUILD.bazel | 2 + pkg/blobstore/grpcclients/cas_blob_access.go | 224 +++++++++- .../grpcclients/cas_blob_access_test.go | 400 +++++++++++++++++- .../configuration/blobstore/blobstore.pb.go | 175 +++++--- .../configuration/blobstore/blobstore.proto | 12 +- 10 files changed, 752 insertions(+), 77 deletions(-) diff --git a/pkg/blobstore/configuration/ac_blob_access_creator.go b/pkg/blobstore/configuration/ac_blob_access_creator.go index 08bc89044..ee61250af 100644 --- a/pkg/blobstore/configuration/ac_blob_access_creator.go +++ b/pkg/blobstore/configuration/ac_blob_access_creator.go @@ -106,7 +106,7 @@ func (bac *acBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gro DigestKeyFormat: base.DigestKeyFormat.Combine(bac.contentAddressableStorage.DigestKeyFormat), }, "completeness_checking", nil case *pb.BlobAccessConfiguration_Grpc: - client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc, terminationGroup) + client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc.Client, terminationGroup) if err != nil { return BlobAccessInfo{}, "", err } diff --git a/pkg/blobstore/configuration/cas_blob_access_creator.go b/pkg/blobstore/configuration/cas_blob_access_creator.go index 8085e20c9..512044079 100644 --- a/pkg/blobstore/configuration/cas_blob_access_creator.go +++ b/pkg/blobstore/configuration/cas_blob_access_creator.go @@ -89,14 +89,18 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gr DigestKeyFormat: base.DigestKeyFormat, }, 
"existence_caching", nil case *pb.BlobAccessConfiguration_Grpc: - client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc, terminationGroup) + client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc.Client, terminationGroup) if err != nil { return BlobAccessInfo{}, "", err } + compressionThresholdBytes := int64(0) + if backend.Grpc.EnableZstdCompression { + compressionThresholdBytes = 100 + } // TODO: Should we provide a configuration option, so // that digest.KeyWithoutInstance can be used? return BlobAccessInfo{ - BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 65536), + BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 65536, compressionThresholdBytes), DigestKeyFormat: digest.KeyWithInstance, }, "grpc", nil case *pb.BlobAccessConfiguration_ReferenceExpanding: diff --git a/pkg/blobstore/configuration/fsac_blob_access_creator.go b/pkg/blobstore/configuration/fsac_blob_access_creator.go index 85e4eafde..5374288d8 100644 --- a/pkg/blobstore/configuration/fsac_blob_access_creator.go +++ b/pkg/blobstore/configuration/fsac_blob_access_creator.go @@ -44,7 +44,7 @@ func (fsacBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provi func (bac *fsacBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Group, configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) { switch backend := configuration.Backend.(type) { case *pb.BlobAccessConfiguration_Grpc: - client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc, terminationGroup) + client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc.Client, terminationGroup) if err != nil { return BlobAccessInfo{}, "", err } diff --git a/pkg/blobstore/configuration/icas_blob_access_creator.go b/pkg/blobstore/configuration/icas_blob_access_creator.go index defe1c684..559a6f602 100644 --- a/pkg/blobstore/configuration/icas_blob_access_creator.go 
+++ b/pkg/blobstore/configuration/icas_blob_access_creator.go @@ -40,7 +40,7 @@ func (icasBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provi func (bac *icasBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Group, configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) { switch backend := configuration.Backend.(type) { case *pb.BlobAccessConfiguration_Grpc: - client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc, terminationGroup) + client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc.Client, terminationGroup) if err != nil { return BlobAccessInfo{}, "", err } diff --git a/pkg/blobstore/configuration/iscc_blob_access_creator.go b/pkg/blobstore/configuration/iscc_blob_access_creator.go index 7d0bcfbbc..59dd008a5 100644 --- a/pkg/blobstore/configuration/iscc_blob_access_creator.go +++ b/pkg/blobstore/configuration/iscc_blob_access_creator.go @@ -44,7 +44,7 @@ func (isccBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Provi func (bac *isccBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Group, configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) { switch backend := configuration.Backend.(type) { case *pb.BlobAccessConfiguration_Grpc: - client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc, terminationGroup) + client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc.Client, terminationGroup) if err != nil { return BlobAccessInfo{}, "", err } diff --git a/pkg/blobstore/grpcclients/BUILD.bazel b/pkg/blobstore/grpcclients/BUILD.bazel index cbe5b5646..bb43132e8 100644 --- a/pkg/blobstore/grpcclients/BUILD.bazel +++ b/pkg/blobstore/grpcclients/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/util", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", 
"@com_github_google_uuid//:uuid", + "@com_github_klauspost_compress//zstd", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", @@ -43,6 +44,7 @@ go_test( "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@bazel_remote_apis//build/bazel/semver:semver_go_proto", "@com_github_google_uuid//:uuid", + "@com_github_klauspost_compress//zstd", "@com_github_stretchr_testify//require", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", "@org_golang_google_grpc//:grpc", diff --git a/pkg/blobstore/grpcclients/cas_blob_access.go b/pkg/blobstore/grpcclients/cas_blob_access.go index 983b87bd8..a3129d870 100644 --- a/pkg/blobstore/grpcclients/cas_blob_access.go +++ b/pkg/blobstore/grpcclients/cas_blob_access.go @@ -2,7 +2,10 @@ package grpcclients import ( "context" + "errors" "io" + "slices" + "sync" remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/buildbarn/bb-storage/pkg/blobstore" @@ -11,10 +14,13 @@ import ( "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/util" "github.com/google/uuid" + "github.com/klauspost/compress/zstd" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) type casBlobAccess struct { @@ -23,6 +29,10 @@ type casBlobAccess struct { capabilitiesClient remoteexecution.CapabilitiesClient uuidGenerator util.UUIDGenerator readChunkSize int + compressionThresholdBytes int64 + supportedCompressors []remoteexecution.Compressor_Value + supportedCompressorsMutex sync.RWMutex + capabilitiesOnce sync.Once } // NewCASBlobAccess creates a BlobAccess handle that relays any requests @@ -30,13 +40,18 @@ type casBlobAccess struct { // remoteexecution.ContentAddressableStorage services. 
Those are the // services that Bazel uses to access blobs stored in the Content // Addressable Storage. -func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int) blobstore.BlobAccess { +// +// If compressionThresholdBytes is > 0, the client will attempt to use +// ZSTD compression for blobs larger than this threshold. The server's +// supported compressors will be checked via GetCapabilities(). +func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int, compressionThresholdBytes int64) blobstore.BlobAccess { return &casBlobAccess{ byteStreamClient: bytestream.NewByteStreamClient(client), contentAddressableStorageClient: remoteexecution.NewContentAddressableStorageClient(client), capabilitiesClient: remoteexecution.NewCapabilitiesClient(client), uuidGenerator: uuidGenerator, readChunkSize: readChunkSize, + compressionThresholdBytes: compressionThresholdBytes, } } @@ -62,11 +77,137 @@ func (r *byteStreamChunkReader) Close() { } } +type zstdByteStreamChunkReader struct { + client bytestream.ByteStream_ReadClient + cancel context.CancelFunc + zstdReader io.ReadCloser + readChunkSize int + wg sync.WaitGroup +} + +func (r *zstdByteStreamChunkReader) Read() ([]byte, error) { + if r.zstdReader == nil { + pr, pw := io.Pipe() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer pw.Close() + for { + chunk, err := r.client.Recv() + if err != nil { + if err != io.EOF { + pw.CloseWithError(err) + } + return + } + if _, writeErr := pw.Write(chunk.Data); writeErr != nil { + pw.CloseWithError(writeErr) + return + } + } + }() + + var err error + r.zstdReader, err = util.NewZstdReadCloser(pr, zstd.WithDecoderConcurrency(1)) + if err != nil { + pr.CloseWithError(err) + return nil, err + } + } + + buf := make([]byte, r.readChunkSize) + n, err := r.zstdReader.Read(buf) + if n > 0 { + if err != nil && err != io.EOF { + err = nil + } + return buf[:n], err + } + return nil, err +} + 
+func (r *zstdByteStreamChunkReader) Close() { + if r.zstdReader != nil { + r.zstdReader.Close() + } + r.cancel() + + // Drain the gRPC stream. + for { + if _, err := r.client.Recv(); err != nil { + break + } + } + r.wg.Wait() +} + +type zstdByteStreamWriter struct { + client bytestream.ByteStream_WriteClient + resourceName string + writeOffset int64 + cancel context.CancelFunc +} + +func (w *zstdByteStreamWriter) Write(p []byte) (int, error) { + if err := w.client.Send(&bytestream.WriteRequest{ + ResourceName: w.resourceName, + WriteOffset: w.writeOffset, + Data: p, + }); err != nil { + return 0, err + } + w.writeOffset += int64(len(p)) + w.resourceName = "" + return len(p), nil +} + +func (w *zstdByteStreamWriter) Close() error { + if err := w.client.Send(&bytestream.WriteRequest{ + ResourceName: w.resourceName, + WriteOffset: w.writeOffset, + FinishWrite: true, + }); err != nil { + w.cancel() + w.client.CloseAndRecv() + return err + } + _, err := w.client.CloseAndRecv() + w.cancel() + return err +} + const resourceNameHeader = "build.bazel.remote.execution.v2.resource-name" +// shouldUseCompression checks if compression should be used for a blob of the given size. +// It also ensures GetCapabilities has been called to negotiate compression support. +func (ba *casBlobAccess) shouldUseCompression(ctx context.Context, digest digest.Digest) bool { + if ba.compressionThresholdBytes <= 0 || digest.GetSizeBytes() < ba.compressionThresholdBytes { + return false + } + + // If GetCapabilities fails, fall back to no compression. 
+ ba.capabilitiesOnce.Do(func() { + ba.GetCapabilities(ctx, digest.GetDigestFunction().GetInstanceName()) + }) + + ba.supportedCompressorsMutex.RLock() + supportedCompressors := ba.supportedCompressors + ba.supportedCompressorsMutex.RUnlock() + + return slices.Contains(supportedCompressors, remoteexecution.Compressor_ZSTD) +} + func (ba *casBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer { + useCompression := ba.shouldUseCompression(ctx, digest) + + compressor := remoteexecution.Compressor_IDENTITY + if useCompression { + compressor = remoteexecution.Compressor_ZSTD + } + ctxWithCancel, cancel := context.WithCancel(ctx) - resourceName := digest.GetByteStreamReadPath(remoteexecution.Compressor_IDENTITY) + resourceName := digest.GetByteStreamReadPath(compressor) client, err := ba.byteStreamClient.Read( metadata.AppendToOutgoingContext(ctxWithCancel, resourceNameHeader, resourceName), &bytestream.ReadRequest{ @@ -77,6 +218,15 @@ func (ba *casBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.B cancel() return buffer.NewBufferFromError(err) } + + if useCompression { + return buffer.NewCASBufferFromChunkReader(digest, &zstdByteStreamChunkReader{ + client: client, + cancel: cancel, + readChunkSize: ba.readChunkSize, + }, buffer.BackendProvided(buffer.Irreparable(digest))) + } + return buffer.NewCASBufferFromChunkReader(digest, &byteStreamChunkReader{ client: client, cancel: cancel, @@ -89,19 +239,65 @@ func (ba *casBlobAccess) GetFromComposite(ctx context.Context, parentDigest, chi } func (ba *casBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { - r := b.ToChunkReader(0, ba.readChunkSize) - defer r.Close() + useCompression := ba.shouldUseCompression(ctx, digest) + + compressor := remoteexecution.Compressor_IDENTITY + if useCompression { + compressor = remoteexecution.Compressor_ZSTD + } ctxWithCancel, cancel := context.WithCancel(ctx) - resourceName := 
digest.GetByteStreamWritePath(uuid.Must(ba.uuidGenerator()), remoteexecution.Compressor_IDENTITY) + resourceName := digest.GetByteStreamWritePath(uuid.Must(ba.uuidGenerator()), compressor) client, err := ba.byteStreamClient.Write( metadata.AppendToOutgoingContext(ctxWithCancel, resourceNameHeader, resourceName), ) if err != nil { cancel() + b.Discard() return err } + if useCompression { + byteStreamWriter := &zstdByteStreamWriter{ + client: client, + resourceName: resourceName, + writeOffset: 0, + cancel: cancel, + } + + zstdWriter, err := zstd.NewWriter(byteStreamWriter, zstd.WithEncoderConcurrency(1)) + if err != nil { + cancel() + if _, closeErr := client.CloseAndRecv(); closeErr != nil { + return status.Errorf(codes.Internal, "Failed to close client: %v and create zstd writer: %v", closeErr, err) + } + return status.Errorf(codes.Internal, "Failed to create zstd writer: %v", err) + } + + if err := b.IntoWriter(zstdWriter); err != nil { + if zstdCloseErr := zstdWriter.Close(); zstdCloseErr != nil { + err = errors.Join(err, zstdCloseErr) + } + if closeErr := byteStreamWriter.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + return err + } + + if err := zstdWriter.Close(); err != nil { + if closeErr := byteStreamWriter.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + return err + } + + return byteStreamWriter.Close() + } + + // Non-compressed path + r := b.ToChunkReader(0, ba.readChunkSize) + defer r.Close() + writeOffset := int64(0) for { if data, err := r.Read(); err == nil { @@ -140,6 +336,10 @@ func (ba *casBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer } func (ba *casBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) { + return findMissingBlobsInternal(ctx, digests, ba.contentAddressableStorageClient) +} + +func findMissingBlobsInternal(ctx context.Context, digests digest.Set, cas remoteexecution.ContentAddressableStorageClient) (digest.Set, error) { // Partition all 
digests by digest function, as the // FindMissingBlobs() RPC can only process digests for a single // instance name and digest function. @@ -157,7 +357,7 @@ func (ba *casBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (d BlobDigests: blobDigests, DigestFunction: digestFunction.GetEnumValue(), } - response, err := ba.contentAddressableStorageClient.FindMissingBlobs(ctx, &request) + response, err := cas.FindMissingBlobs(ctx, &request) if err != nil { return digest.EmptySet, err } @@ -180,11 +380,17 @@ func (ba *casBlobAccess) GetCapabilities(ctx context.Context, instanceName diges return nil, err } + cacheCapabilities := serverCapabilities.CacheCapabilities + + // Store supported compressors for compression negotiation + ba.supportedCompressorsMutex.Lock() + ba.supportedCompressors = cacheCapabilities.SupportedCompressors + ba.supportedCompressorsMutex.Unlock() + // Only return fields that pertain to the Content Addressable // Storage. Don't set 'max_batch_total_size_bytes', as we don't - // issue batch operations. The same holds for fields related to - // compression support. - cacheCapabilities := serverCapabilities.CacheCapabilities + // issue batch operations. Don't propagate 'supported_compressors' + // as it would be merged with bb_storage's configuration. 
return &remoteexecution.ServerCapabilities{ CacheCapabilities: &remoteexecution.CacheCapabilities{ DigestFunctions: digest.RemoveUnsupportedDigestFunctions(cacheCapabilities.DigestFunctions), diff --git a/pkg/blobstore/grpcclients/cas_blob_access_test.go b/pkg/blobstore/grpcclients/cas_blob_access_test.go index e3ab992f7..e2b5027c5 100644 --- a/pkg/blobstore/grpcclients/cas_blob_access_test.go +++ b/pkg/blobstore/grpcclients/cas_blob_access_test.go @@ -14,6 +14,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/testutil" "github.com/buildbarn/bb-storage/pkg/util" "github.com/google/uuid" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" "google.golang.org/genproto/googleapis/bytestream" @@ -30,7 +31,7 @@ func TestCASBlobAccessPut(t *testing.T) { client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, 0) blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) uuid := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) @@ -163,12 +164,109 @@ func TestCASBlobAccessPut(t *testing.T) { }) } +func TestCASBlobAccessGet(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, 0) + + t.Run("Success", func(t *testing.T) { + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). 
+ Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/blobs/8b1a9953c4611296a827abf8c47804d7/5", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.ReadResponse) + resp.Data = []byte("Hello") + return nil + }) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, blobDigest) + data, err := buffer.ToByteSlice(1000) + require.NoError(t, err) + require.Equal(t, []byte("Hello"), data) + }) + + t.Run("SuccessLargeBlob", func(t *testing.T) { + expectedData := make([]byte, 1000) + for i := range expectedData { + expectedData[i] = byte('A' + (i % 26)) + } + largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). 
+ Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/blobs/1411ffd5854fa029dc4d231aa89311eb/1000", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.ReadResponse) + resp.Data = expectedData + return nil + }) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, largeDigest) + data, err := buffer.ToByteSlice(1500) + require.NoError(t, err) + require.Equal(t, expectedData, data) + }) + + t.Run("InitialFailure", func(t *testing.T) { + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). + Return(nil, status.Error(codes.Internal, "Failed to create outgoing connection")) + + buffer := blobAccess.Get(ctx, blobDigest) + _, err := buffer.ToByteSlice(1000) + testutil.RequireEqualStatus(t, + status.Error(codes.Internal, "Failed to create outgoing connection"), + err) + }) + + t.Run("ReceiveFailure", func(t *testing.T) { + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). 
+ Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/blobs/8b1a9953c4611296a827abf8c47804d7/5", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(status.Error(codes.Internal, "Lost connection to server")).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, blobDigest) + _, err := buffer.ToByteSlice(1000) + testutil.RequireEqualStatus(t, + status.Error(codes.Internal, "Lost connection to server"), + err) + }) +} + func TestCASBlobAccessGetCapabilities(t *testing.T) { ctrl, ctx := gomock.WithContext(context.Background(), t) client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, 0) t.Run("BackendFailure", func(t *testing.T) { client.EXPECT().Invoke( @@ -263,3 +361,301 @@ func TestCASBlobAccessGetCapabilities(t *testing.T) { }, serverCapabilities) }) } + +func TestCASBlobAccessPutWithCompression(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, 100) + + client.EXPECT().Invoke( + gomock.Any(), + "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities", + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).DoAndReturn(func(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { + proto.Merge(reply.(proto.Message), &remoteexecution.ServerCapabilities{ + CacheCapabilities: &remoteexecution.CacheCapabilities{ + DigestFunctions: digest.SupportedDigestFunctions, + SupportedCompressors: []remoteexecution.Compressor_Value{ + remoteexecution.Compressor_ZSTD, + }, + }, + }) + 
return nil + }).AnyTimes() + + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + testUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) + + t.Run("SuccessWithCompression", func(t *testing.T) { + largeData := make([]byte, 1000) + for i := range largeData { + largeData[i] = byte('A' + (i % 26)) + } + + clientStream := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(testUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Write"). + Return(clientStream, nil) + r := mock.NewMockFileReader(ctrl) + r.EXPECT().ReadAt(gomock.Len(1000), int64(0)).DoAndReturn(func(p []byte, off int64) (int, error) { + copy(p, largeData) + return 1000, nil + }) + r.EXPECT().Close() + + var compressedData []byte + clientStream.EXPECT().SendMsg(gomock.Any()).DoAndReturn(func(msg interface{}) error { + req := msg.(*bytestream.WriteRequest) + require.Equal(t, "hello/uploads/7d659e5f-0e4b-48f0-ad9f-3489db6e103b/compressed-blobs/zstd/1411ffd5854fa029dc4d231aa89311eb/1000", req.ResourceName) + require.Equal(t, int64(0), req.WriteOffset) + require.NotEmpty(t, req.Data) + require.Less(t, len(req.Data), 1000, "Compressed data should be smaller than original") + compressedData = append(compressedData, req.Data...) 
+ return nil + }) + clientStream.EXPECT().SendMsg(gomock.Any()).DoAndReturn(func(msg interface{}) error { + req := msg.(*bytestream.WriteRequest) + require.True(t, req.FinishWrite) + require.Equal(t, int64(len(compressedData)), req.WriteOffset) + return nil + }) + clientStream.EXPECT().CloseSend().Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.WriteResponse) + resp.CommittedSize = int64(len(compressedData)) + return nil + }) + + err := blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromReaderAt(r, 1000)) + require.NoError(t, err) + + decoder, err := zstd.NewReader(nil) + require.NoError(t, err) + defer decoder.Close() + decompressedData, err := decoder.DecodeAll(compressedData, nil) + require.NoError(t, err) + require.Equal(t, largeData, decompressedData, "Decompressed data should match original") + }) + + t.Run("SuccessWithCompressionMultipleChunks", func(t *testing.T) { + largeData := make([]byte, 100000) + for i := range largeData { + largeData[i] = byte('A' + (i % 26)) + } + largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "cd0aaa69c1c834ae03d23951bb3eaddc", 100000) + largeUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103d")) + + clientStream := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(largeUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Write"). 
+ Return(clientStream, nil) + r := mock.NewMockFileReader(ctrl) + r.EXPECT().ReadAt(gomock.Any(), gomock.Any()).DoAndReturn(func(p []byte, off int64) (int, error) { + n := copy(p, largeData[off:]) + return n, nil + }).AnyTimes() + r.EXPECT().Close() + + var compressedData []byte + var messageCount int + clientStream.EXPECT().SendMsg(gomock.Any()).DoAndReturn(func(msg interface{}) error { + req := msg.(*bytestream.WriteRequest) + if messageCount == 0 { + require.Equal(t, "hello/uploads/7d659e5f-0e4b-48f0-ad9f-3489db6e103d/compressed-blobs/zstd/cd0aaa69c1c834ae03d23951bb3eaddc/100000", req.ResourceName) + } + if req.FinishWrite { + require.Equal(t, int64(len(compressedData)), req.WriteOffset) + } else { + require.Equal(t, int64(len(compressedData)), req.WriteOffset) + compressedData = append(compressedData, req.Data...) + } + messageCount++ + return nil + }).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.WriteResponse) + resp.CommittedSize = int64(len(compressedData)) + return nil + }) + + err := blobAccess.Put(ctx, largeDigest, buffer.NewValidatedBufferFromReaderAt(r, 100000)) + require.NoError(t, err) + require.GreaterOrEqual(t, messageCount, 2) + + decoder, err := zstd.NewReader(nil) + require.NoError(t, err) + defer decoder.Close() + decompressedData, err := decoder.DecodeAll(compressedData, nil) + require.NoError(t, err) + require.Equal(t, largeData, decompressedData, "Decompressed data should match original") + }) + + t.Run("SmallBlobNoCompression", func(t *testing.T) { + smallDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + smallUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103c")) + + clientStream := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(smallUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), 
"/google.bytestream.ByteStream/Write"). + Return(clientStream, nil) + r := mock.NewMockFileReader(ctrl) + r.EXPECT().ReadAt(gomock.Len(5), int64(0)).DoAndReturn(func(p []byte, off int64) (int, error) { + copy(p, "Hello") + return 5, nil + }) + r.EXPECT().Close() + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.WriteRequest{ + ResourceName: "hello/uploads/7d659e5f-0e4b-48f0-ad9f-3489db6e103c/blobs/8b1a9953c4611296a827abf8c47804d7/5", + WriteOffset: 0, + Data: []byte("Hello"), + })) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.WriteRequest{ + WriteOffset: 5, + FinishWrite: true, + })) + clientStream.EXPECT().CloseSend().Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.WriteResponse) + resp.CommittedSize = 5 + return nil + }) + + err := blobAccess.Put(ctx, smallDigest, buffer.NewValidatedBufferFromReaderAt(r, 5)) + require.NoError(t, err) + }) +} + +func TestCASBlobAccessGetWithCompression(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 100, 100) + + client.EXPECT().Invoke( + gomock.Any(), + "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities", + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).DoAndReturn(func(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { + proto.Merge(reply.(proto.Message), &remoteexecution.ServerCapabilities{ + CacheCapabilities: &remoteexecution.CacheCapabilities{ + DigestFunctions: digest.SupportedDigestFunctions, + SupportedCompressors: []remoteexecution.Compressor_Value{ + remoteexecution.Compressor_ZSTD, + }, + }, + }) + return nil + }).AnyTimes() + + t.Run("SuccessWithCompression", func(t *testing.T) { + expectedData := make([]byte, 1000) + for i := range expectedData { + 
expectedData[i] = byte('A' + (i % 26)) + } + largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). + Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/compressed-blobs/zstd/1411ffd5854fa029dc4d231aa89311eb/1000", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + require.NoError(t, err) + compressedData := encoder.EncodeAll(expectedData, nil) + require.Less(t, len(compressedData), len(expectedData), "Compressed data should be smaller than original") + + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.ReadResponse) + resp.Data = compressedData + return nil + }) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, largeDigest) + data, err := buffer.ToByteSlice(1500) + require.NoError(t, err) + require.Equal(t, expectedData, data) + }) + + t.Run("SuccessWithCompressionMultipleChunks", func(t *testing.T) { + expectedData := make([]byte, 100000) + for i := range expectedData { + expectedData[i] = byte('A' + (i % 26)) + } + largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "cd0aaa69c1c834ae03d23951bb3eaddc", 100000) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). 
+ Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/compressed-blobs/zstd/cd0aaa69c1c834ae03d23951bb3eaddc/100000", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + require.NoError(t, err) + compressedData := encoder.EncodeAll(expectedData, nil) + + chunkSize := len(compressedData) / 3 + chunks := [][]byte{ + compressedData[:chunkSize], + compressedData[chunkSize : 2*chunkSize], + compressedData[2*chunkSize:], + } + chunkIndex := 0 + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + if chunkIndex >= len(chunks) { + return io.EOF + } + resp := m.(*bytestream.ReadResponse) + resp.Data = chunks[chunkIndex] + chunkIndex++ + return nil + }).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, largeDigest) + data, err := buffer.ToByteSlice(150000) + require.NoError(t, err) + require.Equal(t, expectedData, data) + }) + + t.Run("SmallBlobNoCompression", func(t *testing.T) { + smallDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). 
+ Return(clientStream, nil) + clientStream.EXPECT().SendMsg(testutil.EqProto(t, &bytestream.ReadRequest{ + ResourceName: "hello/blobs/8b1a9953c4611296a827abf8c47804d7/5", + ReadOffset: 0, + ReadLimit: 0, + })).Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.ReadResponse) + resp.Data = []byte("Hello") + return nil + }) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buffer := blobAccess.Get(ctx, smallDigest) + data, err := buffer.ToByteSlice(1000) + require.NoError(t, err) + require.Equal(t, []byte("Hello"), data) + }) +} diff --git a/pkg/proto/configuration/blobstore/blobstore.pb.go b/pkg/proto/configuration/blobstore/blobstore.pb.go index 88f732d27..f7b7ada4c 100644 --- a/pkg/proto/configuration/blobstore/blobstore.pb.go +++ b/pkg/proto/configuration/blobstore/blobstore.pb.go @@ -157,7 +157,7 @@ func (x *BlobAccessConfiguration) GetReadCaching() *ReadCachingBlobAccessConfigu return nil } -func (x *BlobAccessConfiguration) GetGrpc() *grpc.ClientConfiguration { +func (x *BlobAccessConfiguration) GetGrpc() *GrpcBlobAccessConfiguration { if x != nil { if x, ok := x.Backend.(*BlobAccessConfiguration_Grpc); ok { return x.Grpc @@ -328,7 +328,7 @@ type BlobAccessConfiguration_ReadCaching struct { } type BlobAccessConfiguration_Grpc struct { - Grpc *grpc.ClientConfiguration `protobuf:"bytes,7,opt,name=grpc,proto3,oneof"` + Grpc *GrpcBlobAccessConfiguration `protobuf:"bytes,7,opt,name=grpc,proto3,oneof"` } type BlobAccessConfiguration_Error struct { @@ -1682,6 +1682,58 @@ func (x *DeadlineEnforcingBlobAccess) GetBackend() *BlobAccessConfiguration { return nil } +type GrpcBlobAccessConfiguration struct { + state protoimpl.MessageState `protogen:"open.v1"` + Client *grpc.ClientConfiguration `protobuf:"bytes,1,opt,name=client,proto3" json:"client,omitempty"` + EnableZstdCompression bool 
`protobuf:"varint,2,opt,name=enable_zstd_compression,json=enableZstdCompression,proto3" json:"enable_zstd_compression,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GrpcBlobAccessConfiguration) Reset() { + *x = GrpcBlobAccessConfiguration{} + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[20] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GrpcBlobAccessConfiguration) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GrpcBlobAccessConfiguration) ProtoMessage() {} + +func (x *GrpcBlobAccessConfiguration) ProtoReflect() protoreflect.Message { + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[20] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GrpcBlobAccessConfiguration.ProtoReflect.Descriptor instead. 
+func (*GrpcBlobAccessConfiguration) Descriptor() ([]byte, []int) { + return file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_rawDescGZIP(), []int{20} +} + +func (x *GrpcBlobAccessConfiguration) GetClient() *grpc.ClientConfiguration { + if x != nil { + return x.Client + } + return nil +} + +func (x *GrpcBlobAccessConfiguration) GetEnableZstdCompression() bool { + if x != nil { + return x.EnableZstdCompression + } + return false +} + type ShardingBlobAccessConfiguration_Shard struct { state protoimpl.MessageState `protogen:"open.v1"` Backend *BlobAccessConfiguration `protobuf:"bytes,1,opt,name=backend,proto3" json:"backend,omitempty"` @@ -1692,7 +1744,7 @@ type ShardingBlobAccessConfiguration_Shard struct { func (x *ShardingBlobAccessConfiguration_Shard) Reset() { *x = ShardingBlobAccessConfiguration_Shard{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[20] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1704,7 +1756,7 @@ func (x *ShardingBlobAccessConfiguration_Shard) String() string { func (*ShardingBlobAccessConfiguration_Shard) ProtoMessage() {} func (x *ShardingBlobAccessConfiguration_Shard) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[20] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1744,7 +1796,7 @@ type ShardingBlobAccessConfiguration_Legacy struct { func (x *ShardingBlobAccessConfiguration_Legacy) Reset() { *x = ShardingBlobAccessConfiguration_Legacy{} - mi := 
&file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[21] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1756,7 +1808,7 @@ func (x *ShardingBlobAccessConfiguration_Legacy) String() string { func (*ShardingBlobAccessConfiguration_Legacy) ProtoMessage() {} func (x *ShardingBlobAccessConfiguration_Legacy) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[21] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1795,7 +1847,7 @@ type LocalBlobAccessConfiguration_KeyLocationMapInMemory struct { func (x *LocalBlobAccessConfiguration_KeyLocationMapInMemory) Reset() { *x = LocalBlobAccessConfiguration_KeyLocationMapInMemory{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[23] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1807,7 +1859,7 @@ func (x *LocalBlobAccessConfiguration_KeyLocationMapInMemory) String() string { func (*LocalBlobAccessConfiguration_KeyLocationMapInMemory) ProtoMessage() {} func (x *LocalBlobAccessConfiguration_KeyLocationMapInMemory) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[23] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1839,7 +1891,7 @@ type 
LocalBlobAccessConfiguration_BlocksInMemory struct { func (x *LocalBlobAccessConfiguration_BlocksInMemory) Reset() { *x = LocalBlobAccessConfiguration_BlocksInMemory{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[24] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1851,7 +1903,7 @@ func (x *LocalBlobAccessConfiguration_BlocksInMemory) String() string { func (*LocalBlobAccessConfiguration_BlocksInMemory) ProtoMessage() {} func (x *LocalBlobAccessConfiguration_BlocksInMemory) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[24] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1885,7 +1937,7 @@ type LocalBlobAccessConfiguration_BlocksOnBlockDevice struct { func (x *LocalBlobAccessConfiguration_BlocksOnBlockDevice) Reset() { *x = LocalBlobAccessConfiguration_BlocksOnBlockDevice{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[25] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1897,7 +1949,7 @@ func (x *LocalBlobAccessConfiguration_BlocksOnBlockDevice) String() string { func (*LocalBlobAccessConfiguration_BlocksOnBlockDevice) ProtoMessage() {} func (x *LocalBlobAccessConfiguration_BlocksOnBlockDevice) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[25] + mi := 
&file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[26] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1944,7 +1996,7 @@ type LocalBlobAccessConfiguration_Persistent struct { func (x *LocalBlobAccessConfiguration_Persistent) Reset() { *x = LocalBlobAccessConfiguration_Persistent{} - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[26] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1956,7 +2008,7 @@ func (x *LocalBlobAccessConfiguration_Persistent) String() string { func (*LocalBlobAccessConfiguration_Persistent) ProtoMessage() {} func (x *LocalBlobAccessConfiguration_Persistent) ProtoReflect() protoreflect.Message { - mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[26] + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1993,10 +2045,10 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blo 
"Qgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore/blobstore.proto\x12!buildbarn.configuration.blobstore\x1aUgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blockdevice/blockdevice.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/cloud/aws/aws.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/cloud/gcp/gcp.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/digest/digest.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto\x1aPgithub.com/buildbarn/bb-storage/pkg/proto/configuration/http/client/client.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x17google/rpc/status.proto\"\xf3\x01\n" + "\x16BlobstoreConfiguration\x12z\n" + "\x1bcontent_addressable_storage\x18\x01 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\x19contentAddressableStorage\x12]\n" + - "\faction_cache\x18\x02 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\vactionCache\"\xd6\x0f\n" + + "\faction_cache\x18\x02 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\vactionCache\"\xe3\x0f\n" + "\x17BlobAccessConfiguration\x12j\n" + - "\fread_caching\x18\x04 \x01(\v2E.buildbarn.configuration.blobstore.ReadCachingBlobAccessConfigurationH\x00R\vreadCaching\x12G\n" + - "\x04grpc\x18\a \x01(\v21.buildbarn.configuration.grpc.ClientConfigurationH\x00R\x04grpc\x12*\n" + + "\fread_caching\x18\x04 \x01(\v2E.buildbarn.configuration.blobstore.ReadCachingBlobAccessConfigurationH\x00R\vreadCaching\x12T\n" + + "\x04grpc\x18\a \x01(\v2>.buildbarn.configuration.blobstore.GrpcBlobAccessConfigurationH\x00R\x04grpc\x12*\n" + "\x05error\x18\b \x01(\v2\x12.google.rpc.StatusH\x00R\x05error\x12`\n" + "\bsharding\x18\t \x01(\v2B.buildbarn.configuration.blobstore.ShardingBlobAccessConfigurationH\x00R\bsharding\x12`\n" + "\bmirrored\x18\x0e 
\x01(\v2B.buildbarn.configuration.blobstore.MirroredBlobAccessConfigurationH\x00R\bmirrored\x12W\n" + @@ -2137,7 +2189,10 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blo "\x05value\x18\x02 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\x05value:\x028\x01\"\xa8\x01\n" + "\x1bDeadlineEnforcingBlobAccess\x123\n" + "\atimeout\x18\x01 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x12T\n" + - "\abackend\x18\x02 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\abackendBCZAgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blobstoreb\x06proto3" + "\abackend\x18\x02 \x01(\v2:.buildbarn.configuration.blobstore.BlobAccessConfigurationR\abackend\"\xa0\x01\n" + + "\x1bGrpcBlobAccessConfiguration\x12I\n" + + "\x06client\x18\x01 \x01(\v21.buildbarn.configuration.grpc.ClientConfigurationR\x06client\x126\n" + + "\x17enable_zstd_compression\x18\x02 \x01(\bR\x15enableZstdCompressionBCZAgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blobstoreb\x06proto3" var ( file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_rawDescOnce sync.Once @@ -2151,7 +2206,7 @@ func file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blob return file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_rawDescData } -var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes = make([]protoimpl.MessageInfo, 29) +var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_goTypes = []any{ (*BlobstoreConfiguration)(nil), // 0: buildbarn.configuration.blobstore.BlobstoreConfiguration (*BlobAccessConfiguration)(nil), // 1: buildbarn.configuration.blobstore.BlobAccessConfiguration @@ -2173,16 +2228,16 @@ var 
file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobs (*ZIPBlobAccessConfiguration)(nil), // 17: buildbarn.configuration.blobstore.ZIPBlobAccessConfiguration (*WithLabelsBlobAccessConfiguration)(nil), // 18: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration (*DeadlineEnforcingBlobAccess)(nil), // 19: buildbarn.configuration.blobstore.DeadlineEnforcingBlobAccess - (*ShardingBlobAccessConfiguration_Shard)(nil), // 20: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard - (*ShardingBlobAccessConfiguration_Legacy)(nil), // 21: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Legacy - nil, // 22: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry - (*LocalBlobAccessConfiguration_KeyLocationMapInMemory)(nil), // 23: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.KeyLocationMapInMemory - (*LocalBlobAccessConfiguration_BlocksInMemory)(nil), // 24: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksInMemory - (*LocalBlobAccessConfiguration_BlocksOnBlockDevice)(nil), // 25: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice - (*LocalBlobAccessConfiguration_Persistent)(nil), // 26: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent - nil, // 27: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry - nil, // 28: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry - (*grpc.ClientConfiguration)(nil), // 29: buildbarn.configuration.grpc.ClientConfiguration + (*GrpcBlobAccessConfiguration)(nil), // 20: buildbarn.configuration.blobstore.GrpcBlobAccessConfiguration + (*ShardingBlobAccessConfiguration_Shard)(nil), // 21: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard + (*ShardingBlobAccessConfiguration_Legacy)(nil), // 22: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Legacy + 
nil, // 23: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry + (*LocalBlobAccessConfiguration_KeyLocationMapInMemory)(nil), // 24: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.KeyLocationMapInMemory + (*LocalBlobAccessConfiguration_BlocksInMemory)(nil), // 25: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksInMemory + (*LocalBlobAccessConfiguration_BlocksOnBlockDevice)(nil), // 26: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice + (*LocalBlobAccessConfiguration_Persistent)(nil), // 27: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent + nil, // 28: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry + nil, // 29: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry (*status.Status)(nil), // 30: google.rpc.Status (*blockdevice.Configuration)(nil), // 31: buildbarn.configuration.blockdevice.Configuration (*digest.ExistenceCacheConfiguration)(nil), // 32: buildbarn.configuration.digest.ExistenceCacheConfiguration @@ -2190,14 +2245,15 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobs (*client.Configuration)(nil), // 34: buildbarn.configuration.http.client.Configuration (*gcp.ClientOptionsConfiguration)(nil), // 35: buildbarn.configuration.cloud.gcp.ClientOptionsConfiguration (*emptypb.Empty)(nil), // 36: google.protobuf.Empty - (*durationpb.Duration)(nil), // 37: google.protobuf.Duration - (*timestamppb.Timestamp)(nil), // 38: google.protobuf.Timestamp + (*grpc.ClientConfiguration)(nil), // 37: buildbarn.configuration.grpc.ClientConfiguration + (*durationpb.Duration)(nil), // 38: google.protobuf.Duration + (*timestamppb.Timestamp)(nil), // 39: google.protobuf.Timestamp } var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_depIdxs = []int32{ 1, // 0: 
buildbarn.configuration.blobstore.BlobstoreConfiguration.content_addressable_storage:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 1, // 1: buildbarn.configuration.blobstore.BlobstoreConfiguration.action_cache:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 2, // 2: buildbarn.configuration.blobstore.BlobAccessConfiguration.read_caching:type_name -> buildbarn.configuration.blobstore.ReadCachingBlobAccessConfiguration - 29, // 3: buildbarn.configuration.blobstore.BlobAccessConfiguration.grpc:type_name -> buildbarn.configuration.grpc.ClientConfiguration + 20, // 3: buildbarn.configuration.blobstore.BlobAccessConfiguration.grpc:type_name -> buildbarn.configuration.blobstore.GrpcBlobAccessConfiguration 30, // 4: buildbarn.configuration.blobstore.BlobAccessConfiguration.error:type_name -> google.rpc.Status 3, // 5: buildbarn.configuration.blobstore.BlobAccessConfiguration.sharding:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration 4, // 6: buildbarn.configuration.blobstore.BlobAccessConfiguration.mirrored:type_name -> buildbarn.configuration.blobstore.MirroredBlobAccessConfiguration @@ -2217,17 +2273,17 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobs 1, // 20: buildbarn.configuration.blobstore.ReadCachingBlobAccessConfiguration.slow:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 1, // 21: buildbarn.configuration.blobstore.ReadCachingBlobAccessConfiguration.fast:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 10, // 22: buildbarn.configuration.blobstore.ReadCachingBlobAccessConfiguration.replicator:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration - 22, // 23: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.shards:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry - 21, // 24: 
buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.legacy:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Legacy + 23, // 23: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.shards:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry + 22, // 24: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.legacy:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Legacy 1, // 25: buildbarn.configuration.blobstore.MirroredBlobAccessConfiguration.backend_a:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 1, // 26: buildbarn.configuration.blobstore.MirroredBlobAccessConfiguration.backend_b:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 10, // 27: buildbarn.configuration.blobstore.MirroredBlobAccessConfiguration.replicator_a_to_b:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration 10, // 28: buildbarn.configuration.blobstore.MirroredBlobAccessConfiguration.replicator_b_to_a:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration - 23, // 29: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.key_location_map_in_memory:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.KeyLocationMapInMemory + 24, // 29: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.key_location_map_in_memory:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.KeyLocationMapInMemory 31, // 30: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.key_location_map_on_block_device:type_name -> buildbarn.configuration.blockdevice.Configuration - 24, // 31: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.blocks_in_memory:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksInMemory - 25, // 32: 
buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.blocks_on_block_device:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice - 26, // 33: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.persistent:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent + 25, // 31: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.blocks_in_memory:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksInMemory + 26, // 32: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.blocks_on_block_device:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice + 27, // 33: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.persistent:type_name -> buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent 1, // 34: buildbarn.configuration.blobstore.ExistenceCachingBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 32, // 35: buildbarn.configuration.blobstore.ExistenceCachingBlobAccessConfiguration.existence_cache:type_name -> buildbarn.configuration.digest.ExistenceCacheConfiguration 1, // 36: buildbarn.configuration.blobstore.CompletenessCheckingBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration @@ -2240,7 +2296,7 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobs 35, // 43: buildbarn.configuration.blobstore.ReferenceExpandingBlobAccessConfiguration.gcp_client_options:type_name -> buildbarn.configuration.cloud.gcp.ClientOptionsConfiguration 1, // 44: buildbarn.configuration.blobstore.ReferenceExpandingBlobAccessConfiguration.content_addressable_storage:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 36, // 45: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.local:type_name -> google.protobuf.Empty - 29, 
// 46: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.remote:type_name -> buildbarn.configuration.grpc.ClientConfiguration + 37, // 46: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.remote:type_name -> buildbarn.configuration.grpc.ClientConfiguration 11, // 47: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.queued:type_name -> buildbarn.configuration.blobstore.QueuedBlobReplicatorConfiguration 36, // 48: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.noop:type_name -> google.protobuf.Empty 10, // 49: buildbarn.configuration.blobstore.BlobReplicatorConfiguration.deduplicating:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration @@ -2248,32 +2304,33 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobs 10, // 51: buildbarn.configuration.blobstore.QueuedBlobReplicatorConfiguration.base:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration 32, // 52: buildbarn.configuration.blobstore.QueuedBlobReplicatorConfiguration.existence_cache:type_name -> buildbarn.configuration.digest.ExistenceCacheConfiguration 10, // 53: buildbarn.configuration.blobstore.ConcurrencyLimitingBlobReplicatorConfiguration.base:type_name -> buildbarn.configuration.blobstore.BlobReplicatorConfiguration - 27, // 54: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.instance_name_prefixes:type_name -> buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry + 28, // 54: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.instance_name_prefixes:type_name -> buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry 1, // 55: buildbarn.configuration.blobstore.DemultiplexedBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 1, // 56: 
buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 37, // 57: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.minimum_validity:type_name -> google.protobuf.Duration - 37, // 58: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.maximum_validity_jitter:type_name -> google.protobuf.Duration - 38, // 59: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.minimum_timestamp:type_name -> google.protobuf.Timestamp + 38, // 57: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.minimum_validity:type_name -> google.protobuf.Duration + 38, // 58: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.maximum_validity_jitter:type_name -> google.protobuf.Duration + 39, // 59: buildbarn.configuration.blobstore.ActionResultExpiringBlobAccessConfiguration.minimum_timestamp:type_name -> google.protobuf.Timestamp 1, // 60: buildbarn.configuration.blobstore.ReadCanaryingBlobAccessConfiguration.source:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration 1, // 61: buildbarn.configuration.blobstore.ReadCanaryingBlobAccessConfiguration.replica:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 37, // 62: buildbarn.configuration.blobstore.ReadCanaryingBlobAccessConfiguration.maximum_cache_duration:type_name -> google.protobuf.Duration + 38, // 62: buildbarn.configuration.blobstore.ReadCanaryingBlobAccessConfiguration.maximum_cache_duration:type_name -> google.protobuf.Duration 32, // 63: buildbarn.configuration.blobstore.ZIPBlobAccessConfiguration.data_integrity_validation_cache:type_name -> buildbarn.configuration.digest.ExistenceCacheConfiguration 1, // 64: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 28, 
// 65: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.labels:type_name -> buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry - 37, // 66: buildbarn.configuration.blobstore.DeadlineEnforcingBlobAccess.timeout:type_name -> google.protobuf.Duration + 29, // 65: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.labels:type_name -> buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry + 38, // 66: buildbarn.configuration.blobstore.DeadlineEnforcingBlobAccess.timeout:type_name -> google.protobuf.Duration 1, // 67: buildbarn.configuration.blobstore.DeadlineEnforcingBlobAccess.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 1, // 68: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 20, // 69: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry.value:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard - 31, // 70: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice.source:type_name -> buildbarn.configuration.blockdevice.Configuration - 32, // 71: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice.data_integrity_validation_cache:type_name -> buildbarn.configuration.digest.ExistenceCacheConfiguration - 37, // 72: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent.minimum_epoch_interval:type_name -> google.protobuf.Duration - 14, // 73: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry.value:type_name -> buildbarn.configuration.blobstore.DemultiplexedBlobAccessConfiguration - 1, // 74: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry.value:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 75, // [75:75] is 
the sub-list for method output_type - 75, // [75:75] is the sub-list for method input_type - 75, // [75:75] is the sub-list for extension type_name - 75, // [75:75] is the sub-list for extension extendee - 0, // [0:75] is the sub-list for field type_name + 37, // 68: buildbarn.configuration.blobstore.GrpcBlobAccessConfiguration.client:type_name -> buildbarn.configuration.grpc.ClientConfiguration + 1, // 69: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration + 21, // 70: buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.ShardsEntry.value:type_name -> buildbarn.configuration.blobstore.ShardingBlobAccessConfiguration.Shard + 31, // 71: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice.source:type_name -> buildbarn.configuration.blockdevice.Configuration + 32, // 72: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.BlocksOnBlockDevice.data_integrity_validation_cache:type_name -> buildbarn.configuration.digest.ExistenceCacheConfiguration + 38, // 73: buildbarn.configuration.blobstore.LocalBlobAccessConfiguration.Persistent.minimum_epoch_interval:type_name -> google.protobuf.Duration + 14, // 74: buildbarn.configuration.blobstore.DemultiplexingBlobAccessConfiguration.InstanceNamePrefixesEntry.value:type_name -> buildbarn.configuration.blobstore.DemultiplexedBlobAccessConfiguration + 1, // 75: buildbarn.configuration.blobstore.WithLabelsBlobAccessConfiguration.LabelsEntry.value:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration + 76, // [76:76] is the sub-list for method output_type + 76, // [76:76] is the sub-list for method input_type + 76, // [76:76] is the sub-list for extension type_name + 76, // [76:76] is the sub-list for extension extendee + 0, // [0:76] is the sub-list for field type_name } func init() { @@ -2324,7 +2381,7 @@ func 
file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blob GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_rawDesc), len(file_github_com_buildbarn_bb_storage_pkg_proto_configuration_blobstore_blobstore_proto_rawDesc)), NumEnums: 0, - NumMessages: 29, + NumMessages: 30, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/proto/configuration/blobstore/blobstore.proto b/pkg/proto/configuration/blobstore/blobstore.proto index a035937dc..d6b2f62d8 100644 --- a/pkg/proto/configuration/blobstore/blobstore.proto +++ b/pkg/proto/configuration/blobstore/blobstore.proto @@ -32,7 +32,7 @@ message BlobAccessConfiguration { // Read objects from/write objects to a GRPC service that // implements the remote execution protocol. - buildbarn.configuration.grpc.ClientConfiguration grpc = 7; + GrpcBlobAccessConfiguration grpc = 7; // Always fail with a fixed error response. google.rpc.Status error = 8; @@ -946,3 +946,13 @@ message DeadlineEnforcingBlobAccess { // The backend to which all operations are delegated. BlobAccessConfiguration backend = 2; } + +message GrpcBlobAccessConfiguration { + // Base GRPC client configuration. + buildbarn.configuration.grpc.ClientConfiguration client = 1; + + // Enable ZSTD compression for the Content Addressable Storage (CAS). + // When enabled, blobs >= 100 bytes will use ZSTD compression if the + // server supports it. Has no effect for other storage types (AC, ICAS, etc.). + bool enable_zstd_compression = 2; +}