Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dapr-proto-ref
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ac5845ff5c5617c4c949c82e4e331dca5dddfc8d
4 changes: 4 additions & 0 deletions .github/workflows/test-on-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ jobs:
run: make modtidy check-diff
- name: Run go mod tidy
run: make modtidy

- name: Run make proto check diff
uses: bufbuild/buf-action@v1
run: make proto-check-diff
31 changes: 31 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,34 @@ check-diff:
.PHONY: modtidy
modtidy:
go mod tidy

.PHONY: proto
proto:
@command -v buf >/dev/null 2>&1 || { echo "buf is not installed. Install it from https://docs.buf.build/installation"; exit 1; }
@if [ ! -f .dapr-proto-ref ]; then echo "No .dapr-proto-ref file found. Run 'make proto-update' first."; exit 1; fi
@find ./internal/proto -type f -name '*.go' -delete
@COMMIT=$$(cat .dapr-proto-ref | tr -d '\n'); \
buf generate \
--template buf.gen.yaml \
--path dapr/proto/common/v1 \
--path dapr/proto/runtime/v1 \
"https://github.com/dapr/dapr.git#commit=$$COMMIT"

.PHONY: proto-update
proto-update:
@echo "Updating Dapr to latest commit..."
@git ls-remote https://github.com/dapr/dapr.git HEAD | cut -f1 > .dapr-proto-ref
@echo "Updated .dapr-proto-ref to: $$(cat .dapr-proto-ref)"
@$(MAKE) proto
Comment on lines +77 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a git tag or hash passed here ideally


PROTO_PATH := internal/proto
.PHONY: proto-check-diff
proto-check-diff:
@$(MAKE) proto
@# single‐shell if…then…fi block
@if ! git diff --quiet -- $(PROTO_PATH); then \
echo "::error ::Proto files are out of date. Please run 'make proto' and commit the changes."; \
git --no-pager diff -- $(PROTO_PATH); \
exit 1; \
fi

