Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions pkg/capabilities/base_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,27 @@ func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
Id: event.EventId,
}

if !safeSend(sendCh, wrapped) {
b.metrics.IncInboxFull(event.TriggerId)
b.lggr.Warnf("inbox full or closed for trigger %s", event.TriggerId)
return
}

b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
b.capabilityId, event.TriggerId, event.EventId, attempts)
}

func safeSend[T any](ch chan<- T, val T) (sent bool) {
defer func() {
if recover() != nil {
sent = false
}
}()

select {
case sendCh <- wrapped:
b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
b.capabilityId, event.TriggerId, event.EventId, attempts)
case ch <- val:
return true
default:
b.metrics.IncInboxFull(event.TriggerId)
b.lggr.Warnf("inbox full for trigger %s", event.TriggerId)
return false
}
}
43 changes: 26 additions & 17 deletions pkg/capabilities/pb/capabilities.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/capabilities/pb/capabilities.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ message InitialiseRequest {
uint32 keystore_id = 10;
uint32 org_resolver_id = 11;
uint32 cre_settings_id = 12;
uint32 trigger_event_store_id = 13;
}

message CapabilityInfosReply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/capability"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/errorlog"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/eventstore"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/gateway"
keystoreservice "github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keystore"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/core/services/keyvalue"
Expand Down Expand Up @@ -196,19 +197,34 @@ func (c *StandardCapabilitiesClient) Initialise(ctx context.Context, dependencie
resources = append(resources, creSettingsRes)
}

triggerEventStore := dependencies.TriggerEventStore
var triggerEventStoreID uint32
if triggerEventStore != nil {
var triggerEventStoreRes net.Resource
triggerEventStoreID, triggerEventStoreRes, err = c.ServeNew("TriggerEventStore", func(s *grpc.Server) {
pb.RegisterEventStoreServer(s, eventstore.NewServer(triggerEventStore))
})
if err != nil {
c.CloseAll(resources...)
return fmt.Errorf("failed to serve trigger event store: %w", err)
}
resources = append(resources, triggerEventStoreRes)
}

_, err = c.StandardCapabilitiesClient.Initialise(ctx, &capabilitiespb.InitialiseRequest{
Config: config,
ErrorLogId: errorLogID,
PipelineRunnerId: pipelineRunnerID,
TelemetryId: telemetryID,
CapRegistryId: capabilitiesRegistryID,
KeyValueStoreId: keyValueStoreID,
RelayerSetId: relayerSetID,
OracleFactoryId: oracleFactoryID,
GatewayConnectorId: gatewayConnectorID,
KeystoreId: keyStoreID,
OrgResolverId: orgResolverID,
CreSettingsId: creSettingsID,
Config: config,
ErrorLogId: errorLogID,
PipelineRunnerId: pipelineRunnerID,
TelemetryId: telemetryID,
CapRegistryId: capabilitiesRegistryID,
KeyValueStoreId: keyValueStoreID,
RelayerSetId: relayerSetID,
OracleFactoryId: oracleFactoryID,
GatewayConnectorId: gatewayConnectorID,
KeystoreId: keyStoreID,
OrgResolverId: orgResolverID,
CreSettingsId: creSettingsID,
TriggerEventStoreId: triggerEventStoreID,
})

if err != nil {
Expand Down Expand Up @@ -375,6 +391,17 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca
creSettings = settings.NewClient(s.Logger, creSettingsConn)
}

var triggerEventStoreClient capabilities.EventStore
if request.TriggerEventStoreId > 0 {
triggerEventStoreConn, err := s.Dial(request.TriggerEventStoreId)
if err != nil {
s.CloseAll(resources...)
return nil, net.ErrConnDial{Name: "TriggerEventStore", ID: request.TriggerEventStoreId, Err: err}
}
resources = append(resources, net.Resource{Closer: triggerEventStoreConn, Name: "TriggerEventStore"})
triggerEventStoreClient = eventstore.NewClient(triggerEventStoreConn)
}

dependencies := core.StandardCapabilitiesDependencies{
Config: request.Config,
TelemetryService: telemetry,
Expand All @@ -388,6 +415,7 @@ func (s *standardCapabilitiesServer) Initialise(ctx context.Context, request *ca
P2PKeystore: keyStore,
OrgResolver: orgResolver,
CRESettings: creSettings,
TriggerEventStore: triggerEventStoreClient,
}

if err = s.impl.Initialise(ctx, dependencies); err != nil {
Expand Down
87 changes: 87 additions & 0 deletions pkg/loop/internal/core/services/eventstore/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package eventstore

import (
"context"
"time"

"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop/internal/pb"
)

var _ capabilities.EventStore = (*Client)(nil)

type Client struct {
grpc pb.EventStoreClient
}

func NewClient(cc grpc.ClientConnInterface) *Client {
return &Client{grpc: pb.NewEventStoreClient(cc)}
}

func (c *Client) Insert(ctx context.Context, rec capabilities.PendingEvent) error {
ev := &pb.PendingEventProto{
TriggerId: rec.TriggerId,
EventId: rec.EventId,
Payload: rec.Payload,
FirstAt: timestamppb.New(rec.FirstAt),
Attempts: int32(rec.Attempts),
}
if !rec.LastSentAt.IsZero() {
ev.LastSentAt = timestamppb.New(rec.LastSentAt)
}
_, err := c.grpc.Insert(ctx, &pb.InsertEventRequest{Event: ev})
return err
}

func (c *Client) UpdateDelivery(ctx context.Context, triggerId string, eventId string, lastSentAt time.Time, attempts int) error {
_, err := c.grpc.UpdateDelivery(ctx, &pb.UpdateDeliveryRequest{
TriggerId: triggerId,
EventId: eventId,
LastSentAt: timestamppb.New(lastSentAt),
Attempts: int32(attempts),
})
return err
}

func (c *Client) List(ctx context.Context) ([]capabilities.PendingEvent, error) {
resp, err := c.grpc.List(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}
events := make([]capabilities.PendingEvent, 0, len(resp.GetEvents()))
for _, ev := range resp.GetEvents() {
rec := capabilities.PendingEvent{
TriggerId: ev.GetTriggerId(),
EventId: ev.GetEventId(),
Payload: ev.GetPayload(),
Attempts: int(ev.GetAttempts()),
}
if t := ev.GetFirstAt(); t != nil {
rec.FirstAt = t.AsTime()
}
if t := ev.GetLastSentAt(); t != nil {
rec.LastSentAt = t.AsTime()
}
events = append(events, rec)
}
return events, nil
}

func (c *Client) DeleteEvent(ctx context.Context, triggerId string, eventId string) error {
_, err := c.grpc.DeleteEvent(ctx, &pb.DeleteEventRequest{
TriggerId: triggerId,
EventId: eventId,
})
return err
}

func (c *Client) DeleteEventsForTrigger(ctx context.Context, triggerID string) error {
_, err := c.grpc.DeleteEventsForTrigger(ctx, &pb.DeleteEventsForTriggerRequest{
TriggerId: triggerID,
})
return err
}
Loading
Loading