RPC Elements are a middleware mechanism in the aRPC framework that allows for request and response processing at different stages of the RPC lifecycle. They provide a way to implement cross-cutting concerns like logging, metrics, authentication, and other middleware functionality.
The RPCElement interface defines the contract that all RPC elements must implement:
type RPCElement interface {
// ProcessRequest processes the request before it's sent to the server
ProcessRequest(ctx context.Context, req *RPCRequest) (*RPCRequest, context.Context, error)
// ProcessResponse processes the response after it's received from the server
ProcessResponse(ctx context.Context, resp *RPCResponse) (*RPCResponse, context.Context, error)
// Name returns the name of the RPC element
Name() string
}type RPCRequest struct {
ID uint64 // Unique identifier for the request
ServiceName string // Name of the service being called
Method string // Name of the method being called
Payload any // RPC payload
}type RPCResponse struct {
ID uint64
Result any
Error error
}The RPCElementChain provides a way to compose multiple RPC elements into a processing pipeline:
type RPCElementChain struct {
elements []RPCElement
}-
Request Processing:
- Requests are processed through elements in forward order
- Each element can modify the request before passing it to the next element
- If any element returns an error, processing stops
-
Response Processing:
- Responses are processed through elements in reverse order
- Each element can modify the response before passing it to the next element
- If any element returns an error, processing stops
Here's an example of a metrics element implementation:
type MetricsElement struct {
requestCount uint64
ctx context.Context
cancel context.CancelFunc
}
func (m *MetricsElement) ProcessRequest(ctx context.Context, req *RPCRequest) (*RPCRequest, context.Context, error) {
atomic.AddUint64(&m.requestCount, 1)
return req, ctx, nil
}
func (m *MetricsElement) ProcessResponse(ctx context.Context, resp *RPCResponse) (*RPCResponse, context.Context, error) {
if resp.Error != nil {
m.cancel() // Stop metrics on error
}
return resp, ctx, nil
}
func (m *MetricsElement) Name() string {
return "metrics"
}// Create individual elements
metrics := NewMetricsElement()
logging := NewLoggingElement(transport.RoleClient, log.New(os.Stdout, "aRPC: ", log.LstdFlags))
// Create a chain with multiple elements
rpcElements := []element.RPCElement{
metrics,
logging,
}
// Create a new element chain
chain := element.NewRPCElementChain(rpcElements...)// Create client with RPC elements
client, err := rpc.NewClient(serializer, ":9000", transportElements, rpcElements)