From ceda219fe6f199cae17479deb81a45aa196070df Mon Sep 17 00:00:00 2001 From: Paul Annesley Date: Fri, 7 Mar 2025 20:08:44 +1030 Subject: [PATCH 1/4] WIP: `bktec bazel listen` stub subcommand (not yet implemented) --- .gitignore | 1 + internal/bes/bes.go | 22 ++++++++++++++++++++++ main.go | 7 +++++++ 3 files changed, 30 insertions(+) create mode 100644 internal/bes/bes.go diff --git a/.gitignore b/.gitignore index 220d374d..c26604f2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ *.dll *.so *.dylib +/bktec # Test binary, built with `go test -c` *.test diff --git a/internal/bes/bes.go b/internal/bes/bes.go new file mode 100644 index 00000000..b993edc9 --- /dev/null +++ b/internal/bes/bes.go @@ -0,0 +1,22 @@ +package bes + +import ( + "fmt" + "net" +) + +var host = "127.0.0.1" +var port = 60242 // 0 for OS-allocated + +func Listen() error { + addr := fmt.Sprintf("%s:%d", host, port) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("listening on %s: %w", addr, err) + } + fmt.Println("Bazel BES listener: grpc://" + listener.Addr().String()) + + // TODO + + return nil +} diff --git a/main.go b/main.go index 858932cf..557bc5d3 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "errors" "flag" "fmt" + "log" "os" "os/exec" "strconv" @@ -14,6 +15,7 @@ import ( "time" "github.com/buildkite/test-engine-client/internal/api" + "github.com/buildkite/test-engine-client/internal/bes" "github.com/buildkite/test-engine-client/internal/config" "github.com/buildkite/test-engine-client/internal/debug" "github.com/buildkite/test-engine-client/internal/env" @@ -69,6 +71,11 @@ func main() { logErrorAndExit(16, "upload: %v", err) } os.Exit(0) + } else if flag.Arg(0) == "bazel" && flag.Arg(1) == "listen" { + if err := bes.Listen(); err != nil { + log.Fatal(err) + } + os.Exit(0) } // get config From a2bde7a72ca557f50b6781a028139163e3ce302e Mon Sep 17 00:00:00 2001 From: Paul Annesley Date: Tue, 11 Mar 2025 15:23:59 +1030 Subject: [PATCH 2/4] 
bes.BuildEventServer WIP: gRPC listener prints TestResult test.xml paths --- go.mod | 17 +++++--- go.sum | 42 +++++++++++++----- internal/bes/bes.go | 102 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 143 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 7d50048f..a3f349af 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/buildkite/test-engine-client -go 1.21 +go 1.23.0 -toolchain go1.22.4 +toolchain go1.24.0 require ( github.com/buildkite/roko v1.3.1 @@ -11,19 +11,22 @@ require ( require ( drjosh.dev/zzglob v0.4.0 + github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c github.com/google/uuid v1.6.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/olekukonko/tablewriter v0.0.5 github.com/pact-foundation/pact-go/v2 v2.0.10 - golang.org/x/net v0.33.0 + golang.org/x/net v0.35.0 golang.org/x/sys v0.30.0 + google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb + google.golang.org/grpc v1.70.0 + google.golang.org/protobuf v1.36.5 ) require ( github.com/hashicorp/logutils v1.0.0 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect - golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect - google.golang.org/grpc v1.67.3 // indirect - google.golang.org/protobuf v1.36.3 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect ) diff --git a/go.sum b/go.sum index a915ffc6..1eebf23f 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,17 @@ drjosh.dev/zzglob v0.4.0 h1:gOb46aIHyHG8BlYpvZZM4dqR2dpsbKtI82IbYVAYIj4= drjosh.dev/zzglob v0.4.0/go.mod h1:c3V3WPyfG+81h/bNOalEaba0jEQl16i9efSAmWOeOw8= +github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c h1:qLnyVD+ND7Ll3p9Lw0Z7Vk5HirKRZcBRJzHELYe5Z84= +github.com/buildbarn/bb-portal 
v0.0.0-20250220144241-94f72e8e190c/go.mod h1:GHZ5lGzUtz9LQ2oHt8EweXn0zS8t2sCD9bNBw9R9s8E= github.com/buildkite/roko v1.3.1 h1:t7K30ceLLYn6k7hQP4oq1c7dVlhgD5nRcuSRDEEnY1s= github.com/buildkite/roko v1.3.1/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -22,18 +30,32 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric 
v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= +go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= +go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= +go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA= -google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8= -google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE= +google.golang.org/genproto 
v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e h1:nsxey/MfoGzYNduN0NN/+hqP9iiCIYsrVbXb/8hjFM8= +google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e/go.mod h1:Xsh8gBVxGCcbV8ZeTB9wI5XPyZ5RvC6V3CTeeplHbiA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e h1:YA5lmSs3zc/5w+xsRcHqpETkaYyK63ivEPzNTcUUlSA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= +google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= diff --git a/internal/bes/bes.go b/internal/bes/bes.go index b993edc9..8f3b5304 100644 --- a/internal/bes/bes.go +++ b/internal/bes/bes.go @@ -1,13 +1,28 @@ package bes import ( + "context" "fmt" + "io" + "log/slog" "net" + + bb_bes "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/emptypb" ) var host = "127.0.0.1" var port = 60242 // 0 for OS-allocated +type BuildEventServer struct { +} + func Listen() error { addr := fmt.Sprintf("%s:%d", host, port) listener, err := net.Listen("tcp", addr) @@ -16,7 +31,92 @@ func Listen() error { } fmt.Println("Bazel BES 
listener: grpc://" + listener.Addr().String()) - // TODO + opts := []grpc.ServerOption{} + grpcServer := grpc.NewServer(opts...) + + build.RegisterPublishBuildEventServer(grpcServer, newServer()) + grpcServer.Serve(listener) return nil } + +func newServer() build.PublishBuildEventServer { + return BuildEventServer{} +} + +// PublishLifecycleEvent handles life cycle events. +func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { + slog.DebugContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) + return &emptypb.Empty{}, nil +} + +// PublishBuildToolEventStream handles a build tool event stream. +// bktec thanks buildbarn/bb-portal for the basis of this :D +func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { + ctx := stream.Context() + + slog.InfoContext(ctx, "Stream started") + + for { + req, err := stream.Recv() + if err == io.EOF { + slog.InfoContext(ctx, "Stream finished") + return nil + } else if err != nil { + slog.ErrorContext(ctx, "Recv failed", "err", err) + return err + } + + streamID := req.OrderedBuildEvent.GetStreamId() + seq := req.OrderedBuildEvent.GetSequenceNumber() + + event := req.GetOrderedBuildEvent().GetEvent() + slog.DebugContext(ctx, "stream event", "seq", seq, "event", event) + + if event.GetBazelEvent() == nil { + slog.DebugContext(ctx, "not a bazel event", seq, seq) + continue + } + + var bazelEvent bb_bes.BuildEvent + if err = event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { + //return fmt.Errorf("unmarshaling bazel event: %w", err) + slog.InfoContext(ctx, "error unmarshalling") + } + + // slog.InfoContext(ctx, "unmarshalled bazel event", "event", &bazelEvent) + + payload := bazelEvent.GetPayload() + if testResult, ok := payload.(*bb_bes.BuildEvent_TestResult); ok { + r := testResult.TestResult + files := []string{} + for _, x := range 
r.GetTestActionOutput() { + if x.GetName() == "test.xml" { + files = append(files, x.GetUri()) + } + } + slog.InfoContext(ctx, "TestResult", + "status", r.GetStatus(), + "cached", r.GetCachedLocally(), + "dur", r.GetTestAttemptDuration().AsDuration().String(), + "files", files, + ) + } + + // ack + ack := &build.PublishBuildToolEventStreamResponse{StreamId: streamID, SequenceNumber: seq} + if err := stream.Send(ack); err != nil { + grpcErr := status.Convert(err) + if grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { + return nil + } + + slog.ErrorContext(ctx, "ack failed", + "err", err, + "stream", streamID, + "seq", seq, + ) + } + } +} From 4023adae2b0d4e2e608c66a773bbfe8a00def3bb Mon Sep 17 00:00:00 2001 From: Paul Annesley Date: Tue, 11 Mar 2025 16:45:32 +1030 Subject: [PATCH 3/4] package bes: refactor, closer to bb-portal's implementation PublishBuildToolEventStream is copied verbatim from bb-portal, and BuildEventHandler and BuildEventChannel interfaces/structs are created to be compatible with that code. This also resolves a bug in the earlier WIP implementation where most event sequence IDs were not acknowledged, resulting in a warning shown on the Bazel side sending the events. --- internal/bes/bes.go | 163 +++++++++++++++++++--------- internal/bes/channel.go | 88 +++++++++++++++ internal/bes/handler.go | 35 ++++++ internal/bes/quietslog/quietslog.go | 26 +++++ 4 files changed, 258 insertions(+), 54 deletions(-) create mode 100644 internal/bes/channel.go create mode 100644 internal/bes/handler.go create mode 100644 internal/bes/quietslog/quietslog.go diff --git a/internal/bes/bes.go b/internal/bes/bes.go index 8f3b5304..056aa026 100644 --- a/internal/bes/bes.go +++ b/internal/bes/bes.go @@ -1,13 +1,17 @@ +// Package bes implements a Bazel Build Event Service gRPC listener: +// https://bazel.build/remote/bep#build-event-service +// It listens for TestResult events, and uploads their XML report to Test +// Engine. 
package bes import ( "context" "fmt" "io" - "log/slog" "net" + "sort" - bb_bes "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + slog "github.com/buildkite/test-engine-client/internal/bes/quietslog" "google.golang.org/genproto/googleapis/devtools/build/v1" "google.golang.org/grpc" @@ -21,6 +25,7 @@ var host = "127.0.0.1" var port = 60242 // 0 for OS-allocated type BuildEventServer struct { + handler *BuildEventHandler } func Listen() error { @@ -44,79 +49,129 @@ func newServer() build.PublishBuildEventServer { return BuildEventServer{} } +// PublishLifecycleEvent is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// // PublishLifecycleEvent handles life cycle events. func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { - slog.DebugContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) + slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) return &emptypb.Empty{}, nil } +// PublishBuildToolEventStream is copied verbatim from: +// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go +// The BuildEventHandler and BuildEventChannel that it passes events to mimicks +// the expected interfaces, but provide a bktec-specific implementation. +// // PublishBuildToolEventStream handles a build tool event stream. // bktec thanks buildbarn/bb-portal for the basis of this :D func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { - ctx := stream.Context() - - slog.InfoContext(ctx, "Stream started") + slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) + + // List of SequenceIds we've received. 
+ // We'll want to ack these once all events are received, as we don't support resumption. + seqNrs := make([]int64, 0) + + ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { + if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ + StreamId: streamID, + SequenceNumber: sequenceNumber, + }); err != nil { + + // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete + // its not an error when the send fails. the bes gracefully terminated the close + // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) + // the context is processed in the background, so by the time we are acknowledging these + // requests, the client connection may have already timed out and these errors can be + // safely ignored + grpcErr := status.Convert(err) + if isClosing && + grpcErr.Code() == codes.Unavailable && + grpcErr.Message() == "transport is closing" { + return + } - for { - req, err := stream.Recv() - if err == io.EOF { - slog.InfoContext(ctx, "Stream finished") - return nil - } else if err != nil { - slog.ErrorContext(ctx, "Recv failed", "err", err) - return err + slog.ErrorContext( + stream.Context(), + "Send failed", + "err", + err, + "streamid", + streamID, + "sequenceNumber", + sequenceNumber, + ) } + } - streamID := req.OrderedBuildEvent.GetStreamId() - seq := req.OrderedBuildEvent.GetSequenceNumber() + var streamID *build.StreamId + reqCh := make(chan *build.PublishBuildToolEventStreamRequest) + errCh := make(chan error) + var eventCh BuildEventChannel + + go func() { + for { + req, err := stream.Recv() + if err != nil { + errCh <- err + return + } + reqCh <- req + } + }() - event := req.GetOrderedBuildEvent().GetEvent() - slog.DebugContext(ctx, "stream event", "seq", seq, "event", event) + for { + select { + case err := <-errCh: + if err == io.EOF { + slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) + + if eventCh == nil { + 
slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) + return nil + } - if event.GetBazelEvent() == nil { - slog.DebugContext(ctx, "not a bazel event", seq, seq) - continue - } + // Validate that all events were received + sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) - var bazelEvent bb_bes.BuildEvent - if err = event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { - //return fmt.Errorf("unmarshaling bazel event: %w", err) - slog.InfoContext(ctx, "error unmarshalling") - } + // TODO: Find out if initial sequence number can be != 1 + expected := int64(1) + for _, seqNr := range seqNrs { + if seqNr != expected { + return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) + } + expected++ + } - // slog.InfoContext(ctx, "unmarshalled bazel event", "event", &bazelEvent) + err := eventCh.Finalize() + if err != nil { + return err + } - payload := bazelEvent.GetPayload() - if testResult, ok := payload.(*bb_bes.BuildEvent_TestResult); ok { - r := testResult.TestResult - files := []string{} - for _, x := range r.GetTestActionOutput() { - if x.GetName() == "test.xml" { - files = append(files, x.GetUri()) + // Ack all events + for _, seqNr := range seqNrs { + ack(streamID, seqNr, true) } - } - slog.InfoContext(ctx, "TestResult", - "status", r.GetStatus(), - "cached", r.GetCachedLocally(), - "dur", r.GetTestAttemptDuration().AsDuration().String(), - "files", files, - ) - } - // ack - ack := &build.PublishBuildToolEventStreamResponse{StreamId: streamID, SequenceNumber: seq} - if err := stream.Send(ack); err != nil { - grpcErr := status.Convert(err) - if grpcErr.Code() == codes.Unavailable && - grpcErr.Message() == "transport is closing" { return nil } - slog.ErrorContext(ctx, "ack failed", - "err", err, - "stream", streamID, - "seq", seq, - ) + slog.ErrorContext(stream.Context(), "Recv failed", "err", err) + return err + + case req 
:= <-reqCh: + // First event + if streamID == nil { + streamID = req.OrderedBuildEvent.GetStreamId() + eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) + } + + seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) + + if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { + slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) + return err + } } } } diff --git a/internal/bes/channel.go b/internal/bes/channel.go new file mode 100644 index 00000000..c7ea4c19 --- /dev/null +++ b/internal/bes/channel.go @@ -0,0 +1,88 @@ +package bes + +import ( + "context" + "log/slog" + "time" + + "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventChannel in bktec mimics the bb-portal interface so that the +// BuildEventServer.PublishBuildEventServer code can be used verbatim. +// +// BuildEventChannel handles a single BuildEvent stream +type BuildEventChannel interface { + // HandleBuildEvent processes a single BuildEvent + // This method should be called for each received event. + HandleBuildEvent(event *build.BuildEvent) error + + // Finalize does post-processing of a stream of BuildEvents. + // This method should be called after receiving the EOF event. + Finalize() error +} + +type buildEventChannel struct { + ctx context.Context + streamID *build.StreamId +} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. 
+func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + if event.GetBazelEvent() == nil { + return nil + } + var bazelEvent bes.BuildEvent + if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { + slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err) + return err + } + + payload := bazelEvent.GetPayload() + if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok { + r := testResult.TestResult + files := []string{} + for _, x := range r.GetTestActionOutput() { + if x.GetName() == "test.xml" { + files = append(files, x.GetUri()) + } + } + slog.Info("TestResult", + "status", r.GetStatus(), + "cached", r.GetCachedLocally(), + "dur", r.GetTestAttemptDuration().AsDuration().String(), + "files", files, + ) + } + + return nil +} + +// Finalize implements BuildEventChannel.Finalize. +func (c *buildEventChannel) Finalize() error { + // defer the ctx so its not reaped when the client closes the connection + ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) + defer cancel() + + slog.Debug("finalizing build event channel") + _ = ctx + // TODO: finalize anything that needs finalizing? + + cancel() + return nil +} + +// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. +// It is used when receiving a stream of events that we wish to ack without processing. +type noOpBuildEventChannel struct{} + +// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. +func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { + return nil +} + +// Finalize implements BuildEventChannel.Finalize. 
+func (c *noOpBuildEventChannel) Finalize() error { + return nil +} diff --git a/internal/bes/handler.go b/internal/bes/handler.go new file mode 100644 index 00000000..e8576089 --- /dev/null +++ b/internal/bes/handler.go @@ -0,0 +1,35 @@ +package bes + +import ( + "context" + + "google.golang.org/genproto/googleapis/devtools/build/v1" +) + +// BuildEventHandler in bktec mimics the bb-portal handler so that the +// BuildEventServer.PublishBuildToolEventStream code can be used verbatim. +// +// BuildEventHandler orchestrates the handling of incoming Build Event streams. +// For each incoming stream, and BuildEventChannel is created, which handles that stream. +// BuildEventHandler is responsible for managing the things that are common to these event streams. +type BuildEventHandler struct { +} + +// NewBuildEventHandler constructs a new BuildEventHandler +func NewBuildEventHandler() *BuildEventHandler { + return &BuildEventHandler{} +} + +// CreateEventChannel creates a new BuildEventChannel +func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel { + // If the first event does not have sequence number 1, we have processed this + // invocation previously, and should skip all processing. + if initialEvent.SequenceNumber != 1 { + return &noOpBuildEventChannel{} + } + + return &buildEventChannel{ + ctx: ctx, + streamID: initialEvent.StreamId, + } +} diff --git a/internal/bes/quietslog/quietslog.go b/internal/bes/quietslog/quietslog.go new file mode 100644 index 00000000..4bf11c62 --- /dev/null +++ b/internal/bes/quietslog/quietslog.go @@ -0,0 +1,26 @@ +// Package quietslog provides a replacement for slog which downgrades Info +// messages to Debug instead, so that the log output is quieter. This is done +// specifically so that bes.BuildEventServer.PublishBuildToolEventStream() +// source code can be kept unmodified from the bb-portal upstream it's copied +// from. 
+package quietslog + +import ( + "context" + "log/slog" +) + +// InfoContext delegates to DebugContext of the real logger, making this logger quiet. +func InfoContext(ctx context.Context, msg string, args ...any) { + slog.DebugContext(ctx, msg, args...) +} + +// WarnContext wraps the real logger directly. +func WarnContext(ctx context.Context, msg string, args ...any) { + slog.WarnContext(ctx, msg, args...) +} + +// ErrorContext wraps the real logger directly. +func ErrorContext(ctx context.Context, msg string, args ...any) { + slog.ErrorContext(ctx, msg, args...) +} From 3f922407f50d0baf26c8fd28eeeb0c705d8914b4 Mon Sep 17 00:00:00 2001 From: Paul Annesley Date: Wed, 19 Mar 2025 09:43:22 +1300 Subject: [PATCH 4/4] WIP: bazel-bes uploads via channels etc --- internal/bes/bes.go | 26 ---------- internal/bes/bes_test.go | 14 ++++++ internal/bes/channel.go | 27 +++++++++-- internal/bes/handler.go | 12 +++-- internal/bes/listen.go | 101 +++++++++++++++++++++++++++++++++++++++ internal/bes/uploader.go | 76 +++++++++++++++++++++++++++++ main.go | 2 +- 7 files changed, 223 insertions(+), 35 deletions(-) create mode 100644 internal/bes/bes_test.go create mode 100644 internal/bes/listen.go create mode 100644 internal/bes/uploader.go diff --git a/internal/bes/bes.go b/internal/bes/bes.go index 056aa026..6d247263 100644 --- a/internal/bes/bes.go +++ b/internal/bes/bes.go @@ -8,47 +8,21 @@ import ( "context" "fmt" "io" - "net" "sort" slog "github.com/buildkite/test-engine-client/internal/bes/quietslog" "google.golang.org/genproto/googleapis/devtools/build/v1" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/emptypb" ) -var host = "127.0.0.1" -var port = 60242 // 0 for OS-allocated - type BuildEventServer struct { handler *BuildEventHandler } -func Listen() error { - addr := fmt.Sprintf("%s:%d", host, port) - listener, err := net.Listen("tcp", 
addr) - if err != nil { - return fmt.Errorf("listening on %s: %w", addr, err) - } - fmt.Println("Bazel BES listener: grpc://" + listener.Addr().String()) - - opts := []grpc.ServerOption{} - grpcServer := grpc.NewServer(opts...) - - build.RegisterPublishBuildEventServer(grpcServer, newServer()) - grpcServer.Serve(listener) - - return nil -} - -func newServer() build.PublishBuildEventServer { - return BuildEventServer{} -} - // PublishLifecycleEvent is copied verbatim from: // https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go // diff --git a/internal/bes/bes_test.go b/internal/bes/bes_test.go new file mode 100644 index 00000000..57495e2a --- /dev/null +++ b/internal/bes/bes_test.go @@ -0,0 +1,14 @@ +package bes + +import "testing" + +func TestPathFromURI(t *testing.T) { + path, err := pathFromURI("file:///hello/world.txt") + if err != nil { + t.Errorf("pathFromURI error: %v", err) + } + + if want := "/hello/world.txt"; want != path { + t.Errorf("wanted %v got %v", want, path) + } +} diff --git a/internal/bes/channel.go b/internal/bes/channel.go index c7ea4c19..621e6a5e 100644 --- a/internal/bes/channel.go +++ b/internal/bes/channel.go @@ -2,7 +2,9 @@ package bes import ( "context" + "fmt" "log/slog" + "net/url" "time" "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" @@ -24,8 +26,9 @@ type BuildEventChannel interface { } type buildEventChannel struct { - ctx context.Context - streamID *build.StreamId + ctx context.Context + streamID *build.StreamId + filenames chan<- string } // HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. @@ -45,7 +48,12 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { files := []string{} for _, x := range r.GetTestActionOutput() { if x.GetName() == "test.xml" { - files = append(files, x.GetUri()) + path, err := pathFromURI(x.GetUri()) + if err != nil { + return err // maybe just log a warning? 
+ } + files = append(files, path) + c.filenames <- path } } slog.Info("TestResult", @@ -59,13 +67,24 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { return nil } +func pathFromURI(uri string) (string, error) { + url, err := url.Parse(uri) + if err != nil { + return "", err + } + if url.Scheme != "file" { + return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme) + } + return url.Path, nil +} + // Finalize implements BuildEventChannel.Finalize. func (c *buildEventChannel) Finalize() error { // defer the ctx so its not reaped when the client closes the connection ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) defer cancel() - slog.Debug("finalizing build event channel") + slog.Info("finalizing build event channel") _ = ctx // TODO: finalize anything that needs finalizing? diff --git a/internal/bes/handler.go b/internal/bes/handler.go index e8576089..95134283 100644 --- a/internal/bes/handler.go +++ b/internal/bes/handler.go @@ -13,11 +13,14 @@ import ( // For each incoming stream, and BuildEventChannel is created, which handles that stream. // BuildEventHandler is responsible for managing the things that are common to these event streams. 
type BuildEventHandler struct { + filenames chan<- string } // NewBuildEventHandler constructs a new BuildEventHandler -func NewBuildEventHandler() *BuildEventHandler { - return &BuildEventHandler{} +func NewBuildEventHandler(filenames chan<- string) *BuildEventHandler { + return &BuildEventHandler{ + filenames: filenames, + } } // CreateEventChannel creates a new BuildEventChannel @@ -29,7 +32,8 @@ func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent } return &buildEventChannel{ - ctx: ctx, - streamID: initialEvent.StreamId, + ctx: ctx, + streamID: initialEvent.StreamId, + filenames: h.filenames, } } diff --git a/internal/bes/listen.go b/internal/bes/listen.go new file mode 100644 index 00000000..2a74452b --- /dev/null +++ b/internal/bes/listen.go @@ -0,0 +1,101 @@ +package bes + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "syscall" + + "github.com/buildkite/test-engine-client/internal/env" + "github.com/buildkite/test-engine-client/internal/upload" + "google.golang.org/genproto/googleapis/devtools/build/v1" + "google.golang.org/grpc" +) + +func ListenCLI(argv []string, env env.Env) error { + flags := flag.NewFlagSet("bktec bazel listen", flag.ExitOnError) + portFlag := flags.Int("port", 0, "gRPC port to listen") + listenHostFlag := flags.String("listen-host", "127.0.0.1", "gRPC host to listen") + debugFlag := flags.Bool("debug", false, "debug logging") + flags.Parse(argv) + + if *debugFlag { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // a channel to propagate OS signals + signals := make(chan os.Signal, 1) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) + + // configure uploader + cfg, err := upload.ConfigFromEnv(env) + if err != nil { + return fmt.Errorf("uploader configuration: %w", err) + } + runEnv, err := upload.RunEnvFromEnv(env) + if err != nil { + return fmt.Errorf("uploader run_env configuration: 
%w", err) + } + uploader := NewUploader(cfg, runEnv, "junit") + go uploader.Start(ctx) + + // configure gRPC Bazel BES server + addr := fmt.Sprintf("%s:%d", *listenHostFlag, *portFlag) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("listening on %s: %w", addr, err) + } + opts := []grpc.ServerOption{} + srv := grpc.NewServer(opts...) + build.RegisterPublishBuildEventServer(srv, BuildEventServer{ + handler: &BuildEventHandler{ + filenames: uploader.Filenames, + }, + }) + slog.Info("Bazel BES listener", "addr", "grpc://"+listener.Addr().String()) + go serve(srv, listener) + + // main loop + run := true + sigCount := 0 + for run { + select { + case url, ok := <-uploader.Responses: + if !ok { + slog.Debug("Response channel closed") + run = false + continue + } + slog.Info("Uploaded", "url", url) + case err := <-uploader.Errs: + slog.Error("Upload error", "error", err) + case sig := <-signals: + sigCount++ + srv.Stop() + if sigCount == 1 { + slog.Info("Stopping (again to force)...", "signal", sig) + uploader.Stop() + } else { + slog.Info("Stopping forcefully...", "signal", sig) + cancel() + } + } + } + + slog.Debug("done") + return nil +} + +func serve(s *grpc.Server, listener net.Listener) { + err := s.Serve(listener) + if err != nil { + slog.Error("gRPC server error", "err", err) + } +} diff --git a/internal/bes/uploader.go b/internal/bes/uploader.go new file mode 100644 index 00000000..8f7fc5a1 --- /dev/null +++ b/internal/bes/uploader.go @@ -0,0 +1,76 @@ +package bes + +import ( + "context" + "log/slog" + + "github.com/buildkite/test-engine-client/internal/upload" +) + +type Uploader struct { + Config upload.Config + RunEnv upload.RunEnvMap + Format string + Filenames chan string + Responses chan string + Errs chan error + + stopping bool +} + +func NewUploader(cfg upload.Config, runEnv upload.RunEnvMap, format string) *Uploader { + // a channel to pass filenames from BES server to uploader + filenames := make(chan string, 1024) + + 
// a channel to receive response upload URLs + responses := make(chan string) + + // a channel to receive errors from the uploader + errs := make(chan error) + + return &Uploader{ + Config: cfg, + RunEnv: runEnv, + Format: format, + Filenames: filenames, + Responses: responses, + Errs: errs, + } +} + +func (u *Uploader) Start(ctx context.Context) { + for filename := range u.Filenames { + if ctx.Err() != nil { + slog.Debug("Uploader context canceled") + break + } + resp, err := u.UploadFile(ctx, filename) + if err != nil { + u.Errs <- err + continue + } + u.Responses <- resp["upload_url"] + } + slog.Debug("Uploader finished") + close(u.Responses) +} + +// Stop closes the Filenames channel; filenames already buffered on the channel +// will be uploaded before finishing. +func (u *Uploader) Stop() { + if u.stopping { + slog.Warn("Uploader GracefulStop: already stopping") + return + } + slog.Debug("Uploader GracefulStop") + u.stopping = true + close(u.Filenames) +} + +func (u *Uploader) UploadFile(ctx context.Context, filename string) (map[string]string, error) { + resp, err := upload.Upload(ctx, u.Config, u.RunEnv, u.Format, filename) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/main.go b/main.go index 557bc5d3..668be0c9 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,7 @@ func main() { } os.Exit(0) } else if flag.Arg(0) == "bazel" && flag.Arg(1) == "listen" { - if err := bes.Listen(); err != nil { + if err := bes.ListenCLI(os.Args[3:], env); err != nil { log.Fatal(err) } os.Exit(0)