15 changes: 15 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: v2
managed:
enabled: true
override:
- file_option: go_package_prefix
value: github.com/dapr/go-sdk/internal/proto
plugins:
- remote: buf.build/protocolbuffers/go
out: internal/proto
opt:
- paths=source_relative
- remote: buf.build/grpc/go
out: internal/proto
opt:
- paths=source_relative
2 changes: 1 addition & 1 deletion client/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (

"google.golang.org/protobuf/types/known/anypb"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/codec"
"github.com/dapr/go-sdk/actor/config"
pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion client/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"errors"
"fmt"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

// InvokeBindingRequest represents binding invocation request.
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"

// used to import codec implements.
_ "github.com/dapr/go-sdk/actor/codec/impl"
Expand Down
4 changes: 2 additions & 2 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"

commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
commonv1pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/common/v1"
pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion client/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"io"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

type ConfigurationItem struct {
Expand Down
2 changes: 1 addition & 1 deletion client/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

"google.golang.org/protobuf/types/known/anypb"

runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
runtimev1pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

// conversationRequest object - currently unexported as used in a functions option pattern
Expand Down
40 changes: 26 additions & 14 deletions client/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/client/internal/crypto"
commonv1pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/common/v1"
runtimev1pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

// Encrypt data read from a stream, returning a readable stream that receives the encrypted data.
Expand All @@ -48,9 +49,11 @@ func (c *GRPCClient) Encrypt(ctx context.Context, in io.Reader, opts EncryptOpti
}

// Use the context of the stream here.
return c.performCryptoOperation(
stream.Context(), stream,
in, opts,
return performCryptoOperation(
stream.Context(),
stream,
in,
opts,
&runtimev1pb.EncryptRequest{},
&runtimev1pb.EncryptResponse{},
)
Expand All @@ -72,15 +75,24 @@ func (c *GRPCClient) Decrypt(ctx context.Context, in io.Reader, opts DecryptOpti
}

// Use the context of the stream here.
return c.performCryptoOperation(
stream.Context(), stream,
in, opts,
return performCryptoOperation(
stream.Context(),
stream,
in,
opts,
&runtimev1pb.DecryptRequest{},
&runtimev1pb.DecryptResponse{},
)
}

func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.ClientStream, in io.Reader, opts cryptoOperationOpts, reqProto runtimev1pb.CryptoRequests, resProto runtimev1pb.CryptoResponses) (io.Reader, error) {
func performCryptoOperation[T runtimev1pb.DecryptRequest | runtimev1pb.EncryptRequest, Y runtimev1pb.DecryptResponse | runtimev1pb.EncryptResponse](
ctx context.Context,
stream grpc.ClientStream,
in io.Reader,
opts cryptoOperationOpts,
reqProto *T,
resProto *Y,
) (io.Reader, error) {
var err error
// Pipe for writing the response
pr, pw := io.Pipe()
Expand Down Expand Up @@ -110,11 +122,11 @@ func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.Cli

// First message only - add the options
if optsProto != nil {
reqProto.SetOptions(optsProto)
crypto.SetOptions(reqProto, optsProto)
optsProto = nil
} else {
// Reset the object so we can re-use it
reqProto.Reset()
crypto.Reset(reqProto)
}

n, err = in.Read(*reqBuf)
Expand All @@ -127,7 +139,7 @@ func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.Cli

// Send the chunk if there's anything to send
if n > 0 {
reqProto.SetPayload(&commonv1pb.StreamPayload{
crypto.SetPayload(reqProto, &commonv1pb.StreamPayload{
Data: (*reqBuf)[:n],
Seq: seq,
})
Expand Down Expand Up @@ -184,7 +196,7 @@ func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.Cli
}

// Write the data, if any, into the pipe
payload = resProto.GetPayload()
payload = crypto.GetPayload(resProto)
if payload != nil {
if payload.GetSeq() != expectSeq {
pw.CloseWithError(fmt.Errorf("invalid sequence number in chunk: %d (expected: %d)", payload.GetSeq(), expectSeq))
Expand All @@ -205,7 +217,7 @@ func (c *GRPCClient) performCryptoOperation(ctx context.Context, stream grpc.Cli
}

// Reset the proto
resProto.Reset()
crypto.Reset(resProto)
}

// Close the writer of the pipe when done
Expand Down
40 changes: 29 additions & 11 deletions client/crypto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

commonv1 "github.com/dapr/dapr/pkg/proto/common/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/client/internal/crypto"
commonv1 "github.com/dapr/go-sdk/internal/proto/dapr/proto/common/v1"
runtimev1pb "github.com/dapr/go-sdk/internal/proto/dapr/proto/runtime/v1"
)

func TestEncrypt(t *testing.T) {
Expand Down Expand Up @@ -189,22 +190,24 @@ func TestDecrypt(t *testing.T) {
/* --- Server methods --- */

func (s *testDaprServer) EncryptAlpha1(stream runtimev1pb.Dapr_EncryptAlpha1Server) error {
return s.performCryptoOperation(
return testPerformCryptoOperation(
stream,
&runtimev1pb.EncryptRequest{},
&runtimev1pb.EncryptResponse{},
)
}

func (s *testDaprServer) DecryptAlpha1(stream runtimev1pb.Dapr_DecryptAlpha1Server) error {
return s.performCryptoOperation(
return testPerformCryptoOperation(
stream,
&runtimev1pb.DecryptRequest{},
&runtimev1pb.DecryptResponse{},
)
}

func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqProto runtimev1pb.CryptoRequests, resProto runtimev1pb.CryptoResponses) error {
func testPerformCryptoOperation[T runtimev1pb.DecryptRequest | runtimev1pb.EncryptRequest, Y runtimev1pb.DecryptResponse | runtimev1pb.EncryptResponse](
stream grpc.ServerStream, reqProto *T, resProto *Y,
) error {
// This doesn't really encrypt or decrypt the data and just sends back whatever it receives
pr, pw := io.Pipe()

Expand All @@ -216,7 +219,7 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro
)
first := true
for !done && stream.Context().Err() == nil {
reqProto.Reset()
crypto.Reset(reqProto)
err = stream.RecvMsg(reqProto)
if errors.Is(err, io.EOF) {
done = true
Expand All @@ -225,16 +228,16 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro
return
}

if first && !reqProto.HasOptions() {
if first && !hasOptions(reqProto) {
pw.CloseWithError(errors.New("first message must have options"))
return
} else if !first && reqProto.HasOptions() {
} else if !first && hasOptions(reqProto) {
pw.CloseWithError(errors.New("messages after first must not have options"))
return
}
first = false

payload := reqProto.GetPayload()
payload := crypto.GetPayload(reqProto)
if payload != nil {
if payload.GetSeq() != expectSeq {
pw.CloseWithError(fmt.Errorf("invalid sequence number: %d (expected: %d)", payload.GetSeq(), expectSeq))
Expand All @@ -261,7 +264,7 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro
)
buf := make([]byte, 2<<10)
for !done && stream.Context().Err() == nil {
resProto.Reset()
crypto.Reset(resProto)

n, err = pr.Read(buf)
if errors.Is(err, io.EOF) {
Expand All @@ -271,7 +274,7 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro
}

if n > 0 {
resProto.SetPayload(&commonv1.StreamPayload{
crypto.SetPayload(resProto, &commonv1.StreamPayload{
Seq: seq,
Data: buf[:n],
})
Expand All @@ -286,3 +289,18 @@ func (s *testDaprServer) performCryptoOperation(stream grpc.ServerStream, reqPro

return nil
}

func hasOptions[T runtimev1pb.DecryptRequest | runtimev1pb.EncryptRequest](msg *T) bool {
if msg == nil {
return false
}

switch r := any(msg).(type) {
case *runtimev1pb.EncryptRequest:
return r.GetOptions() != nil
case *runtimev1pb.DecryptRequest:
return r.GetOptions() != nil
}

return false
}
Loading