diff --git a/Package.swift b/Package.swift index c4dfde6..405bbdf 100644 --- a/Package.swift +++ b/Package.swift @@ -26,6 +26,10 @@ let products: [Product] = [ name: "GRPCReflectionService", targets: ["GRPCReflectionService"] ), + .library( + name: "GRPCOTelMetricsInterceptors", + targets: ["GRPCOTelMetricsInterceptors"] + ), .library( name: "GRPCOTelTracingInterceptors", targets: ["GRPCOTelTracingInterceptors"] @@ -53,6 +57,10 @@ let dependencies: [Package.Dependency] = [ url: "https://github.com/apple/swift-protobuf.git", from: "1.31.0" ), + .package( + url: "https://github.com/apple/swift-metrics.git", + from: "2.7.1" + ), .package( url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0" @@ -131,10 +139,48 @@ let targets: [Target] = [ swiftSettings: defaultSwiftSettings ), + // gRPC interceptors shared infrastructure. + .target( + name: "GRPCInterceptorsCore", + dependencies: [ + .product(name: "GRPCCore", package: "grpc-swift-2"), + ], + swiftSettings: defaultSwiftSettings + ), + .testTarget( + name: "GRPCInterceptorsCoreTests", + dependencies: [ + .target(name: "GRPCInterceptorsCore") + ], + swiftSettings: defaultSwiftSettings + ), + + // gRPC OTel metrics interceptors. + .target( + name: "GRPCOTelMetricsInterceptors", + dependencies: [ + .target(name: "GRPCInterceptorsCore"), + .product(name: "GRPCCore", package: "grpc-swift-2"), + .product(name: "Metrics", package: "swift-metrics"), + ], + swiftSettings: defaultSwiftSettings + ), + .testTarget( + name: "GRPCOTelMetricsInterceptorsTests", + dependencies: [ + .target(name: "GRPCOTelMetricsInterceptors"), + .product(name: "GRPCCore", package: "grpc-swift-2"), + .product(name: "Metrics", package: "swift-metrics"), + .product(name: "MetricsTestKit", package: "swift-metrics"), + ], + swiftSettings: defaultSwiftSettings + ), + // gRPC OTel tracing interceptors. .target( name: "GRPCOTelTracingInterceptors", dependencies: [ + .target(name: "GRPCInterceptorsCore"), .product(name: "GRPCCore", package: "grpc-swift-2"), .product(name: "Tracing", package: "swift-distributed-tracing"), ], diff --git a/Sources/GRPCInterceptorsCore/BaseContext.swift b/Sources/GRPCInterceptorsCore/BaseContext.swift new file mode 100644 index 0000000..7dfc3ea --- /dev/null +++ b/Sources/GRPCInterceptorsCore/BaseContext.swift @@ -0,0 +1,32 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package import GRPCCore + +@available(gRPCSwiftExtras 2.0, *) +package protocol BaseContext { + var descriptor: MethodDescriptor { get } + + var remotePeer: String { get } + + var localPeer: String { get } +} + +@available(gRPCSwiftExtras 2.0, *) +extension ClientContext: BaseContext {} + +@available(gRPCSwiftExtras 2.0, *) +extension ServerContext: BaseContext {} diff --git a/Sources/GRPCInterceptorsCore/GRPCOTelAttributeKeys.swift b/Sources/GRPCInterceptorsCore/GRPCOTelAttributeKeys.swift new file mode 100644 index 0000000..2575a3d --- /dev/null +++ b/Sources/GRPCInterceptorsCore/GRPCOTelAttributeKeys.swift @@ -0,0 +1,38 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package enum GRPCOTelAttributeKeys { + package static let rpcSystem = "rpc.system" + package static let rpcMethod = "rpc.method" + package static let rpcService = "rpc.service" + package static let rpcMessageID = "rpc.message.id" + package static let rpcMessageType = "rpc.message.type" + package static let grpcStatusCode = "rpc.grpc.status_code" + + package static let serverAddress = "server.address" + package static let serverPort = "server.port" + + package static let clientAddress = "client.address" + package static let clientPort = "client.port" + + package static let networkTransport = "network.transport" + package static let networkType = "network.type" + package static let networkPeerAddress = "network.peer.address" + package static let networkPeerPort = "network.peer.port" + + package static let requestMetadataPrefix = "rpc.grpc.request.metadata." + package static let responseMetadataPrefix = "rpc.grpc.response.metadata." +} diff --git a/Sources/GRPCInterceptorsCore/HookedAsyncSequence.swift b/Sources/GRPCInterceptorsCore/HookedAsyncSequence.swift new file mode 100644 index 0000000..db704d0 --- /dev/null +++ b/Sources/GRPCInterceptorsCore/HookedAsyncSequence.swift @@ -0,0 +1,81 @@ +/* + * Copyright 2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@available(gRPCSwiftExtras 2.0, *) +package struct HookedRPCAsyncSequence: AsyncSequence, Sendable +where Wrapped.Element: Sendable { + private let wrapped: Wrapped + + private let forEachElement: @Sendable (Wrapped.Element) -> Void + private let onFinish: @Sendable ((any Error)?) -> Void + + package init( + wrapping sequence: Wrapped, + forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, + onFinish: @escaping @Sendable ((any Error)?) -> Void + ) { + self.wrapped = sequence + self.forEachElement = forEachElement + self.onFinish = onFinish + } + + package func makeAsyncIterator() -> HookedAsyncIterator { + HookedAsyncIterator( + self.wrapped, + forEachElement: self.forEachElement, + onFinish: self.onFinish + ) + } + + package struct HookedAsyncIterator: AsyncIteratorProtocol { + package typealias Element = Wrapped.Element + + private var wrapped: Wrapped.AsyncIterator + private let forEachElement: @Sendable (Wrapped.Element) -> Void + private let onFinish: @Sendable ((any Error)?) -> Void + + init( + _ sequence: Wrapped, + forEachElement: @escaping @Sendable (Wrapped.Element) -> Void, + onFinish: @escaping @Sendable ((any Error)?) -> Void + ) { + self.wrapped = sequence.makeAsyncIterator() + self.forEachElement = forEachElement + self.onFinish = onFinish + } + + package mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(Wrapped.Failure) -> Wrapped.Element? { + do { + if let element = try await self.wrapped.next(isolation: actor) { + self.forEachElement(element) + return element + } else { + self.onFinish(nil) + return nil + } + } catch { + self.onFinish(error) + throw error + } + } + + package mutating func next() async throws -> Wrapped.Element? { + try await self.next(isolation: nil) + } + } +} diff --git a/Sources/GRPCInterceptorsCore/HookedWriter.swift b/Sources/GRPCInterceptorsCore/HookedWriter.swift new file mode 100644 index 0000000..0ce1bff --- /dev/null +++ b/Sources/GRPCInterceptorsCore/HookedWriter.swift @@ -0,0 +1,40 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package import GRPCCore + +@available(gRPCSwiftExtras 2.0, *) +package struct HookedWriter: RPCWriterProtocol { + private let writer: any RPCWriterProtocol + private let afterEachWrite: @Sendable () -> Void + + package init( + wrapping other: some RPCWriterProtocol, + afterEachWrite: @Sendable @escaping () -> Void + ) { + self.writer = other + self.afterEachWrite = afterEachWrite + } + + package func write(_ element: Element) async throws { + try await self.writer.write(element) + self.afterEachWrite() + } + + package func write(contentsOf elements: some Sequence) async throws { + try await self.writer.write(contentsOf: elements) + self.afterEachWrite() + } +} diff --git a/Sources/GRPCInterceptorsCore/Int+IpAddressPortStringBytes.swift b/Sources/GRPCInterceptorsCore/Int+IpAddressPortStringBytes.swift new file mode 100644 index 0000000..5d26c6d --- /dev/null +++ b/Sources/GRPCInterceptorsCore/Int+IpAddressPortStringBytes.swift @@ -0,0 +1,44 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +extension Int { + package init?(ipAddressPortStringBytes: some Collection) { + guard (1 ... 5).contains(ipAddressPortStringBytes.count) else { + // Valid IP port values go up to 2^16-1 (65535), which is 5 digits long. + // If the string we get is over 5 characters, we know for sure that this is an invalid port. + // If it's empty, we also know it's invalid as we need at least one digit. + return nil + } + + var value = 0 + for utf8Char in ipAddressPortStringBytes { + value &*= 10 + guard (UInt8(ascii: "0") ... UInt8(ascii: "9")).contains(utf8Char) else { + // non-digit character + return nil + } + value &+= Int(utf8Char &- UInt8(ascii: "0")) + } + + guard value <= Int(UInt16.max) else { + // Valid IP port values go up to 2^16-1. + // If a number greater than this was given, it can't be a valid port. + return nil + } + + self = value + } +} diff --git a/Sources/GRPCInterceptorsCore/PeerAddress.swift b/Sources/GRPCInterceptorsCore/PeerAddress.swift new file mode 100644 index 0000000..c7fb8f7 --- /dev/null +++ b/Sources/GRPCInterceptorsCore/PeerAddress.swift @@ -0,0 +1,87 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore + +package enum PeerAddress: Equatable { + case ipv4(address: String, port: Int?) + case ipv6(address: String, port: Int?) + case unixDomainSocket(path: String) + + package init?(_ address: String) { + // We expect this address to be of one of these formats: + // - ipv4:: for ipv4 addresses + // - ipv6:[]: for ipv6 addresses + // - unix: for UNIX domain sockets + let addressUTF8View = address.utf8 + + // First get the first component so that we know what type of address we're dealing with + let firstColonIndex = addressUTF8View.firstIndex(of: UInt8(ascii: ":")) + + guard let firstColonIndex else { + // This is some unexpected/unknown format + return nil + } + + let addressType = addressUTF8View[.. + self = .unixDomainSocket(path: String(addressWithoutType) ?? "") + } else { + // This is some unexpected/unknown format + return nil + } + } +} diff --git a/Sources/GRPCInterceptorsCore/ServiceKind.swift b/Sources/GRPCInterceptorsCore/ServiceKind.swift new file mode 100644 index 0000000..d632647 --- /dev/null +++ b/Sources/GRPCInterceptorsCore/ServiceKind.swift @@ -0,0 +1,20 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package enum ServiceKind: String { + case client + case server +} diff --git a/Sources/GRPCOTelMetricsInterceptors/ClientOTelMetricsInterceptor.swift b/Sources/GRPCOTelMetricsInterceptors/ClientOTelMetricsInterceptor.swift new file mode 100644 index 0000000..375153a --- /dev/null +++ b/Sources/GRPCOTelMetricsInterceptors/ClientOTelMetricsInterceptor.swift @@ -0,0 +1,101 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore +internal import GRPCInterceptorsCore +package import Metrics + +/// A client interceptor that records metrics information for a request. +/// +/// For more information, refer to the documentation for `swift-metrics`. +/// +/// This interceptor will record all required and recommended metrics and dimensions as defined by OpenTelemetry's documentation on: +/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics +@available(gRPCSwiftExtras 2.0, *) +public struct ClientOTelMetricsInterceptor: ClientInterceptor { + private let serverHostname: String + private let networkTransport: String + private let metricsFactory: (any MetricsFactory)? + + /// Create a new instance of a `ClientOTelMetricsInterceptor`. + /// - Parameters: + /// - serverHostname: The hostname of the RPC server. This will be the value for the `server.address` dimension in metrics. + /// - networkTransport: The transport in use (e.g. "tcp", "unix"). This will be the value for the + /// `network.transport` dimension in metrics. + public init(serverHostname: String, networkTransport: String) { + self.init( + serverHostname: serverHostname, + networkTransport: networkTransport, + metricsFactory: nil + ) + } + + package init( + serverHostname: String, + networkTransport: String, + metricsFactory: (any MetricsFactory)? + ) { + self.serverHostname = serverHostname + self.networkTransport = networkTransport + self.metricsFactory = metricsFactory + } + + public func intercept( + request: StreamingClientRequest, + context: ClientContext, + next: (StreamingClientRequest, ClientContext) async throws -> StreamingClientResponse< + Output + > + ) async throws -> StreamingClientResponse { + let dimensions = context.dimensions( + serverHostname: self.serverHostname, + networkTransport: self.networkTransport + ) + + let metricsContext = GRPCMetricsContext( + kind: .client, + dimensions: dimensions, + metricsFactory: self.metricsFactory ?? MetricsSystem.factory + ) + + var request = request + let wrappedProducer = request.producer + request.producer = { writer in + let metricsWriter = HookedWriter(wrapping: writer) { + metricsContext.recordSentMessage() + } + try await wrappedProducer(RPCWriter(wrapping: metricsWriter)) + } + + var response = try await next(request, context) + + switch response.accepted { + case var .success(contents): + let sequence = HookedRPCAsyncSequence(wrapping: contents.bodyParts) { _ in + metricsContext.recordReceivedMessage() + } onFinish: { error in + metricsContext.recordCallFinished(error: error) + } + + contents.bodyParts = RPCAsyncSequence(wrapping: sequence) + response.accepted = .success(contents) + case let .failure(rpcError): + metricsContext.recordCallFinished(error: rpcError) + } + + return response + } +} diff --git a/Sources/GRPCOTelMetricsInterceptors/Documentation.docc/Documentation.md b/Sources/GRPCOTelMetricsInterceptors/Documentation.docc/Documentation.md new file mode 100644 index 0000000..c13cfc1 --- /dev/null +++ b/Sources/GRPCOTelMetricsInterceptors/Documentation.docc/Documentation.md @@ -0,0 +1,56 @@ +# ``GRPCOTelMetricsInterceptors`` + +This module contains client and server tracing interceptors adhering to OpenTelemetry's +recommendations on recording metrics. + +## Overview + +You can read more on this topic at [OpenTelemetry's documentation](https://opentelemetry.io/docs). +Some relevant pages listing which attributes you can expect on your metrics include: +- [RPC Metrics](https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans) +- [gRPC conventions](https://opentelemetry.io/docs/specs/semconv/rpc/grpc) + +You can set up a client interceptor like so during your bootstrapping phase: + +```swift +// Create the client interceptor +let interceptor = ClientOTelMetricsInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" +) + +// Add it as an interceptor when creating your client +let client = GRPCClient( + transport: transport, + interceptors: [interceptor] +) + +// Finally run your client +try await client.runConnections() +``` + +You can similarly add the server interceptor to your server like this: + +```swift +// Create the server interceptor +let interceptor = ServerOTelMetricsInterceptor( + serverHostname: "someserver.com", + networkTransportMethod: "tcp" +) + +// Add it as an interceptor when creating your server +let server = GRPCServer( + transport: transport, + services: [TestService()], + interceptors: interceptor +) + +// Finally run your server +try await server.serve() +``` + +For more information, look at the documentation for: +- ``GRPCOTelMetricsInterceptors/ClientOTelMetricsInterceptor``, +- ``GRPCOTelMetricsInterceptors/ServerOTelMetricsInterceptor``, +- [Client Interceptors](https://swiftpackageindex.com/grpc/grpc-swift-2/documentation/grpccore/clientinterceptor), +- [Server Interceptors](https://swiftpackageindex.com/grpc/grpc-swift-2/documentation/grpccore/serverinterceptor) diff --git a/Sources/GRPCOTelMetricsInterceptors/Metrics/BaseContext+Dimensions.swift b/Sources/GRPCOTelMetricsInterceptors/Metrics/BaseContext+Dimensions.swift new file mode 100644 index 0000000..cb14ed2 --- /dev/null +++ b/Sources/GRPCOTelMetricsInterceptors/Metrics/BaseContext+Dimensions.swift @@ -0,0 +1,65 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore +internal import GRPCInterceptorsCore + +@available(gRPCSwiftExtras 2.0, *) +extension BaseContext { + func dimensions( + serverHostname: String, + networkTransport: String + ) -> [(String, String)] { + var dimensions: [(String, String)] = [ + (GRPCOTelAttributeKeys.serverAddress, serverHostname), + (GRPCOTelAttributeKeys.networkTransport, networkTransport), + (GRPCOTelAttributeKeys.rpcSystem, "grpc"), + (GRPCOTelAttributeKeys.rpcService, self.descriptor.service.fullyQualifiedService), + (GRPCOTelAttributeKeys.rpcMethod, self.descriptor.method), + ] + + switch PeerAddress(self.remotePeer) { + case let .ipv4(address, port): + dimensions.append(contentsOf: [ + (GRPCOTelAttributeKeys.networkType, "ipv4"), + (GRPCOTelAttributeKeys.networkPeerAddress, address), + ]) + if let port { + dimensions.append(contentsOf: [ + (GRPCOTelAttributeKeys.networkPeerPort, port.description), + (GRPCOTelAttributeKeys.serverPort, port.description), + ]) + } + case let .ipv6(address, port): + dimensions.append(contentsOf: [ + (GRPCOTelAttributeKeys.networkType, "ipv6"), + (GRPCOTelAttributeKeys.networkPeerAddress, address), + ]) + if let port { + dimensions.append(contentsOf: [ + (GRPCOTelAttributeKeys.networkPeerPort, port.description), + (GRPCOTelAttributeKeys.serverPort, port.description), + ]) + } + case let .unixDomainSocket(path): + dimensions.append((GRPCOTelAttributeKeys.networkPeerAddress, path)) + case .none: + break + } + + return dimensions + } +} diff --git a/Sources/GRPCOTelMetricsInterceptors/Metrics/GRPCMetricsContext.swift b/Sources/GRPCOTelMetricsInterceptors/Metrics/GRPCMetricsContext.swift new file mode 100644 index 0000000..94aa864 --- /dev/null +++ b/Sources/GRPCOTelMetricsInterceptors/Metrics/GRPCMetricsContext.swift @@ -0,0 +1,101 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +internal import GRPCCore +internal import GRPCInterceptorsCore +internal import Metrics + +@available(gRPCSwiftExtras 2.0, *) +struct GRPCMetricsContext { + private let kind: ServiceKind + private let metricsFactory: any MetricsFactory + + private let startTime: ContinuousClock.Instant + private let dimensions: [(String, String)] + + private let requestsPerRPC: Recorder + private let responsesPerRPC: Recorder + + init(kind: ServiceKind, dimensions: [(String, String)], metricsFactory: any MetricsFactory) { + self.kind = kind + self.metricsFactory = metricsFactory + self.startTime = .now + self.dimensions = dimensions + + self.requestsPerRPC = Recorder( + label: "rpc.\(kind.rawValue).requests_per_rpc", + dimensions: dimensions, + factory: metricsFactory + ) + self.responsesPerRPC = Recorder( + label: "rpc.\(kind.rawValue).responses_per_rpc", + dimensions: dimensions, + factory: metricsFactory + ) + } + + func recordSentMessage() { + switch kind { + case .client: + requestsPerRPC.record(1) + case .server: + responsesPerRPC.record(1) + } + } + + func recordReceivedMessage() { + switch kind { + case .client: + responsesPerRPC.record(1) + case .server: + requestsPerRPC.record(1) + } + } + + func recordCallFinished(error: (any Error)?) { + var dimensions = dimensions + + let statusCode: Status.Code = + if let error { + .init(error.rpcErrorCode ?? .unknown) + } else { + .ok + } + + dimensions.append((GRPCOTelAttributeKeys.grpcStatusCode, statusCode.rawValue.description)) + + Metrics.Timer( + label: "rpc.\(kind.rawValue).duration", + dimensions: dimensions, + preferredDisplayUnit: .seconds, + factory: self.metricsFactory + ) + .record(duration: .now - startTime) + } +} + +@available(gRPCSwiftExtras 2.0, *) +extension Error { + fileprivate var rpcErrorCode: RPCError.Code? { + if let rpcError = self as? RPCError { + return rpcError.code + } else if let rpcError = self as? any RPCErrorConvertible { + return rpcError.rpcErrorCode + } else { + return nil + } + } +} diff --git a/Sources/GRPCOTelMetricsInterceptors/ServerOTelMetricsInterceptor.swift b/Sources/GRPCOTelMetricsInterceptors/ServerOTelMetricsInterceptor.swift new file mode 100644 index 0000000..4014c94 --- /dev/null +++ b/Sources/GRPCOTelMetricsInterceptors/ServerOTelMetricsInterceptor.swift @@ -0,0 +1,113 @@ +/* + * Copyright 2024-2025, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public import GRPCCore +internal import GRPCInterceptorsCore +package import Metrics + +/// A server interceptor that records metrics information for a request. +/// +/// For more information, refer to the documentation for `swift-metrics`. +/// +/// This interceptor will record all required and recommended metrics and dimensions as defined by OpenTelemetry's documentation on: +/// - https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics +@available(gRPCSwiftExtras 2.0, *) +public struct ServerOTelMetricsInterceptor: ServerInterceptor { + private let serverHostname: String + private let networkTransport: String + private let metricsFactory: (any MetricsFactory)? + + /// Create a new instance of a `ServerOTelMetricsInterceptor`. + /// - Parameters: + /// - serverHostname: The hostname of the RPC server. This will be the value for the `server.address` dimension in metrics. + /// - networkTransport: The transport in use (e.g. "tcp", "unix"). This will be the value for the + /// `network.transport` dimension in metrics. + public init(serverHostname: String, networkTransport: String) { + self.init( + serverHostname: serverHostname, + networkTransport: networkTransport, + metricsFactory: nil + ) + } + + package init( + serverHostname: String, + networkTransport: String, + metricsFactory: (any MetricsFactory)? + ) { + self.serverHostname = serverHostname + self.networkTransport = networkTransport + self.metricsFactory = metricsFactory + } + + public func intercept( + request: StreamingServerRequest, + context: ServerContext, + next: (StreamingServerRequest, ServerContext) async throws -> StreamingServerResponse< + Output + > + ) async throws -> StreamingServerResponse { + let dimensions = context.dimensions( + serverHostname: self.serverHostname, + networkTransport: networkTransport + ) + + let metricsContext = GRPCMetricsContext( + kind: .server, + dimensions: dimensions, + metricsFactory: self.metricsFactory ?? MetricsSystem.factory + ) + + var request = request + request.messages = RPCAsyncSequence( + wrapping: request.messages.map({ element in + metricsContext.recordReceivedMessage() + return element + }) + ) + + var response = try await next(request, context) + + switch response.accepted { + case var .success(success): + let wrappedProducer = success.producer + + success.producer = { writer in + let hookedWriter = HookedWriter(wrapping: writer) { + metricsContext.recordSentMessage() + } + + let metadata: Metadata + do { + metadata = try await wrappedProducer(RPCWriter(wrapping: hookedWriter)) + } catch { + metricsContext.recordCallFinished(error: error) + throw error + } + + metricsContext.recordCallFinished(error: nil) + return metadata + } + + response.accepted = .success(success) + return response + case let .failure(rpcError): + metricsContext.recordCallFinished(error: rpcError) + } + + return response + } +} diff --git a/Sources/GRPCOTelTracingInterceptors/Documentation.docc/Documentation.md b/Sources/GRPCOTelTracingInterceptors/Documentation.docc/Documentation.md index fe09368..909a20c 100644 --- a/Sources/GRPCOTelTracingInterceptors/Documentation.docc/Documentation.md +++ b/Sources/GRPCOTelTracingInterceptors/Documentation.docc/Documentation.md @@ -52,5 +52,5 @@ try await server.serve() For more information, look at the documentation for: - ``GRPCOTelTracingInterceptors/ClientOTelTracingInterceptor``, - ``GRPCOTelTracingInterceptors/ServerOTelTracingInterceptor``, -- [Client Interceptors](https://swiftpackageindex.com/grpc/grpc-swift/documentation/grpccore/clientinterceptor), -- [Server Interceptors](https://swiftpackageindex.com/grpc/grpc-swift/documentation/grpccore/serverinterceptor) +- [Client Interceptors](https://swiftpackageindex.com/grpc/grpc-swift-2/documentation/grpccore/clientinterceptor), +- [Server Interceptors](https://swiftpackageindex.com/grpc/grpc-swift-2/documentation/grpccore/serverinterceptor) diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift index 0962b32..18753dd 100644 --- a/Sources/GRPCOTelTracingInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/SpanAttributes+GRPCTracingKeys.swift @@ -15,31 +15,9 @@ */ internal import GRPCCore +internal import GRPCInterceptorsCore internal import Tracing -enum GRPCTracingKeys { - static let rpcSystem = "rpc.system" - static let rpcMethod = "rpc.method" - static let rpcService = "rpc.service" - static let rpcMessageID = "rpc.message.id" - static let rpcMessageType = "rpc.message.type" - static let grpcStatusCode = "rpc.grpc.status_code" - - static let serverAddress = "server.address" - static let serverPort = "server.port" - - static let clientAddress = "client.address" - static let clientPort = "client.port" - - static let networkTransport = "network.transport" - static let networkType = "network.type" - static let networkPeerAddress = "network.peer.address" - static let networkPeerPort = "network.peer.port" - - fileprivate static let requestMetadataPrefix = "rpc.grpc.request.metadata." - fileprivate static let responseMetadataPrefix = "rpc.grpc.response.metadata." -} - @available(gRPCSwiftExtras 2.0, *) extension Span { // See: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/ @@ -48,28 +26,28 @@ extension Span { serverHostname: String, networkTransportMethod: String ) { - self.attributes[GRPCTracingKeys.rpcSystem] = "grpc" - self.attributes[GRPCTracingKeys.serverAddress] = serverHostname - self.attributes[GRPCTracingKeys.networkTransport] = networkTransportMethod - self.attributes[GRPCTracingKeys.rpcService] = context.descriptor.service.fullyQualifiedService - self.attributes[GRPCTracingKeys.rpcMethod] = context.descriptor.method + self.attributes[GRPCOTelAttributeKeys.rpcSystem] = "grpc" + self.attributes[GRPCOTelAttributeKeys.serverAddress] = serverHostname + self.attributes[GRPCOTelAttributeKeys.networkTransport] = networkTransportMethod + self.attributes[GRPCOTelAttributeKeys.rpcService] = context.descriptor.service.fullyQualifiedService + self.attributes[GRPCOTelAttributeKeys.rpcMethod] = context.descriptor.method // Set server address information switch PeerAddress(context.remotePeer) { case .ipv4(let address, let port): - self.attributes[GRPCTracingKeys.networkType] = "ipv4" - self.attributes[GRPCTracingKeys.networkPeerAddress] = address - self.attributes[GRPCTracingKeys.networkPeerPort] = port - self.attributes[GRPCTracingKeys.serverPort] = port + self.attributes[GRPCOTelAttributeKeys.networkType] = "ipv4" + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = address + self.attributes[GRPCOTelAttributeKeys.networkPeerPort] = port + self.attributes[GRPCOTelAttributeKeys.serverPort] = port case .ipv6(let address, let port): - self.attributes[GRPCTracingKeys.networkType] = "ipv6" - self.attributes[GRPCTracingKeys.networkPeerAddress] = address - self.attributes[GRPCTracingKeys.networkPeerPort] = port - self.attributes[GRPCTracingKeys.serverPort] = port + self.attributes[GRPCOTelAttributeKeys.networkType] = "ipv6" + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = address + self.attributes[GRPCOTelAttributeKeys.networkPeerPort] = port + self.attributes[GRPCOTelAttributeKeys.serverPort] = port case .unixDomainSocket(let path): - self.attributes[GRPCTracingKeys.networkPeerAddress] = path + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = path case .none: // We don't recognise this address format, so don't populate any fields. @@ -82,28 +60,28 @@ extension Span { serverHostname: String, networkTransportMethod: String ) { - self.attributes[GRPCTracingKeys.rpcSystem] = "grpc" - self.attributes[GRPCTracingKeys.serverAddress] = serverHostname - self.attributes[GRPCTracingKeys.networkTransport] = networkTransportMethod - self.attributes[GRPCTracingKeys.rpcService] = context.descriptor.service.fullyQualifiedService - self.attributes[GRPCTracingKeys.rpcMethod] = context.descriptor.method + self.attributes[GRPCOTelAttributeKeys.rpcSystem] = "grpc" + self.attributes[GRPCOTelAttributeKeys.serverAddress] = serverHostname + self.attributes[GRPCOTelAttributeKeys.networkTransport] = networkTransportMethod + self.attributes[GRPCOTelAttributeKeys.rpcService] = context.descriptor.service.fullyQualifiedService + self.attributes[GRPCOTelAttributeKeys.rpcMethod] = context.descriptor.method // Set server address information switch PeerAddress(context.localPeer) { case .ipv4(let address, let port): - self.attributes[GRPCTracingKeys.networkType] = "ipv4" - self.attributes[GRPCTracingKeys.networkPeerAddress] = address - self.attributes[GRPCTracingKeys.networkPeerPort] = port - self.attributes[GRPCTracingKeys.serverPort] = port + self.attributes[GRPCOTelAttributeKeys.networkType] = "ipv4" + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = address + self.attributes[GRPCOTelAttributeKeys.networkPeerPort] = port + self.attributes[GRPCOTelAttributeKeys.serverPort] = port case .ipv6(let address, let port): - self.attributes[GRPCTracingKeys.networkType] = "ipv6" - self.attributes[GRPCTracingKeys.networkPeerAddress] = address - self.attributes[GRPCTracingKeys.networkPeerPort] = port - self.attributes[GRPCTracingKeys.serverPort] = port + self.attributes[GRPCOTelAttributeKeys.networkType] = "ipv6" + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = address + self.attributes[GRPCOTelAttributeKeys.networkPeerPort] = port + self.attributes[GRPCOTelAttributeKeys.serverPort] = port case .unixDomainSocket(let path): - self.attributes[GRPCTracingKeys.networkPeerAddress] = path + self.attributes[GRPCOTelAttributeKeys.networkPeerAddress] = path case .none: // We don't recognise this address format, so don't populate any fields. @@ -113,15 +91,15 @@ extension Span { // Set client address information switch PeerAddress(context.remotePeer) { case .ipv4(let address, let port): - self.attributes[GRPCTracingKeys.clientAddress] = address - self.attributes[GRPCTracingKeys.clientPort] = port + self.attributes[GRPCOTelAttributeKeys.clientAddress] = address + self.attributes[GRPCOTelAttributeKeys.clientPort] = port case .ipv6(let address, let port): - self.attributes[GRPCTracingKeys.clientAddress] = address - self.attributes[GRPCTracingKeys.clientPort] = port + self.attributes[GRPCOTelAttributeKeys.clientAddress] = address + self.attributes[GRPCOTelAttributeKeys.clientPort] = port case .unixDomainSocket(let path): - self.attributes[GRPCTracingKeys.clientAddress] = path + self.attributes[GRPCOTelAttributeKeys.clientAddress] = path case .none: // We don't recognise this address format, so don't populate any fields. @@ -132,14 +110,14 @@ extension Span { func setMetadataStringAttributesAsRequestSpanAttributes(_ metadata: Metadata) { self.setMetadataStringAttributesAsSpanAttributes( metadata, - prefix: GRPCTracingKeys.requestMetadataPrefix + prefix: GRPCOTelAttributeKeys.requestMetadataPrefix ) } func setMetadataStringAttributesAsResponseSpanAttributes(_ metadata: Metadata) { self.setMetadataStringAttributesAsSpanAttributes( metadata, - prefix: GRPCTracingKeys.responseMetadataPrefix + prefix: GRPCOTelAttributeKeys.responseMetadataPrefix ) } diff --git a/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift index dee934a..6c9ffdd 100644 --- a/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift +++ b/Sources/GRPCOTelTracingInterceptors/Tracing/Tracing+RPC.swift @@ -15,6 +15,7 @@ */ internal import GRPCCore +internal import GRPCInterceptorsCore internal import Synchronization internal import Tracing @@ -22,14 +23,14 @@ extension Span { @available(gRPCSwiftExtras 2.0, *) func endRPC() { // No error, status code zero. - self.attributes[GRPCTracingKeys.grpcStatusCode] = 0 + self.attributes[GRPCOTelAttributeKeys.grpcStatusCode] = 0 self.setStatus(SpanStatus(code: .ok)) self.end() } @available(gRPCSwiftExtras 2.0, *) func endRPC(withError error: RPCError) { - self.attributes[GRPCTracingKeys.grpcStatusCode] = error.code.rawValue + self.attributes[GRPCOTelAttributeKeys.grpcStatusCode] = error.code.rawValue self.setStatus(SpanStatus(code: .error)) self.recordError(error) self.end() @@ -42,7 +43,7 @@ extension Span { } else if let convertible = error as? any RPCErrorConvertible { self.endRPC(withError: RPCError(convertible)) } else { - self.attributes[GRPCTracingKeys.grpcStatusCode] = RPCError.Code.unknown.rawValue + self.attributes[GRPCOTelAttributeKeys.grpcStatusCode] = RPCError.Code.unknown.rawValue self.setStatus(SpanStatus(code: .error)) self.recordError(error) self.end() @@ -53,8 +54,8 @@ extension Span { extension SpanEvent { private static func rpcMessage(type: String, id: Int) -> Self { var event = SpanEvent(name: "rpc.message") - event.attributes[GRPCTracingKeys.rpcMessageType] = type - event.attributes[GRPCTracingKeys.rpcMessageID] = id + event.attributes[GRPCOTelAttributeKeys.rpcMessageType] = type + event.attributes[GRPCOTelAttributeKeys.rpcMessageID] = id return event } diff --git a/Tests/GRPCOTelTracingInterceptorsTests/PeerAddressTests.swift b/Tests/GRPCInterceptorsCoreTests/PeerAddressTests.swift similarity index 98% rename from Tests/GRPCOTelTracingInterceptorsTests/PeerAddressTests.swift rename to Tests/GRPCInterceptorsCoreTests/PeerAddressTests.swift index cf24b77..e2558d7 100644 --- a/Tests/GRPCOTelTracingInterceptorsTests/PeerAddressTests.swift +++ b/Tests/GRPCInterceptorsCoreTests/PeerAddressTests.swift @@ -14,7 +14,7 @@ * limitations under the License. */ -import GRPCOTelTracingInterceptors +import GRPCInterceptorsCore import Testing @Suite("PeerAddress tests") diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/ClientOTelMetricsInterceptor.swift b/Tests/GRPCOTelMetricsInterceptorsTests/ClientOTelMetricsInterceptor.swift new file mode 100644 index 0000000..4c91c5d --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/ClientOTelMetricsInterceptor.swift @@ -0,0 +1,255 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +import Foundation +import GRPCCore +import GRPCOTelMetricsInterceptors +import Metrics +import MetricsTestKit +import Testing + +@Suite("Client OTel Metrics Interceptor ") +struct ClientOTelMetricsInterceptorTests { + private let metrics = TestMetrics() + + @Test( + "Successful RPC is recorded correctly", + arguments: OTelMetricsInterceptorTestAddressType.allCases + ) + @available(gRPCSwiftExtras 2.0, *) + func interceptorRecordsMetricsForSuccessfulCall( + addressType: OTelMetricsInterceptorTestAddressType + ) async throws { + let interceptor = ClientOTelMetricsInterceptor( + serverHostname: "test-server", + networkTransport: "tcp", + metricsFactory: self.metrics + ) + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "test-service", + method: "test-method" + ) + let (_, requestStreamContinuation) = AsyncStream.makeStream() + + let testValues = OTelMetricsInterceptorTestCaseValues(addressType: addressType) + + let response = try await interceptor.intercept( + request: StreamingClientRequest { writer in + try await writer.write(contentsOf: ["test-message"]) + }, + context: ClientContext( + descriptor: methodDescriptor, + remotePeer: testValues.remotePeerAddress, + localPeer: testValues.localPeerAddress + ), + ) { request, _ in + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) + try await request.producer(writer) + requestStreamContinuation.finish() + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.yield(.message("response-message")) + $0.finish() + } + ) + ) + } + + await #expect(throws: Never.self) { + if case let .success(contents) = response.accepted { + for try await _ in contents.bodyParts {} + } + } + + let requestsPerRPC = try #require( + self.metrics.recorders.first { $0.label == "rpc.client.requests_per_rpc" } + ) + expectDimensions(requestsPerRPC.dimensions, expectAdditional: testValues.expectedDimensions) + #expect(requestsPerRPC.values.reduce(0, +) == 1) + + let responsesPerRPC = try #require( + self.metrics.recorders.first { $0.label == "rpc.client.responses_per_rpc" } + ) + expectDimensions(responsesPerRPC.dimensions, expectAdditional: testValues.expectedDimensions) + #expect(responsesPerRPC.values.reduce(0, +) == 1) + + let duration = try #require(self.metrics.timers.first { $0.label == "rpc.client.duration" }) + expectDimensions( + duration.dimensions, + expectAdditional: [("rpc.grpc.status_code", "0")] + testValues.expectedDimensions + ) + let lastDurationValueNanoSeconds = try #require(duration.lastValue) + let lastDurationValue = Double(lastDurationValueNanoSeconds) / 1_000_000_000 + #expect(0 < lastDurationValue && lastDurationValue < 1) // between 0 and 1 sec + } + + @Test("RPC that throws is correctly recorded") + @available(gRPCSwiftExtras 2.0, *) + func interceptorRecordsMetricsForFailedCall() async throws { + let interceptor = ClientOTelMetricsInterceptor( + serverHostname: "test-server", + networkTransport: "tcp", + metricsFactory: self.metrics + ) + + let (_, requestStreamContinuation) = AsyncStream.makeStream() + let request = StreamingClientRequest { writer in + try await writer.write("test-message") + } + + let next: + (StreamingClientRequest, ClientContext) async throws -> StreamingClientResponse< + String + > = { request, _ in + // Make sure the `producer` closure's which includes instrumentation is called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) + try await request.producer(writer) + requestStreamContinuation.finish() + + return .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.finish(throwing: RPCError(code: .unavailable, message: "This should be thrown")) + } + ) + ) + } + + do { + let response = try await interceptor.intercept( + request: request, + context: ClientContext( + descriptor: .init(fullyQualifiedService: "test-service", method: "test-method"), + remotePeer: "", + localPeer: "" + ), + next: next + ) + + // Consume the response to trigger the hooks + if case let .success(contents) = response.accepted { + for try await _ in contents.bodyParts { + // We don't care about any received messages here + } + } + } catch { + let requestsPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.client.requests_per_rpc" }) + ) + expectDimensions(requestsPerRPC.dimensions) + #expect(requestsPerRPC.values.reduce(0, +) == 1) + + let responsesPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.client.responses_per_rpc" }) + ) + expectDimensions(responsesPerRPC.dimensions) + #expect(responsesPerRPC.values == []) // empty values, as error is thrown in response + + let duration = try #require( + self.metrics.timers.first(where: { $0.label == "rpc.client.duration" }) + ) + expectDimensions( + duration.dimensions, + expectAdditional: [("rpc.grpc.status_code", "\(RPCError.Code.unavailable.rawValue)")] + ) + let lastDurationValueNanoSeconds = try #require(duration.lastValue) + let lastDurationValue = Double(lastDurationValueNanoSeconds) / 1_000_000_000 + #expect(0 < lastDurationValue && lastDurationValue < 1) // between 0 and 1 sec + } + } + + @Test("Messages in bidirectional-streaming are counted correctly") + @available(gRPCSwiftExtras 2.0, *) + func interceptorCountsMessagesCorrectly() async throws { + let interceptor = ClientOTelMetricsInterceptor( + serverHostname: "test-server", + networkTransport: "tcp", + metricsFactory: self.metrics + ) + let (_, requestStreamContinuation) = AsyncStream.makeStream() + + let request = StreamingClientRequest { writer in + // write multiple requests + try await writer.write("test-message-1") + try await writer.write("test-message-2") + try await writer.write("test-message-3") + } + + let responseContent = StreamingClientResponse( + accepted: .success( + .init( + metadata: [], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + // write multiple responses + $0.yield(.message("response-message-1")) + $0.yield(.message("response-message-2")) + $0.finish() + } + ) + ) + ) + ) + + let next: + (StreamingClientRequest, ClientContext) async throws -> StreamingClientResponse< + String + > = { request, _ in + // Make sure the `producer` closure's which includes instrumentation is called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: requestStreamContinuation)) + try await request.producer(writer) + requestStreamContinuation.finish() + return responseContent + } + + // Execute the interceptor + let response = try await interceptor.intercept( + request: request, + context: ClientContext( + descriptor: .init(fullyQualifiedService: "test-service", method: "test-method"), + remotePeer: "", + localPeer: "" + ), + next: next + ) + + // Consume the response to trigger the hooks + if case let .success(contents) = response.accepted { + for try await _ in contents.bodyParts {} + } + + let requestsPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.client.requests_per_rpc" }) + ) + expectDimensions(requestsPerRPC.dimensions) + #expect(requestsPerRPC.values.reduce(0, +) == 3) // 3 requests + + let responsesPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.client.responses_per_rpc" }) + ) + expectDimensions(responsesPerRPC.dimensions) + #expect(responsesPerRPC.values.reduce(0, +) == 2) // 2 responses + + let duration = try #require( + self.metrics.timers.first(where: { $0.label == "rpc.client.duration" }) + ) + expectDimensions(duration.dimensions, expectAdditional: [("rpc.grpc.status_code", "0")]) + let lastDurationValueNanoSeconds = try #require(duration.lastValue) + let lastDurationValue = Double(lastDurationValueNanoSeconds) / 1_000_000_000 + #expect(0 < lastDurationValue && lastDurationValue < 1) // between 0 and 1 sec + } +} diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/ExpectDimensions.swift b/Tests/GRPCOTelMetricsInterceptorsTests/ExpectDimensions.swift new file mode 100644 index 0000000..04b9c3f --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/ExpectDimensions.swift @@ -0,0 +1,53 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +import Testing + +func expectDimensions( + _ dimensions: [(String, String)], + rpcSystem: String = "grpc", + serverAddress: String = "test-server", + networkTransport: String = "tcp", + rpcService: String = "test-service", + rpcMethod: String = "test-method", + expectAdditional additionalDimensions: [(String, String)] = [], + sourceLocation: SourceLocation = #_sourceLocation +) { + struct Tuple: Hashable, CustomStringConvertible { + let first: String + let second: String + + var description: String { + "(\(first), \(second))" + } + } + + let expectedDimensions = + [ + ("rpc.system", rpcSystem), + ("server.address", serverAddress), + ("network.transport", networkTransport), + ("rpc.service", rpcService), + ("rpc.method", rpcMethod), + ] + additionalDimensions + + let expectedSet: Set = expectedDimensions.reduce(into: []) { partialResult, element in + partialResult.insert(Tuple(first: element.0, second: element.1)) + } + let dimensionsSet: Set = dimensions.reduce(into: []) { partialResult, element in + partialResult.insert(Tuple(first: element.0, second: element.1)) + } + + #expect(dimensionsSet == expectedSet, sourceLocation: sourceLocation) +} diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestAddressType.swift b/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestAddressType.swift new file mode 100644 index 0000000..b6322dd --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestAddressType.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +enum OTelMetricsInterceptorTestAddressType: CaseIterable { + case ipv4 + case ipv6 + case uds +} diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestCaseValues.swift b/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestCaseValues.swift new file mode 100644 index 0000000..701388d --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/OTelMetricsInterceptorTestCaseValues.swift @@ -0,0 +1,48 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +struct OTelMetricsInterceptorTestCaseValues { + let remotePeerAddress: String + let localPeerAddress: String + let expectedDimensions: [(String, String)] + + init(addressType: OTelMetricsInterceptorTestAddressType) { + switch addressType { + case .ipv4: + self.remotePeerAddress = "ipv4:10.1.2.80:567" + self.localPeerAddress = "ipv4:10.1.2.80:123" + self.expectedDimensions = [ + ("network.peer.address", "10.1.2.80"), + ("network.peer.port", "567"), + ("server.port", "567"), + ("network.type", "ipv4"), + ] + case .ipv6: + self.remotePeerAddress = "ipv6:[2001::130F:::09C0:876A:130B]:1234" + self.localPeerAddress = "ipv6:[ff06:0:0:0:0:0:0:c3]:5678" + self.expectedDimensions = [ + ("network.peer.address", "2001::130F:::09C0:876A:130B"), + ("network.peer.port", "1234"), + ("server.port", "1234"), + ("network.type", "ipv6"), + ] + case .uds: + self.remotePeerAddress = "unix:some-path" + self.localPeerAddress = "unix:some-path" + self.expectedDimensions = [ + ("network.peer.address", "some-path") + ] + } + } +} diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/ServerOTelMetricsInterceptorTests.swift b/Tests/GRPCOTelMetricsInterceptorsTests/ServerOTelMetricsInterceptorTests.swift new file mode 100644 index 0000000..bc8dac8 --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/ServerOTelMetricsInterceptorTests.swift @@ -0,0 +1,192 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +import Foundation +import GRPCCore +import GRPCOTelMetricsInterceptors +import Metrics +import MetricsTestKit +import Testing + +@Suite("Server OTel Metrics Interceptor ") +struct ServerOTelMetricsInterceptorTests { + private let metrics = TestMetrics() + + @Test( + "Successful RPC is recorded correctly", + arguments: OTelMetricsInterceptorTestAddressType.allCases + ) + @available(gRPCSwiftExtras 2.0, *) + func interceptorRecordsMetricsForSuccessfulCall( + addressType: OTelMetricsInterceptorTestAddressType + ) async throws { + let interceptor = ServerOTelMetricsInterceptor( + serverHostname: "test-server", + networkTransport: "tcp", + metricsFactory: self.metrics + ) + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "test-service", + method: "test-method" + ) + let request = ServerRequest(metadata: [:], message: "Hello World") + + let testValues = OTelMetricsInterceptorTestCaseValues(addressType: addressType) + + let response = try await interceptor.intercept( + request: .init(single: request), + context: ServerContext( + descriptor: methodDescriptor, + remotePeer: testValues.remotePeerAddress, + localPeer: testValues.localPeerAddress, + cancellation: .init() + ) + ) { request, _ in + for try await _ in request.messages { + // We need to iterate over the messages for the interceptor to be able to record metrics. + } + + return StreamingServerResponse( + accepted: .success( + .init( + metadata: [], + producer: { writer in + try await writer.write("response1") + try await writer.write("response2") + return ["Result": "Trailing metadata"] + } + ) + ) + ) + } + + let (responseStream, responseStreamContinuation) = AsyncStream.makeStream() + let responseContents = try response.accepted.get() + let trailingMetadata = try await responseContents.producer( + RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation)) + ) + responseStreamContinuation.finish() + + await assertStreamContentsEqual(["response1", "response2"], responseStream) + #expect(trailingMetadata == ["Result": "Trailing metadata"]) + + let requestsPerRPC = try #require( + self.metrics.recorders.first { $0.label == "rpc.server.requests_per_rpc" } + ) + expectDimensions(requestsPerRPC.dimensions, expectAdditional: testValues.expectedDimensions) + #expect(requestsPerRPC.values.reduce(0, +) == 1) + + let responsesPerRPC = try #require( + self.metrics.recorders.first { $0.label == "rpc.server.responses_per_rpc" } + ) + expectDimensions(responsesPerRPC.dimensions, expectAdditional: testValues.expectedDimensions) + #expect(responsesPerRPC.values.reduce(0, +) == 2) + + let duration = try #require(self.metrics.timers.first { $0.label == "rpc.server.duration" }) + expectDimensions( + duration.dimensions, + expectAdditional: [("rpc.grpc.status_code", "0")] + testValues.expectedDimensions + ) + let lastDurationValueNanoSeconds = try #require(duration.lastValue) + let lastDurationValue = Double(lastDurationValueNanoSeconds) / 1_000_000_000 + #expect(0 < lastDurationValue && lastDurationValue < 1) // between 0 and 1 sec + } + + @Test("RPC that throws is correctly recorded") + @available(gRPCSwiftExtras 2.0, *) + func interceptorRecordsMetricsForFailedCall() async throws { + let interceptor = ServerOTelMetricsInterceptor( + serverHostname: "test-server", + networkTransport: "tcp", + metricsFactory: self.metrics + ) + let methodDescriptor = MethodDescriptor( + fullyQualifiedService: "test-service", + method: "test-method" + ) + let request = ServerRequest(metadata: [:], message: "Hello World") + + // (StreamingServerRequest, ServerContext) async throws -> StreamingServerResponse + let next: + (StreamingServerRequest, ServerContext) async throws -> StreamingServerResponse< + String + > = { request, _ in + for try await _ in request.messages { + // We need to iterate over the messages for the interceptor to be able to record metrics. + } + + return StreamingServerResponse( + error: RPCError(code: .unavailable, message: "This should be thrown") + ) + } + + do { + let response = try await interceptor.intercept( + request: .init(single: request), + context: ServerContext( + descriptor: methodDescriptor, + remotePeer: "", + localPeer: "", + cancellation: .init() + ), + next: next + ) + + let (responseStream, responseStreamContinuation) = AsyncStream.makeStream() + let responseContents = try response.accepted.get() + let trailingMetadata = try await responseContents.producer( + RPCWriter(wrapping: TestWriter(streamContinuation: responseStreamContinuation)) + ) + responseStreamContinuation.finish() + + await assertStreamContentsEqual(["response1", "response2"], responseStream) + #expect(trailingMetadata == ["Result": "Trailing metadata"]) + } catch { + let requestsPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.server.requests_per_rpc" }) + ) + expectDimensions(requestsPerRPC.dimensions) + #expect(requestsPerRPC.values.reduce(0, +) == 1) + + let responsesPerRPC = try #require( + self.metrics.recorders.first(where: { $0.label == "rpc.server.responses_per_rpc" }) + ) + expectDimensions(responsesPerRPC.dimensions) + #expect(responsesPerRPC.values == []) // empty values, as error is thrown in response + + let duration = try #require( + self.metrics.timers.first(where: { $0.label == "rpc.server.duration" }) + ) + expectDimensions( + duration.dimensions, + expectAdditional: [("rpc.grpc.status_code", "\(RPCError.Code.unavailable.rawValue)")] + ) + let lastDurationValueNanoSeconds = try #require(duration.lastValue) + let lastDurationValue = Double(lastDurationValueNanoSeconds) / 1_000_000_000 + #expect(0 < lastDurationValue && lastDurationValue < 1) // between 0 and 1 sec + } + } +} + +@available(gRPCSwiftExtras 2.0, *) +private func assertStreamContentsEqual( + _ array: [T], + _ stream: any AsyncSequence +) async { + var streamElements = [T]() + for await element in stream { + streamElements.append(element) + } + #expect(streamElements == array) +} diff --git a/Tests/GRPCOTelMetricsInterceptorsTests/TestWriter.swift b/Tests/GRPCOTelMetricsInterceptorsTests/TestWriter.swift new file mode 100644 index 0000000..d981d33 --- /dev/null +++ b/Tests/GRPCOTelMetricsInterceptorsTests/TestWriter.swift @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Temporal SDK open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift Temporal SDK project authors +// Licensed under MIT License +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Temporal SDK project authors +// +// SPDX-License-Identifier: MIT +// +//===----------------------------------------------------------------------===// + +import GRPCCore + +struct TestWriter: RPCWriterProtocol { + typealias Element = WriterElement + + private let streamContinuation: AsyncStream.Continuation + + init(streamContinuation: AsyncStream.Continuation) { + self.streamContinuation = streamContinuation + } + + func write(_ element: WriterElement) { + self.streamContinuation.yield(element) + } + + func write(contentsOf elements: some Sequence) { + elements.forEach { element in + self.write(element) + } + } +}